Hardened Relays and API Security paths

This commit is contained in:
Alexey
2026-05-10 13:22:54 +03:00
parent b2aa9b8c9e
commit ba1d9be5d4
2 changed files with 125 additions and 55 deletions

View File

@@ -65,6 +65,8 @@ const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
const QUOTA_RESERVE_SPIN_RETRIES: usize = 32; const QUOTA_RESERVE_SPIN_RETRIES: usize = 32;
const QUOTA_RESERVE_BACKOFF_MIN_MS: u64 = 1; const QUOTA_RESERVE_BACKOFF_MIN_MS: u64 = 1;
const QUOTA_RESERVE_BACKOFF_MAX_MS: u64 = 16; const QUOTA_RESERVE_BACKOFF_MAX_MS: u64 = 16;
const QUOTA_RESERVE_MAX_BACKOFF_ROUNDS: usize = 16;
const ME_CHILD_JOIN_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Default)] #[derive(Default)]
pub(crate) struct DesyncDedupRotationState { pub(crate) struct DesyncDedupRotationState {
@@ -624,6 +626,7 @@ async fn reserve_user_quota_with_yield(
limit: u64, limit: u64,
) -> std::result::Result<u64, QuotaReserveError> { ) -> std::result::Result<u64, QuotaReserveError> {
let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS; let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS;
let mut backoff_rounds = 0usize;
loop { loop {
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES { for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match user_stats.quota_try_reserve(bytes, limit) { match user_stats.quota_try_reserve(bytes, limit) {
@@ -637,6 +640,10 @@ async fn reserve_user_quota_with_yield(
tokio::task::yield_now().await; tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(backoff_ms)).await; tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
backoff_rounds = backoff_rounds.saturating_add(1);
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
return Err(QuotaReserveError::Contended);
}
backoff_ms = backoff_ms backoff_ms = backoff_ms
.saturating_mul(2) .saturating_mul(2)
.min(QUOTA_RESERVE_BACKOFF_MAX_MS); .min(QUOTA_RESERVE_BACKOFF_MAX_MS);
@@ -1169,7 +1176,7 @@ where
let c2me_byte_semaphore = Arc::new(Semaphore::new(c2me_byte_budget)); let c2me_byte_semaphore = Arc::new(Semaphore::new(c2me_byte_budget));
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity); let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
let me_pool_c2me = me_pool.clone(); let me_pool_c2me = me_pool.clone();
let c2me_sender = tokio::spawn(async move { let mut c2me_sender = tokio::spawn(async move {
let mut sent_since_yield = 0usize; let mut sent_since_yield = 0usize;
while let Some(cmd) = c2me_rx.recv().await { while let Some(cmd) = c2me_rx.recv().await {
match cmd { match cmd {
@@ -1214,7 +1221,7 @@ where
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone(); let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
let bytes_me2c_clone = bytes_me2c.clone(); let bytes_me2c_clone = bytes_me2c.clone();
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config); let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
let me_writer = tokio::spawn(async move { let mut me_writer = tokio::spawn(async move {
let mut writer = crypto_writer; let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024); let mut frame_buf = Vec::with_capacity(16 * 1024);
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes; let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
@@ -1729,14 +1736,26 @@ where
} }
drop(c2me_tx); drop(c2me_tx);
let c2me_result = c2me_sender let c2me_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut c2me_sender).await {
.await Ok(joined) => {
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}")))); joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))))
}
Err(_) => {
c2me_sender.abort();
Err(ProxyError::Proxy("ME sender join timeout".into()))
}
};
let _ = stop_tx.send(()); let _ = stop_tx.send(());
let mut writer_result = me_writer let mut writer_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut me_writer).await {
.await Ok(joined) => {
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}")))); joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))))
}
Err(_) => {
me_writer.abort();
Err(ProxyError::Proxy("ME writer join timeout".into()))
}
};
// When client closes, but ME channel stopped as unregistered - it isnt error // When client closes, but ME channel stopped as unregistered - it isnt error
if client_closed if client_closed

View File

@@ -215,6 +215,7 @@ struct StatsIo<S> {
c2s_rate_debt_bytes: u64, c2s_rate_debt_bytes: u64,
c2s_wait: RateWaitState, c2s_wait: RateWaitState,
s2c_wait: RateWaitState, s2c_wait: RateWaitState,
quota_wait: RateWaitState,
quota_limit: Option<u64>, quota_limit: Option<u64>,
quota_exceeded: Arc<AtomicBool>, quota_exceeded: Arc<AtomicBool>,
quota_bytes_since_check: u64, quota_bytes_since_check: u64,
@@ -275,6 +276,7 @@ impl<S> StatsIo<S> {
c2s_rate_debt_bytes: 0, c2s_rate_debt_bytes: 0,
c2s_wait: RateWaitState::default(), c2s_wait: RateWaitState::default(),
s2c_wait: RateWaitState::default(), s2c_wait: RateWaitState::default(),
quota_wait: RateWaitState::default(),
quota_limit, quota_limit,
quota_exceeded, quota_exceeded,
quota_bytes_since_check: 0, quota_bytes_since_check: 0,
@@ -353,6 +355,11 @@ impl<S> StatsIo<S> {
Poll::Ready(()) Poll::Ready(())
} }
fn arm_quota_wait(&mut self, cx: &mut Context<'_>) -> Poll<()> {
Self::arm_wait(&mut self.quota_wait, false, false);
Self::poll_wait(&mut self.quota_wait, cx, None, RateDirection::Up)
}
} }
#[derive(Debug)] #[derive(Debug)]
@@ -430,8 +437,13 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
if this.settle_c2s_rate_debt(cx).is_pending() { if this.settle_c2s_rate_debt(cx).is_pending() {
return Poll::Pending; return Poll::Pending;
} }
if buf.remaining() == 0 {
return Pin::new(&mut this.inner).poll_read(cx, buf);
}
let mut remaining_before = None; let mut remaining_before = None;
let mut reserved_read_bytes = 0u64;
let mut read_limit = buf.remaining();
if let Some(limit) = this.quota_limit { if let Some(limit) = this.quota_limit {
let used_before = this.user_stats.quota_used(); let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before); let remaining = limit.saturating_sub(used_before);
@@ -440,50 +452,77 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
return Poll::Ready(Err(quota_io_error())); return Poll::Ready(Err(quota_io_error()));
} }
remaining_before = Some(remaining); remaining_before = Some(remaining);
read_limit = read_limit.min(remaining as usize);
if read_limit == 0 {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
let desired = read_limit as u64;
let mut reserve_rounds = 0usize;
while reserved_read_bytes == 0 {
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(desired, limit) {
Ok(_) => {
reserved_read_bytes = desired;
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
Err(crate::stats::QuotaReserveError::Contended) => {}
}
}
if reserved_read_bytes == 0 {
reserve_rounds = reserve_rounds.saturating_add(1);
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
if this.arm_quota_wait(cx).is_pending() {
return Poll::Pending;
}
reserve_rounds = 0;
}
}
}
} }
let before = buf.filled().len(); let limited_read = read_limit < buf.remaining();
let read_result = if limited_read {
let mut limited_buf = ReadBuf::new(buf.initialize_unfilled_to(read_limit));
match Pin::new(&mut this.inner).poll_read(cx, &mut limited_buf) {
Poll::Ready(Ok(())) => {
let n = limited_buf.filled().len();
buf.advance(n);
Poll::Ready(Ok(n))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
} else {
let before = buf.filled().len();
match Pin::new(&mut this.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let n = buf.filled().len() - before;
Poll::Ready(Ok(n))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
};
match Pin::new(&mut this.inner).poll_read(cx, buf) { match read_result {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(n)) => {
let n = buf.filled().len() - before; if reserved_read_bytes > n as u64 {
refund_reserved_quota_bytes(
this.user_stats.as_ref(),
reserved_read_bytes - n as u64,
);
}
if n > 0 { if n > 0 {
let n_to_charge = n as u64; let n_to_charge = n as u64;
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) { if let Some(remaining) = remaining_before {
let mut reserved_total = None;
let mut reserve_rounds = 0usize;
while reserved_total.is_none() {
let mut saw_contention = false;
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(n_to_charge, limit) {
Ok(total) => {
reserved_total = Some(total);
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
this.quota_exceeded.store(true, Ordering::Release);
buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
Err(crate::stats::QuotaReserveError::Contended) => {
saw_contention = true;
}
}
}
if reserved_total.is_none() {
reserve_rounds = reserve_rounds.saturating_add(1);
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
this.quota_exceeded.store(true, Ordering::Release);
buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
if saw_contention {
std::thread::yield_now();
}
}
}
if should_immediate_quota_check(remaining, n_to_charge) { if should_immediate_quota_check(remaining, n_to_charge) {
this.quota_bytes_since_check = 0; this.quota_bytes_since_check = 0;
} else { } else {
@@ -495,9 +534,11 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
} }
} }
if reserved_total.unwrap_or(0) >= limit { }
this.quota_exceeded.store(true, Ordering::Release); if let Some(limit) = this.quota_limit
} && this.user_stats.quota_used() >= limit
{
this.quota_exceeded.store(true, Ordering::Release);
} }
// C→S: client sent data // C→S: client sent data
@@ -521,7 +562,18 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
} }
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
other => other, Poll::Pending => {
if reserved_read_bytes > 0 {
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes);
}
Poll::Pending
}
Poll::Ready(Err(err)) => {
if reserved_read_bytes > 0 {
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes);
}
Poll::Ready(Err(err))
}
} }
} }
} }
@@ -614,11 +666,10 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
if let Some(lease) = this.traffic_lease.as_ref() { if let Some(lease) = this.traffic_lease.as_ref() {
lease.refund(RateDirection::Down, shaper_reserved_bytes); lease.refund(RateDirection::Down, shaper_reserved_bytes);
} }
this.quota_exceeded.store(true, Ordering::Release); let _ = this.arm_quota_wait(cx);
return Poll::Ready(Err(quota_io_error())); return Poll::Pending;
} } else if saw_contention {
if saw_contention { std::hint::spin_loop();
std::thread::yield_now();
} }
} }
} }