Middle Wait Deadlines + Tighten Session Release State

This commit is contained in:
Alexey
2026-05-10 13:58:02 +03:00
parent eef2a38c75
commit beed6b4679
2 changed files with 52 additions and 8 deletions

View File

@@ -59,24 +59,30 @@ impl UserConnectionReservation {
} }
} }
async fn release(mut self) { fn mark_released(&mut self) -> bool {
if self.state != SessionReservationState::Active { if self.state != SessionReservationState::Active {
return false;
}
self.state = SessionReservationState::Released;
true
}
async fn release(mut self) {
if !self.mark_released() {
return; return;
} }
if self.tracks_ip { if self.tracks_ip {
self.ip_tracker.remove_ip(&self.user, self.ip).await; self.ip_tracker.remove_ip(&self.user, self.ip).await;
} }
self.state = SessionReservationState::Released;
self.stats.decrement_user_curr_connects(&self.user); self.stats.decrement_user_curr_connects(&self.user);
} }
} }
impl Drop for UserConnectionReservation { impl Drop for UserConnectionReservation {
fn drop(&mut self) { fn drop(&mut self) {
if self.state != SessionReservationState::Active { if !self.mark_released() {
return; return;
} }
self.state = SessionReservationState::Released;
self.stats.increment_session_drop_fallback_total(); self.stats.increment_session_drop_fallback_total();
self.stats.decrement_user_curr_connects(&self.user); self.stats.decrement_user_curr_connects(&self.user);
if self.tracks_ip { if self.tracks_ip {

View File

@@ -72,6 +72,7 @@ enum MiddleQuotaReserveError {
LimitExceeded, LimitExceeded,
Contended, Contended,
Cancelled, Cancelled,
DeadlineExceeded,
} }
#[derive(Default)] #[derive(Default)]
@@ -632,6 +633,7 @@ async fn reserve_user_quota_with_yield(
limit: u64, limit: u64,
stats: &Stats, stats: &Stats,
cancel: &CancellationToken, cancel: &CancellationToken,
deadline: Option<Instant>,
) -> std::result::Result<u64, MiddleQuotaReserveError> { ) -> std::result::Result<u64, MiddleQuotaReserveError> {
let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS; let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS;
let mut backoff_rounds = 0usize; let mut backoff_rounds = 0usize;
@@ -650,6 +652,10 @@ async fn reserve_user_quota_with_yield(
} }
tokio::task::yield_now().await; tokio::task::yield_now().await;
if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
stats.increment_quota_contention_timeout_total();
return Err(MiddleQuotaReserveError::DeadlineExceeded);
}
tokio::select! { tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
@@ -672,6 +678,7 @@ async fn wait_for_traffic_budget(
lease: Option<&Arc<TrafficLease>>, lease: Option<&Arc<TrafficLease>>,
direction: RateDirection, direction: RateDirection,
bytes: u64, bytes: u64,
deadline: Option<Instant>,
) -> Result<()> { ) -> Result<()> {
if bytes == 0 { if bytes == 0 {
return Ok(()); return Ok(());
@@ -689,6 +696,9 @@ async fn wait_for_traffic_budget(
} }
let wait_started_at = Instant::now(); let wait_started_at = Instant::now();
if deadline.is_some_and(|deadline| wait_started_at >= deadline) {
return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into()));
}
tokio::time::sleep(next_refill_delay()).await; tokio::time::sleep(next_refill_delay()).await;
let wait_ms = wait_started_at let wait_ms = wait_started_at
.elapsed() .elapsed()
@@ -711,6 +721,7 @@ async fn wait_for_traffic_budget_or_cancel(
bytes: u64, bytes: u64,
cancel: &CancellationToken, cancel: &CancellationToken,
stats: &Stats, stats: &Stats,
deadline: Option<Instant>,
) -> Result<()> { ) -> Result<()> {
if bytes == 0 { if bytes == 0 {
return Ok(()); return Ok(());
@@ -728,6 +739,10 @@ async fn wait_for_traffic_budget_or_cancel(
} }
let wait_started_at = Instant::now(); let wait_started_at = Instant::now();
if deadline.is_some_and(|deadline| wait_started_at >= deadline) {
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into()));
}
tokio::select! { tokio::select! {
_ = tokio::time::sleep(next_refill_delay()) => {} _ = tokio::time::sleep(next_refill_delay()) => {}
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
@@ -1720,6 +1735,7 @@ where
traffic_lease.as_ref(), traffic_lease.as_ref(),
RateDirection::Up, RateDirection::Up,
payload.len() as u64, payload.len() as u64,
None,
) )
.await?; .await?;
forensics.bytes_c2me = forensics forensics.bytes_c2me = forensics
@@ -1734,6 +1750,7 @@ where
limit, limit,
stats.as_ref(), stats.as_ref(),
&flow_cancel, &flow_cancel,
None,
) )
.await .await
{ {
@@ -1756,6 +1773,12 @@ where
)); ));
break; break;
} }
Err(MiddleQuotaReserveError::DeadlineExceeded) => {
main_result = Err(ProxyError::Proxy(
"ME C->ME quota reservation deadline exceeded".into(),
));
break;
}
} }
stats.add_user_octets_from_handle(user_stats, payload.len() as u64); stats.add_user_octets_from_handle(user_stats, payload.len() as u64);
} else { } else {
@@ -2449,7 +2472,9 @@ where
let data_len = data.len() as u64; let data_len = data.len() as u64;
if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) { if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) {
let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes); let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes);
match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats, cancel) match reserve_user_quota_with_yield(
user_stats, data_len, soft_limit, stats, cancel, None,
)
.await .await
{ {
Ok(_) => {} Ok(_) => {}
@@ -2469,6 +2494,11 @@ where
"ME D->C quota reservation cancelled".into(), "ME D->C quota reservation cancelled".into(),
)); ));
} }
Err(MiddleQuotaReserveError::DeadlineExceeded) => {
return Err(ProxyError::Proxy(
"ME D->C quota reservation deadline exceeded".into(),
));
}
} }
} }
wait_for_traffic_budget_or_cancel( wait_for_traffic_budget_or_cancel(
@@ -2477,6 +2507,7 @@ where
data_len, data_len,
cancel, cancel,
stats, stats,
None,
) )
.await?; .await?;
@@ -2516,7 +2547,14 @@ where
} else { } else {
trace!(conn_id, confirm, "ME->C quickack"); trace!(conn_id, confirm, "ME->C quickack");
} }
wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel, stats) wait_for_traffic_budget_or_cancel(
traffic_lease,
RateDirection::Down,
4,
cancel,
stats,
None,
)
.await?; .await?;
write_client_ack(client_writer, proto_tag, confirm).await?; write_client_ack(client_writer, proto_tag, confirm).await?;
stats.increment_me_d2c_ack_frames_total(); stats.increment_me_d2c_ack_frames_total();