Explicit Reasons of Session Fallback Cleanup + ME Close

This commit is contained in:
Alexey
2026-05-10 13:50:36 +03:00
parent 090b2ca636
commit 6cb72b3b6c
6 changed files with 44 additions and 13 deletions

View File

@@ -222,6 +222,9 @@ pub enum ProxyError {
#[error("Proxy error: {0}")] #[error("Proxy error: {0}")]
Proxy(String), Proxy(String),
#[error("ME connection lost")]
MiddleConnectionLost,
// ============= Config Errors ============= // ============= Config Errors =============
#[error("Config error: {0}")] #[error("Config error: {0}")]
Config(String), Config(String),

View File

@@ -494,7 +494,7 @@ pub(crate) fn spawn_tcp_accept_loops(
let me_closed = matches!( let me_closed = matches!(
&e, &e,
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost" crate::error::ProxyError::MiddleConnectionLost
); );
let route_switched = matches!( let route_switched = matches!(
&e, &e,

View File

@@ -2106,6 +2106,20 @@ async fn render_metrics(
0 0
} }
); );
let _ = writeln!(
out,
"# HELP telemt_session_drop_fallback_total Session reservations cleaned by Drop instead of explicit async release"
);
let _ = writeln!(out, "# TYPE telemt_session_drop_fallback_total counter");
let _ = writeln!(
out,
"telemt_session_drop_fallback_total {}",
if core_enabled {
stats.get_session_drop_fallback_total()
} else {
0
}
);
let _ = writeln!( let _ = writeln!(
out, out,

View File

@@ -32,7 +32,13 @@ struct UserConnectionReservation {
user: String, user: String,
ip: IpAddr, ip: IpAddr,
tracks_ip: bool, tracks_ip: bool,
active: bool, state: SessionReservationState,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum SessionReservationState {
Active,
Released,
} }
impl UserConnectionReservation { impl UserConnectionReservation {
@@ -49,28 +55,29 @@ impl UserConnectionReservation {
user, user,
ip, ip,
tracks_ip, tracks_ip,
active: true, state: SessionReservationState::Active,
} }
} }
async fn release(mut self) { async fn release(mut self) {
if !self.active { if self.state != SessionReservationState::Active {
return; return;
} }
if self.tracks_ip { if self.tracks_ip {
self.ip_tracker.remove_ip(&self.user, self.ip).await; self.ip_tracker.remove_ip(&self.user, self.ip).await;
} }
self.active = false; self.state = SessionReservationState::Released;
self.stats.decrement_user_curr_connects(&self.user); self.stats.decrement_user_curr_connects(&self.user);
} }
} }
impl Drop for UserConnectionReservation { impl Drop for UserConnectionReservation {
fn drop(&mut self) { fn drop(&mut self) {
if !self.active { if self.state != SessionReservationState::Active {
return; return;
} }
self.active = false; self.state = SessionReservationState::Released;
self.stats.increment_session_drop_fallback_total();
self.stats.decrement_user_curr_connects(&self.user); self.stats.decrement_user_curr_connects(&self.user);
if self.tracks_ip { if self.tracks_ip {
self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip); self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip);

View File

@@ -1310,7 +1310,7 @@ where
let Some(first) = msg else { let Some(first) = msg else {
debug!(conn_id, "ME channel closed"); debug!(conn_id, "ME channel closed");
shrink_session_vec(&mut frame_buf, shrink_threshold); shrink_session_vec(&mut frame_buf, shrink_threshold);
return Err(ProxyError::Proxy("ME connection lost".into())); return Err(ProxyError::MiddleConnectionLost);
}; };
let mut batch_frames = 0usize; let mut batch_frames = 0usize;
@@ -1575,7 +1575,7 @@ where
Ok(None) => { Ok(None) => {
debug!(conn_id, "ME channel closed"); debug!(conn_id, "ME channel closed");
shrink_session_vec(&mut frame_buf, shrink_threshold); shrink_session_vec(&mut frame_buf, shrink_threshold);
return Err(ProxyError::Proxy("ME connection lost".into())); return Err(ProxyError::MiddleConnectionLost);
} }
Err(_) => { Err(_) => {
max_delay_fired = true; max_delay_fired = true;
@@ -1853,10 +1853,7 @@ where
// When client closes, but ME channel stopped as unregistered - it isnt error // When client closes, but ME channel stopped as unregistered - it isnt error
if client_closed if client_closed
&& matches!( && matches!(writer_result, Err(ProxyError::MiddleConnectionLost))
writer_result,
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
)
{ {
writer_result = Ok(()); writer_result = Ok(());
} }

View File

@@ -285,6 +285,7 @@ pub struct Stats {
flow_wait_middle_rate_limit_total: AtomicU64, flow_wait_middle_rate_limit_total: AtomicU64,
flow_wait_middle_rate_limit_cancelled_total: AtomicU64, flow_wait_middle_rate_limit_cancelled_total: AtomicU64,
flow_wait_middle_rate_limit_ms_total: AtomicU64, flow_wait_middle_rate_limit_ms_total: AtomicU64,
session_drop_fallback_total: AtomicU64,
telemetry_core_enabled: AtomicBool, telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool, telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8, telemetry_me_level: AtomicU8,
@@ -1530,6 +1531,12 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_session_drop_fallback_total(&self) {
if self.telemetry_core_enabled() {
self.session_drop_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_endpoint_quarantine_total(&self) { pub fn increment_me_endpoint_quarantine_total(&self) {
if self.telemetry_me_allows_normal() { if self.telemetry_me_allows_normal() {
self.me_endpoint_quarantine_total self.me_endpoint_quarantine_total
@@ -2401,6 +2408,9 @@ impl Stats {
self.flow_wait_middle_rate_limit_ms_total self.flow_wait_middle_rate_limit_ms_total
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
pub fn get_session_drop_fallback_total(&self) -> u64 {
self.session_drop_fallback_total.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) { pub fn increment_user_connects(&self, user: &str) {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {