From 10c7cb2e0c70d553e6a797d7ad94fb80f3d1e5d5 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 10 May 2026 14:12:15 +0300 Subject: [PATCH] Middle Relay Cancellation Errors --- src/error.rs | 9 +++++++++ src/proxy/middle_relay.rs | 10 +++++----- .../tests/middle_relay_atomic_quota_invariant_tests.rs | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/error.rs b/src/error.rs index 9a839dd..ff58f4e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -228,6 +228,15 @@ pub enum ProxyError { #[error("Session terminated")] RouteSwitched, + #[error("Traffic budget wait cancelled")] + TrafficBudgetWaitCancelled, + + #[error("Traffic budget wait deadline exceeded")] + TrafficBudgetWaitDeadlineExceeded, + + #[error("ME client writer cancelled")] + MiddleClientWriterCancelled, + // ============= Config Errors ============= #[error("Config error: {0}")] Config(String), diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 6b0329c..8fadf9b 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -697,7 +697,7 @@ async fn wait_for_traffic_budget( let wait_started_at = Instant::now(); if deadline.is_some_and(|deadline| wait_started_at >= deadline) { - return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into())); + return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded); } tokio::time::sleep(next_refill_delay()).await; let wait_ms = wait_started_at @@ -741,13 +741,13 @@ async fn wait_for_traffic_budget_or_cancel( let wait_started_at = Instant::now(); if deadline.is_some_and(|deadline| wait_started_at >= deadline) { stats.increment_flow_wait_middle_rate_limit_cancelled_total(); - return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into())); + return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded); } tokio::select! { _ = tokio::time::sleep(next_refill_delay()) => {} _ = cancel.cancelled() => { stats.increment_flow_wait_middle_rate_limit_cancelled_total(); - return Err(ProxyError::Proxy("traffic budget wait cancelled".into())); + return Err(ProxyError::TrafficBudgetWaitCancelled); } } let wait_ms = wait_started_at @@ -2763,7 +2763,7 @@ where { tokio::select! { result = client_writer.write_all(bytes) => result.map_err(ProxyError::Io), - _ = cancel.cancelled() => Err(ProxyError::Proxy("ME client writer cancelled".into())), + _ = cancel.cancelled() => Err(ProxyError::MiddleClientWriterCancelled), } } @@ -2776,7 +2776,7 @@ where { tokio::select! { result = client_writer.flush() => result.map_err(ProxyError::Io), - _ = cancel.cancelled() => Err(ProxyError::Proxy("ME client writer cancelled".into())), + _ = cancel.cancelled() => Err(ProxyError::MiddleClientWriterCancelled), } } diff --git a/src/proxy/tests/middle_relay_atomic_quota_invariant_tests.rs b/src/proxy/tests/middle_relay_atomic_quota_invariant_tests.rs index e1b3511..5cf6ac5 100644 --- a/src/proxy/tests/middle_relay_atomic_quota_invariant_tests.rs +++ b/src/proxy/tests/middle_relay_atomic_quota_invariant_tests.rs @@ -251,7 +251,7 @@ async fn me_writer_data_write_obeys_flow_cancellation() { .await; assert!( - matches!(result, Err(ProxyError::Proxy(ref message)) if message == "ME client writer cancelled"), + matches!(result, Err(ProxyError::MiddleClientWriterCancelled)), "cancelled middle writer must return a bounded cancellation error" ); assert_eq!(