Typed Route Cutovers + Reduce IP Tracker cleanup pressure

This commit is contained in:
Alexey
2026-05-10 13:55:01 +03:00
parent 6cb72b3b6c
commit eef2a38c75
9 changed files with 42 additions and 20 deletions

View File

@@ -225,6 +225,9 @@ pub enum ProxyError {
#[error("ME connection lost")] #[error("ME connection lost")]
MiddleConnectionLost, MiddleConnectionLost,
#[error("Session terminated")]
RouteSwitched,
// ============= Config Errors ============= // ============= Config Errors =============
#[error("Config error: {0}")] #[error("Config error: {0}")]
Config(String), Config(String),

View File

@@ -32,6 +32,7 @@ pub struct UserIpTracker {
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>, limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
limit_window: Arc<RwLock<Duration>>, limit_window: Arc<RwLock<Duration>>,
last_compact_epoch_secs: Arc<AtomicU64>, last_compact_epoch_secs: Arc<AtomicU64>,
cleanup_queue_len: Arc<AtomicU64>,
cleanup_queue: Arc<Mutex<HashMap<(String, IpAddr), usize>>>, cleanup_queue: Arc<Mutex<HashMap<(String, IpAddr), usize>>>,
cleanup_drain_lock: Arc<AsyncMutex<()>>, cleanup_drain_lock: Arc<AsyncMutex<()>>,
} }
@@ -72,6 +73,7 @@ impl UserIpTracker {
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
limit_window: Arc::new(RwLock::new(Duration::from_secs(30))), limit_window: Arc::new(RwLock::new(Duration::from_secs(30))),
last_compact_epoch_secs: Arc::new(AtomicU64::new(0)), last_compact_epoch_secs: Arc::new(AtomicU64::new(0)),
cleanup_queue_len: Arc::new(AtomicU64::new(0)),
cleanup_queue: Arc::new(Mutex::new(HashMap::new())), cleanup_queue: Arc::new(Mutex::new(HashMap::new())),
cleanup_drain_lock: Arc::new(AsyncMutex::new(())), cleanup_drain_lock: Arc::new(AsyncMutex::new(())),
} }
@@ -120,6 +122,9 @@ impl UserIpTracker {
match self.cleanup_queue.lock() { match self.cleanup_queue.lock() {
Ok(mut queue) => { Ok(mut queue) => {
let count = queue.entry((user, ip)).or_insert(0); let count = queue.entry((user, ip)).or_insert(0);
if *count == 0 {
self.cleanup_queue_len.fetch_add(1, Ordering::Relaxed);
}
*count = count.saturating_add(1); *count = count.saturating_add(1);
self.cleanup_deferred_releases self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
@@ -127,6 +132,9 @@ impl UserIpTracker {
Err(poisoned) => { Err(poisoned) => {
let mut queue = poisoned.into_inner(); let mut queue = poisoned.into_inner();
let count = queue.entry((user.clone(), ip)).or_insert(0); let count = queue.entry((user.clone(), ip)).or_insert(0);
if *count == 0 {
self.cleanup_queue_len.fetch_add(1, Ordering::Relaxed);
}
*count = count.saturating_add(1); *count = count.saturating_add(1);
self.cleanup_deferred_releases self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
@@ -156,6 +164,9 @@ impl UserIpTracker {
} }
pub(crate) async fn drain_cleanup_queue(&self) { pub(crate) async fn drain_cleanup_queue(&self) {
if self.cleanup_queue_len.load(Ordering::Relaxed) == 0 {
return;
}
let Ok(_drain_guard) = self.cleanup_drain_lock.try_lock() else { let Ok(_drain_guard) = self.cleanup_drain_lock.try_lock() else {
return; return;
}; };
@@ -173,6 +184,7 @@ impl UserIpTracker {
break; break;
}; };
if let Some(count) = queue.remove(&key) { if let Some(count) = queue.remove(&key) {
self.cleanup_queue_len.fetch_sub(1, Ordering::Relaxed);
drained.insert(key, count); drained.insert(key, count);
} }
} }
@@ -191,6 +203,7 @@ impl UserIpTracker {
break; break;
}; };
if let Some(count) = queue.remove(&key) { if let Some(count) = queue.remove(&key) {
self.cleanup_queue_len.fetch_sub(1, Ordering::Relaxed);
drained.insert(key, count); drained.insert(key, count);
} }
} }
@@ -294,12 +307,17 @@ impl UserIpTracker {
} }
} }
pub async fn run_periodic_maintenance(self: Arc<Self>) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
self.drain_cleanup_queue().await;
self.maybe_compact_empty_users().await;
}
}
pub async fn memory_stats(&self) -> UserIpTrackerMemoryStats { pub async fn memory_stats(&self) -> UserIpTrackerMemoryStats {
let cleanup_queue_len = self let cleanup_queue_len = self.cleanup_queue_len.load(Ordering::Relaxed) as usize;
.cleanup_queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.len();
let active_ips = self.active_ips.read().await; let active_ips = self.active_ips.read().await;
let recent_ips = self.recent_ips.read().await; let recent_ips = self.recent_ips.read().await;
let active_entries = active_ips.values().map(HashMap::len).sum(); let active_entries = active_ips.values().map(HashMap::len).sum();

View File

@@ -13,7 +13,7 @@ use crate::config::{ProxyConfig, RstOnCloseMode};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::proxy::ClientHandler; use crate::proxy::ClientHandler;
use crate::proxy::route_mode::{ROUTE_SWITCH_ERROR_MSG, RouteRuntimeController}; use crate::proxy::route_mode::RouteRuntimeController;
use crate::proxy::shared_state::ProxySharedState; use crate::proxy::shared_state::ProxySharedState;
use crate::startup::{COMPONENT_LISTENERS_BIND, StartupTracker}; use crate::startup::{COMPONENT_LISTENERS_BIND, StartupTracker};
use crate::stats::beobachten::BeobachtenStore; use crate::stats::beobachten::BeobachtenStore;
@@ -498,7 +498,7 @@ pub(crate) fn spawn_tcp_accept_loops(
); );
let route_switched = matches!( let route_switched = matches!(
&e, &e,
crate::error::ProxyError::Proxy(msg) if msg == ROUTE_SWITCH_ERROR_MSG crate::error::ProxyError::RouteSwitched
); );
match (peer_close_reason, me_closed) { match (peer_close_reason, me_closed) {

View File

@@ -78,6 +78,11 @@ pub(crate) async fn spawn_runtime_tasks(
stats_maintenance.run_periodic_user_stats_maintenance().await; stats_maintenance.run_periodic_user_stats_maintenance().await;
}); });
let ip_tracker_maintenance = ip_tracker.clone();
tokio::spawn(async move {
ip_tracker_maintenance.run_periodic_maintenance().await;
});
let detected_ip_v4: Option<IpAddr> = probe.detected_ipv4.map(IpAddr::V4); let detected_ip_v4: Option<IpAddr> = probe.detected_ipv4.map(IpAddr::V4);
let detected_ip_v6: Option<IpAddr> = probe.detected_ipv6.map(IpAddr::V6); let detected_ip_v6: Option<IpAddr> = probe.detected_ipv6.map(IpAddr::V6);
debug!( debug!(

View File

@@ -18,8 +18,7 @@ use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce}; use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::route_mode::{ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay,
cutover_stagger_delay,
}; };
use crate::proxy::shared_state::{ use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
@@ -360,7 +359,7 @@ where
"Cutover affected direct session, closing client connection" "Cutover affected direct session, closing client connection"
); );
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
break Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); break Err(ProxyError::RouteSwitched);
} }
tokio::select! { tokio::select! {
result = &mut relay_result => { result = &mut relay_result => {

View File

@@ -23,8 +23,7 @@ use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *}; use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess; use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::route_mode::{ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay,
cutover_stagger_delay,
}; };
use crate::proxy::shared_state::{ use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
@@ -1188,7 +1187,7 @@ where
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
let _ = me_pool.send_close(conn_id).await; let _ = me_pool.send_close(conn_id).await;
me_pool.registry().unregister(conn_id).await; me_pool.registry().unregister(conn_id).await;
return Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); return Err(ProxyError::RouteSwitched);
} }
// Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable) // Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
@@ -1690,7 +1689,7 @@ where
stats.as_ref(), stats.as_ref(),
) )
.await; .await;
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); main_result = Err(ProxyError::RouteSwitched);
break; break;
} }

