From 6cb72b3b6c8debd351d069e13274452c935d511c Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 10 May 2026 13:50:36 +0300 Subject: [PATCH] Explicit Reasons of Session Fallback Cleanup + ME Close --- src/error.rs | 3 +++ src/maestro/listeners.rs | 2 +- src/metrics.rs | 14 ++++++++++++++ src/proxy/client.rs | 19 +++++++++++++------ src/proxy/middle_relay.rs | 9 +++------ src/stats/mod.rs | 10 ++++++++++ 6 files changed, 44 insertions(+), 13 deletions(-) diff --git a/src/error.rs b/src/error.rs index 49c8c81..889cbcd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -222,6 +222,9 @@ pub enum ProxyError { #[error("Proxy error: {0}")] Proxy(String), + #[error("ME connection lost")] + MiddleConnectionLost, + // ============= Config Errors ============= #[error("Config error: {0}")] Config(String), diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index 84bd0f1..501a476 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -494,7 +494,7 @@ pub(crate) fn spawn_tcp_accept_loops( let me_closed = matches!( &e, - crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost" + crate::error::ProxyError::MiddleConnectionLost ); let route_switched = matches!( &e, diff --git a/src/metrics.rs b/src/metrics.rs index da290c3..620c4cd 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -2106,6 +2106,20 @@ async fn render_metrics( 0 } ); + let _ = writeln!( + out, + "# HELP telemt_session_drop_fallback_total Session reservations cleaned by Drop instead of explicit async release" + ); + let _ = writeln!(out, "# TYPE telemt_session_drop_fallback_total counter"); + let _ = writeln!( + out, + "telemt_session_drop_fallback_total {}", + if core_enabled { + stats.get_session_drop_fallback_total() + } else { + 0 + } + ); let _ = writeln!( out, diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 34d4960..2188e14 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -32,7 +32,13 @@ struct UserConnectionReservation { user: String, ip: IpAddr, tracks_ip: bool, - active: bool, + state: SessionReservationState, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum SessionReservationState { + Active, + Released, } impl UserConnectionReservation { @@ -49,28 +55,29 @@ impl UserConnectionReservation { user, ip, tracks_ip, - active: true, + state: SessionReservationState::Active, } } async fn release(mut self) { - if !self.active { + if self.state != SessionReservationState::Active { return; } if self.tracks_ip { self.ip_tracker.remove_ip(&self.user, self.ip).await; } - self.active = false; + self.state = SessionReservationState::Released; self.stats.decrement_user_curr_connects(&self.user); } } impl Drop for UserConnectionReservation { fn drop(&mut self) { - if !self.active { + if self.state != SessionReservationState::Active { return; } - self.active = false; + self.state = SessionReservationState::Released; + self.stats.increment_session_drop_fallback_total(); self.stats.decrement_user_curr_connects(&self.user); if self.tracks_ip { self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip); diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index cb6508b..9a89083 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -1310,7 +1310,7 @@ where let Some(first) = msg else { debug!(conn_id, "ME channel closed"); shrink_session_vec(&mut frame_buf, shrink_threshold); - return Err(ProxyError::Proxy("ME connection lost".into())); + return Err(ProxyError::MiddleConnectionLost); }; let mut batch_frames = 0usize; @@ -1575,7 +1575,7 @@ where Ok(None) => { debug!(conn_id, "ME channel closed"); shrink_session_vec(&mut frame_buf, shrink_threshold); - return Err(ProxyError::Proxy("ME connection lost".into())); + return Err(ProxyError::MiddleConnectionLost); } Err(_) => { max_delay_fired = true; @@ -1853,10 +1853,7 @@ where // When client closes, but ME channel stopped as unregistered - it isnt error if client_closed - && matches!( - writer_result, - Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost" - ) + && matches!(writer_result, Err(ProxyError::MiddleConnectionLost)) { writer_result = Ok(()); } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index a2e662a..a0a6279 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -285,6 +285,7 @@ pub struct Stats { flow_wait_middle_rate_limit_total: AtomicU64, flow_wait_middle_rate_limit_cancelled_total: AtomicU64, flow_wait_middle_rate_limit_ms_total: AtomicU64, + session_drop_fallback_total: AtomicU64, telemetry_core_enabled: AtomicBool, telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, @@ -1530,6 +1531,12 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_session_drop_fallback_total(&self) { + if self.telemetry_core_enabled() { + self.session_drop_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_endpoint_quarantine_total(&self) { if self.telemetry_me_allows_normal() { self.me_endpoint_quarantine_total @@ -2401,6 +2408,9 @@ impl Stats { self.flow_wait_middle_rate_limit_ms_total .load(Ordering::Relaxed) } + pub fn get_session_drop_fallback_total(&self) -> u64 { + self.session_drop_fallback_total.load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { if !self.telemetry_user_enabled() {