Middle Relay Cancellation Errors

This commit is contained in:
Alexey
2026-05-10 14:12:15 +03:00
parent 900b574fb8
commit 10c7cb2e0c
3 changed files with 15 additions and 6 deletions

View File

@@ -228,6 +228,15 @@ pub enum ProxyError {
#[error("Session terminated")] #[error("Session terminated")]
RouteSwitched, RouteSwitched,
#[error("Traffic budget wait cancelled")]
TrafficBudgetWaitCancelled,
#[error("Traffic budget wait deadline exceeded")]
TrafficBudgetWaitDeadlineExceeded,
#[error("ME client writer cancelled")]
MiddleClientWriterCancelled,
// ============= Config Errors ============= // ============= Config Errors =============
#[error("Config error: {0}")] #[error("Config error: {0}")]
Config(String), Config(String),

View File

@@ -697,7 +697,7 @@ async fn wait_for_traffic_budget(
let wait_started_at = Instant::now(); let wait_started_at = Instant::now();
if deadline.is_some_and(|deadline| wait_started_at >= deadline) { if deadline.is_some_and(|deadline| wait_started_at >= deadline) {
return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into())); return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
} }
tokio::time::sleep(next_refill_delay()).await; tokio::time::sleep(next_refill_delay()).await;
let wait_ms = wait_started_at let wait_ms = wait_started_at
@@ -741,13 +741,13 @@ async fn wait_for_traffic_budget_or_cancel(
let wait_started_at = Instant::now(); let wait_started_at = Instant::now();
if deadline.is_some_and(|deadline| wait_started_at >= deadline) { if deadline.is_some_and(|deadline| wait_started_at >= deadline) {
stats.increment_flow_wait_middle_rate_limit_cancelled_total(); stats.increment_flow_wait_middle_rate_limit_cancelled_total();
return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into())); return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
} }
tokio::select! { tokio::select! {
_ = tokio::time::sleep(next_refill_delay()) => {} _ = tokio::time::sleep(next_refill_delay()) => {}
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
stats.increment_flow_wait_middle_rate_limit_cancelled_total(); stats.increment_flow_wait_middle_rate_limit_cancelled_total();
return Err(ProxyError::Proxy("traffic budget wait cancelled".into())); return Err(ProxyError::TrafficBudgetWaitCancelled);
} }
} }
let wait_ms = wait_started_at let wait_ms = wait_started_at
@@ -2763,7 +2763,7 @@ where
{ {
tokio::select! { tokio::select! {
result = client_writer.write_all(bytes) => result.map_err(ProxyError::Io), result = client_writer.write_all(bytes) => result.map_err(ProxyError::Io),
_ = cancel.cancelled() => Err(ProxyError::Proxy("ME client writer cancelled".into())), _ = cancel.cancelled() => Err(ProxyError::MiddleClientWriterCancelled),
} }
} }
@@ -2776,7 +2776,7 @@ where
{ {
tokio::select! { tokio::select! {
result = client_writer.flush() => result.map_err(ProxyError::Io), result = client_writer.flush() => result.map_err(ProxyError::Io),
_ = cancel.cancelled() => Err(ProxyError::Proxy("ME client writer cancelled".into())), _ = cancel.cancelled() => Err(ProxyError::MiddleClientWriterCancelled),
} }
} }

View File

@@ -251,7 +251,7 @@ async fn me_writer_data_write_obeys_flow_cancellation() {
.await; .await;
assert!( assert!(
matches!(result, Err(ProxyError::Proxy(ref message)) if message == "ME client writer cancelled"), matches!(result, Err(ProxyError::MiddleClientWriterCancelled)),
"cancelled middle writer must return a bounded cancellation error" "cancelled middle writer must return a bounded cancellation error"
); );
assert_eq!( assert_eq!(