View File

@@ -4,8 +4,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch; use tokio::sync::watch;
pub(crate) const ROUTE_SWITCH_ERROR_MSG: &str = "Session terminated";
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)] #[repr(u8)]
pub(crate) enum RelayRouteMode { pub(crate) enum RelayRouteMode {

View File

@@ -661,7 +661,7 @@ async fn integration_route_cutover_and_quota_overlap_fails_closed_and_releases_s
assert!( assert!(
matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })) matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))
|| matches!(relay_result, Err(ProxyError::Proxy(ref msg)) if msg == crate::proxy::route_mode::ROUTE_SWITCH_ERROR_MSG), || matches!(relay_result, Err(ProxyError::RouteSwitched)),
"overlap race must fail closed via quota enforcement or generic cutover termination" "overlap race must fail closed via quota enforcement or generic cutover termination"
); );

View File

@@ -1491,7 +1491,7 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() {
assert!( assert!(
matches!( matches!(
relay_result, relay_result,
Err(ProxyError::Proxy(ref msg)) if msg == ROUTE_SWITCH_ERROR_MSG Err(ProxyError::RouteSwitched)
), ),
"client-visible cutover error must stay generic and avoid route-internal metadata" "client-visible cutover error must stay generic and avoid route-internal metadata"
); );
@@ -1631,7 +1631,7 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea
assert!( assert!(
matches!( matches!(
relay_result, relay_result,
Err(ProxyError::Proxy(ref msg)) if msg == ROUTE_SWITCH_ERROR_MSG Err(ProxyError::RouteSwitched)
), ),
"storm-cutover termination must remain generic for all direct sessions" "storm-cutover termination must remain generic for all direct sessions"
); );
@@ -1937,7 +1937,7 @@ async fn adversarial_direct_relay_cutover_integrity() {
assert!( assert!(
matches!( matches!(
result, result,
Err(ProxyError::Proxy(ref msg)) if msg == ROUTE_SWITCH_ERROR_MSG Err(ProxyError::RouteSwitched)
), ),
"Session must terminate with route switch error on cutover" "Session must terminate with route switch error on cutover"
); );