Observability + Cancellation for Middle Quota + Traffic Waits

This commit is contained in:
Alexey
2026-05-10 13:38:11 +03:00
parent 3f9ac87daf
commit e10c070dc1
3 changed files with 131 additions and 10 deletions

View File

@@ -784,6 +784,20 @@ async fn render_metrics(
0
}
);
let _ = writeln!(
out,
"# HELP telemt_quota_acquire_cancelled_total Quota acquisitions cancelled before reservation completed"
);
let _ = writeln!(out, "# TYPE telemt_quota_acquire_cancelled_total counter");
let _ = writeln!(
out,
"telemt_quota_acquire_cancelled_total {}",
if core_enabled {
stats.get_quota_acquire_cancelled_total()
} else {
0
}
);
let _ = writeln!(
out,
@@ -2055,6 +2069,43 @@ async fn render_metrics(
0
}
);
let _ = writeln!(
out,
"# HELP telemt_flow_wait_events_total Flow wait events by reason, direction, and outcome"
);
let _ = writeln!(out, "# TYPE telemt_flow_wait_events_total counter");
let _ = writeln!(
out,
"telemt_flow_wait_events_total{{reason=\"middle_rate_limit\",direction=\"down\",outcome=\"waited\"}} {}",
if core_enabled {
stats.get_flow_wait_middle_rate_limit_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_flow_wait_events_total{{reason=\"middle_rate_limit\",direction=\"down\",outcome=\"cancelled\"}} {}",
if core_enabled {
stats.get_flow_wait_middle_rate_limit_cancelled_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_flow_wait_ms_total Flow wait time in milliseconds by reason and direction"
);
let _ = writeln!(out, "# TYPE telemt_flow_wait_ms_total counter");
let _ = writeln!(
out,
"telemt_flow_wait_ms_total{{reason=\"middle_rate_limit\",direction=\"down\"}} {}",
if core_enabled {
stats.get_flow_wait_middle_rate_limit_ms_total()
} else {
0
}
);
let _ = writeln!(
out,

View File

@@ -69,6 +69,12 @@ const QUOTA_RESERVE_BACKOFF_MAX_MS: u64 = 16;
const QUOTA_RESERVE_MAX_BACKOFF_ROUNDS: usize = 16;
const ME_CHILD_JOIN_TIMEOUT: Duration = Duration::from_secs(2);
enum MiddleQuotaReserveError {
LimitExceeded,
Contended,
Cancelled,
}
#[derive(Default)]
pub(crate) struct DesyncDedupRotationState {
current_started_at: Option<Instant>,
@@ -626,7 +632,8 @@ async fn reserve_user_quota_with_yield(
bytes: u64,
limit: u64,
stats: &Stats,
) -> std::result::Result<u64, QuotaReserveError> {
cancel: &CancellationToken,
) -> std::result::Result<u64, MiddleQuotaReserveError> {
let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS;
let mut backoff_rounds = 0usize;
loop {
@@ -634,7 +641,7 @@ async fn reserve_user_quota_with_yield(
match user_stats.quota_try_reserve(bytes, limit) {
Ok(total) => return Ok(total),
Err(QuotaReserveError::LimitExceeded) => {
return Err(QuotaReserveError::LimitExceeded);
return Err(MiddleQuotaReserveError::LimitExceeded);
}
Err(QuotaReserveError::Contended) => {
stats.increment_quota_contention_total();
@@ -644,11 +651,17 @@ async fn reserve_user_quota_with_yield(
}
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
_ = cancel.cancelled() => {
stats.increment_quota_acquire_cancelled_total();
return Err(MiddleQuotaReserveError::Cancelled);
}
}
backoff_rounds = backoff_rounds.saturating_add(1);
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
stats.increment_quota_contention_timeout_total();
return Err(QuotaReserveError::Contended);
return Err(MiddleQuotaReserveError::Contended);
}
backoff_ms = backoff_ms
.saturating_mul(2)
@@ -698,6 +711,7 @@ async fn wait_for_traffic_budget_or_cancel(
direction: RateDirection,
bytes: u64,
cancel: &CancellationToken,
stats: &Stats,
) -> Result<()> {
if bytes == 0 {
return Ok(());
@@ -718,6 +732,7 @@ async fn wait_for_traffic_budget_or_cancel(
tokio::select! {
_ = tokio::time::sleep(next_refill_delay()) => {}
_ = cancel.cancelled() => {
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
return Err(ProxyError::Proxy("traffic budget wait cancelled".into()));
}
}
@@ -731,6 +746,7 @@ async fn wait_for_traffic_budget_or_cancel(
consume.blocked_cidr,
wait_ms,
);
stats.observe_flow_wait_middle_rate_limit_ms(wait_ms);
}
Ok(())
@@ -1718,22 +1734,29 @@ where
payload.len() as u64,
limit,
stats.as_ref(),
&flow_cancel,
)
.await
{
Ok(_) => {}
Err(QuotaReserveError::LimitExceeded) => {
Err(MiddleQuotaReserveError::LimitExceeded) => {
main_result = Err(ProxyError::DataQuotaExceeded {
user: user.clone(),
});
break;
}
Err(QuotaReserveError::Contended) => {
Err(MiddleQuotaReserveError::Contended) => {
main_result = Err(ProxyError::Proxy(
"ME C->ME quota reservation contended".into(),
));
break;
}
Err(MiddleQuotaReserveError::Cancelled) => {
main_result = Err(ProxyError::Proxy(
"ME C->ME quota reservation cancelled".into(),
));
break;
}
}
stats.add_user_octets_from_handle(user_stats, payload.len() as u64);
} else {
@@ -2430,19 +2453,26 @@ where
let data_len = data.len() as u64;
if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) {
let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes);
match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats).await {
match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats, cancel)
.await
{
Ok(_) => {}
Err(QuotaReserveError::LimitExceeded) => {
Err(MiddleQuotaReserveError::LimitExceeded) => {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
Err(QuotaReserveError::Contended) => {
Err(MiddleQuotaReserveError::Contended) => {
return Err(ProxyError::Proxy(
"ME D->C quota reservation contended".into(),
));
}
Err(MiddleQuotaReserveError::Cancelled) => {
return Err(ProxyError::Proxy(
"ME D->C quota reservation cancelled".into(),
));
}
}
}
wait_for_traffic_budget_or_cancel(
@@ -2450,6 +2480,7 @@ where
RateDirection::Down,
data_len,
cancel,
stats,
)
.await?;
@@ -2489,7 +2520,7 @@ where
} else {
trace!(conn_id, confirm, "ME->C quickack");
}
wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel)
wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel, stats)
.await?;
write_client_ack(client_writer, proto_tag, confirm).await?;
stats.increment_me_d2c_ack_frames_total();

View File

@@ -277,10 +277,14 @@ pub struct Stats {
quota_refund_bytes_total: AtomicU64,
quota_contention_total: AtomicU64,
quota_contention_timeout_total: AtomicU64,
quota_acquire_cancelled_total: AtomicU64,
quota_write_fail_bytes_total: AtomicU64,
quota_write_fail_events_total: AtomicU64,
me_child_join_timeout_total: AtomicU64,
me_child_abort_total: AtomicU64,
flow_wait_middle_rate_limit_total: AtomicU64,
flow_wait_middle_rate_limit_cancelled_total: AtomicU64,
flow_wait_middle_rate_limit_ms_total: AtomicU64,
telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8,
@@ -1459,6 +1463,12 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_quota_acquire_cancelled_total(&self) {
if self.telemetry_core_enabled() {
self.quota_acquire_cancelled_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn add_quota_write_fail_bytes_total(&self, bytes: u64) {
if self.telemetry_core_enabled() {
self.quota_write_fail_bytes_total
@@ -1482,6 +1492,20 @@ impl Stats {
self.me_child_abort_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn observe_flow_wait_middle_rate_limit_ms(&self, wait_ms: u64) {
if self.telemetry_core_enabled() {
self.flow_wait_middle_rate_limit_total
.fetch_add(1, Ordering::Relaxed);
self.flow_wait_middle_rate_limit_ms_total
.fetch_add(wait_ms, Ordering::Relaxed);
}
}
pub fn increment_flow_wait_middle_rate_limit_cancelled_total(&self) {
if self.telemetry_core_enabled() {
self.flow_wait_middle_rate_limit_cancelled_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_endpoint_quarantine_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_endpoint_quarantine_total
@@ -2326,6 +2350,9 @@ impl Stats {
self.quota_contention_timeout_total
.load(Ordering::Relaxed)
}
pub fn get_quota_acquire_cancelled_total(&self) -> u64 {
self.quota_acquire_cancelled_total.load(Ordering::Relaxed)
}
pub fn get_quota_write_fail_bytes_total(&self) -> u64 {
self.quota_write_fail_bytes_total.load(Ordering::Relaxed)
}
@@ -2338,6 +2365,18 @@ impl Stats {
pub fn get_me_child_abort_total(&self) -> u64 {
self.me_child_abort_total.load(Ordering::Relaxed)
}
pub fn get_flow_wait_middle_rate_limit_total(&self) -> u64 {
self.flow_wait_middle_rate_limit_total
.load(Ordering::Relaxed)
}
pub fn get_flow_wait_middle_rate_limit_cancelled_total(&self) -> u64 {
self.flow_wait_middle_rate_limit_cancelled_total
.load(Ordering::Relaxed)
}
pub fn get_flow_wait_middle_rate_limit_ms_total(&self) -> u64 {
self.flow_wait_middle_rate_limit_ms_total
.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {