From d46bda98807126d65a95fe4bbc04e9d1a96d8355 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:05:04 +0300 Subject: [PATCH] Preserve synchronous IP cleanup queue contract + Rustfmt --- src/config/load.rs | 4 +--- src/ip_tracker.rs | 28 ++++------------------------ src/metrics.rs | 12 ++---------- src/proxy/client.rs | 3 +-- src/stats/beobachten.rs | 5 +---- src/tls_front/cache.rs | 6 ++---- 6 files changed, 11 insertions(+), 47 deletions(-) diff --git a/src/config/load.rs b/src/config/load.rs index 2ae0dba..20faf20 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -2618,9 +2618,7 @@ mod tests { let path = dir.join("telemt_me_route_blocking_send_timeout_zero_test.toml"); std::fs::write(&path, toml).unwrap(); let err = ProxyConfig::load(&path).unwrap_err().to_string(); - assert!( - err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]") - ); + assert!(err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]")); let _ = std::fs::remove_file(path); } diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index 290bd71..e7b5185 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -26,7 +26,6 @@ pub struct UserIpTracker { recent_entry_count: Arc, active_cap_rejects: Arc, recent_cap_rejects: Arc, - cleanup_direct_releases: Arc, cleanup_deferred_releases: Arc, max_ips: Arc>>, default_max_ips: Arc>, @@ -54,8 +53,6 @@ pub struct UserIpTrackerMemoryStats { pub active_cap_rejects: u64, /// Number of new connections rejected by the global recent-entry cap. pub recent_cap_rejects: u64, - /// Number of release cleanups completed without queueing. - pub cleanup_direct_releases: u64, /// Number of release cleanups deferred through the cleanup queue. pub cleanup_deferred_releases: u64, } @@ -69,7 +66,6 @@ impl UserIpTracker { recent_entry_count: Arc::new(AtomicU64::new(0)), active_cap_rejects: Arc::new(AtomicU64::new(0)), recent_cap_rejects: Arc::new(AtomicU64::new(0)), - cleanup_direct_releases: Arc::new(AtomicU64::new(0)), cleanup_deferred_releases: Arc::new(AtomicU64::new(0)), max_ips: Arc::new(RwLock::new(HashMap::new())), default_max_ips: Arc::new(RwLock::new(0)), @@ -119,24 +115,6 @@ impl UserIpTracker { removed_active_entries } - fn try_cleanup_active_ip(&self, user: &str, ip: IpAddr) -> bool { - let Ok(mut active_ips) = self.active_ips.try_write() else { - return false; - }; - let removed_active_entries = Self::apply_active_cleanup(&mut active_ips, user, ip, 1); - Self::decrement_counter(&self.active_entry_count, removed_active_entries); - true - } - - /// Releases an active IP reservation without waiting, falling back to deferred cleanup. - pub(crate) fn release_or_enqueue_cleanup(&self, user: String, ip: IpAddr) { - if self.try_cleanup_active_ip(&user, ip) { - self.cleanup_direct_releases.fetch_add(1, Ordering::Relaxed); - return; - } - self.enqueue_cleanup(user, ip); - } - /// Queues a deferred active IP cleanup for a later async drain. pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) { match self.cleanup_queue.lock() { @@ -335,7 +313,6 @@ impl UserIpTracker { cleanup_queue_len, active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed), recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed), - cleanup_direct_releases: self.cleanup_direct_releases.load(Ordering::Relaxed), cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed), } } @@ -618,7 +595,10 @@ impl UserIpTracker { Self::decrement_counter(&self.active_entry_count, removed_active_entries); let mut recent_ips = self.recent_ips.write().await; - let removed_recent_entries = recent_ips.remove(username).map(|ips| ips.len()).unwrap_or(0); + let removed_recent_entries = recent_ips + .remove(username) + .map(|ips| ips.len()) + .unwrap_or(0); Self::decrement_counter(&self.recent_entry_count, removed_recent_entries); } diff --git a/src/metrics.rs b/src/metrics.rs index d59ca12..addb18f 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -381,10 +381,7 @@ async fn render_metrics( out, "# HELP telemt_tls_fetch_profile_cache_entries Current adaptive TLS fetch profile-cache entries" ); - let _ = writeln!( - out, - "# TYPE telemt_tls_fetch_profile_cache_entries gauge" - ); + let _ = writeln!(out, "# TYPE telemt_tls_fetch_profile_cache_entries gauge"); let _ = writeln!( out, "telemt_tls_fetch_profile_cache_entries {}", @@ -3123,14 +3120,9 @@ async fn render_metrics( ); let _ = writeln!( out, - "# HELP telemt_ip_tracker_cleanup_total Release cleanup decisions by path" + "# HELP telemt_ip_tracker_cleanup_total Release cleanups deferred through the cleanup queue" ); let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter"); - let _ = writeln!( - out, - "telemt_ip_tracker_cleanup_total{{path=\"direct\"}} {}", - ip_memory.cleanup_direct_releases - ); let _ = writeln!( out, "telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}", diff --git a/src/proxy/client.rs b/src/proxy/client.rs index bd15faa..90dfee0 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -73,8 +73,7 @@ impl Drop for UserConnectionReservation { self.active = false; self.stats.decrement_user_curr_connects(&self.user); if self.tracks_ip { - self.ip_tracker - .release_or_enqueue_cleanup(self.user.clone(), self.ip); + self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip); } } } diff --git a/src/stats/beobachten.rs b/src/stats/beobachten.rs index 4929dee..5684455 100644 --- a/src/stats/beobachten.rs +++ b/src/stats/beobachten.rs @@ -88,10 +88,7 @@ impl BeobachtenStore { let mut grouped = BTreeMap::>::new(); for (class, ip, tries) in entries { - grouped - .entry(class) - .or_default() - .push((ip, tries)); + grouped.entry(class).or_default().push((ip, tries)); } if grouped.is_empty() { diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs index 8028d6e..f18084b 100644 --- a/src/tls_front/cache.rs +++ b/src/tls_front/cache.rs @@ -3,8 +3,8 @@ use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; @@ -156,9 +156,7 @@ impl TlsFrontCache { let should_sweep = self .full_cert_sent_last_sweep_epoch_secs .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |last_sweep| { - if now_epoch_secs.saturating_sub(last_sweep) - >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS - { + if now_epoch_secs.saturating_sub(last_sweep) >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS { Some(now_epoch_secs) } else { None