diff --git a/src/proxy/middle_relay/quota.rs b/src/proxy/middle_relay/quota.rs index 618056c..3a04c00 100644 --- a/src/proxy/middle_relay/quota.rs +++ b/src/proxy/middle_relay/quota.rs @@ -41,11 +41,12 @@ pub(super) async fn reserve_user_quota_with_yield( return Err(MiddleQuotaReserveError::DeadlineExceeded); } tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} + biased; _ = cancel.cancelled() => { stats.increment_quota_acquire_cancelled_total(); return Err(MiddleQuotaReserveError::Cancelled); } + _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} } backoff_rounds = backoff_rounds.saturating_add(1); if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS { @@ -128,11 +129,12 @@ pub(super) async fn wait_for_traffic_budget_or_cancel( return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded); } tokio::select! { - _ = tokio::time::sleep(next_refill_delay()) => {} + biased; _ = cancel.cancelled() => { stats.increment_flow_wait_middle_rate_limit_cancelled_total(); return Err(ProxyError::TrafficBudgetWaitCancelled); } + _ = tokio::time::sleep(next_refill_delay()) => {} } let wait_ms = wait_started_at .elapsed() diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 35021aa..e9fc601 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -52,6 +52,8 @@ async fn writer_command_loop( ) -> Result<()> { loop { tokio::select! { + biased; + _ = cancel.cancelled() => return Ok(()), cmd = rx.recv() => { match cmd { Some(WriterCommand::Data(payload)) => { @@ -69,7 +71,6 @@ async fn writer_command_loop( Some(WriterCommand::Close) | None => return Ok(()), } } - _ = cancel.cancelled() => return Ok(()), } } } @@ -108,6 +109,7 @@ async fn ping_loop( Duration::from_secs(wait) }; tokio::select! { + biased; _ = cancel_ping_token.cancelled() => return, _ = tokio::time::sleep(startup_jitter) => {} } @@ -131,6 +133,7 @@ async fn ping_loop( Duration::from_secs(secs) }; tokio::select! { + biased; _ = cancel_ping_token.cancelled() => return, _ = tokio::time::sleep(wait) => {} } @@ -191,6 +194,7 @@ async fn rpc_proxy_req_signal_loop( }; tokio::select! { + biased; _ = cancel_signal.cancelled() => return, _ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {} } @@ -207,6 +211,7 @@ async fn rpc_proxy_req_signal_loop( }; tokio::select! { + biased; _ = cancel_signal.cancelled() => return, _ = tokio::time::sleep(wait) => {} } diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 66bf83d..adad859 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -269,12 +269,13 @@ pub(crate) async fn reader_loop( fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed)); let mut retry_only = false; let n = tokio::select! { + biased; + _ = cancel.cancelled() => return Ok(()), res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?, _ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => { retry_only = true; 0usize }, - _ = cancel.cancelled() => return Ok(()), }; if retry_only { let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);