Preserve synchronous IP cleanup queue contract + Rustfmt

This commit is contained in:
Alexey
2026-04-30 11:05:04 +03:00
parent c3de07db6a
commit d46bda9880
6 changed files with 11 additions and 47 deletions

View File

@@ -2618,9 +2618,7 @@ mod tests {
         let path = dir.join("telemt_me_route_blocking_send_timeout_zero_test.toml");
         std::fs::write(&path, toml).unwrap();
         let err = ProxyConfig::load(&path).unwrap_err().to_string();
-        assert!(
-            err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]")
-        );
+        assert!(err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]"));
         let _ = std::fs::remove_file(path);
     }

View File

@@ -26,7 +26,6 @@ pub struct UserIpTracker {
     recent_entry_count: Arc<AtomicU64>,
     active_cap_rejects: Arc<AtomicU64>,
     recent_cap_rejects: Arc<AtomicU64>,
-    cleanup_direct_releases: Arc<AtomicU64>,
     cleanup_deferred_releases: Arc<AtomicU64>,
     max_ips: Arc<RwLock<HashMap<String, usize>>>,
    default_max_ips: Arc<RwLock<usize>>,
@@ -54,8 +53,6 @@ pub struct UserIpTrackerMemoryStats {
     pub active_cap_rejects: u64,
     /// Number of new connections rejected by the global recent-entry cap.
     pub recent_cap_rejects: u64,
-    /// Number of release cleanups completed without queueing.
-    pub cleanup_direct_releases: u64,
     /// Number of release cleanups deferred through the cleanup queue.
     pub cleanup_deferred_releases: u64,
 }
@@ -69,7 +66,6 @@ impl UserIpTracker {
             recent_entry_count: Arc::new(AtomicU64::new(0)),
             active_cap_rejects: Arc::new(AtomicU64::new(0)),
             recent_cap_rejects: Arc::new(AtomicU64::new(0)),
-            cleanup_direct_releases: Arc::new(AtomicU64::new(0)),
             cleanup_deferred_releases: Arc::new(AtomicU64::new(0)),
             max_ips: Arc::new(RwLock::new(HashMap::new())),
             default_max_ips: Arc::new(RwLock::new(0)),
@@ -119,24 +115,6 @@ impl UserIpTracker {
         removed_active_entries
     }

-    fn try_cleanup_active_ip(&self, user: &str, ip: IpAddr) -> bool {
-        let Ok(mut active_ips) = self.active_ips.try_write() else {
-            return false;
-        };
-        let removed_active_entries = Self::apply_active_cleanup(&mut active_ips, user, ip, 1);
-        Self::decrement_counter(&self.active_entry_count, removed_active_entries);
-        true
-    }
-
-    /// Releases an active IP reservation without waiting, falling back to deferred cleanup.
-    pub(crate) fn release_or_enqueue_cleanup(&self, user: String, ip: IpAddr) {
-        if self.try_cleanup_active_ip(&user, ip) {
-            self.cleanup_direct_releases.fetch_add(1, Ordering::Relaxed);
-            return;
-        }
-        self.enqueue_cleanup(user, ip);
-    }
-
     /// Queues a deferred active IP cleanup for a later async drain.
     pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
         match self.cleanup_queue.lock() {
@@ -335,7 +313,6 @@ impl UserIpTracker {
             cleanup_queue_len,
             active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed),
             recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed),
-            cleanup_direct_releases: self.cleanup_direct_releases.load(Ordering::Relaxed),
             cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed),
         }
     }
@@ -618,7 +595,10 @@ impl UserIpTracker {
         Self::decrement_counter(&self.active_entry_count, removed_active_entries);
         let mut recent_ips = self.recent_ips.write().await;
-        let removed_recent_entries = recent_ips.remove(username).map(|ips| ips.len()).unwrap_or(0);
+        let removed_recent_entries = recent_ips
+            .remove(username)
+            .map(|ips| ips.len())
+            .unwrap_or(0);
         Self::decrement_counter(&self.recent_entry_count, removed_recent_entries);
     }

View File

@@ -381,10 +381,7 @@ async fn render_metrics(
         out,
         "# HELP telemt_tls_fetch_profile_cache_entries Current adaptive TLS fetch profile-cache entries"
     );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_tls_fetch_profile_cache_entries gauge"
-    );
+    let _ = writeln!(out, "# TYPE telemt_tls_fetch_profile_cache_entries gauge");
     let _ = writeln!(
         out,
         "telemt_tls_fetch_profile_cache_entries {}",
@@ -3123,14 +3120,9 @@ async fn render_metrics(
     );
     let _ = writeln!(
         out,
-        "# HELP telemt_ip_tracker_cleanup_total Release cleanup decisions by path"
+        "# HELP telemt_ip_tracker_cleanup_total Release cleanups deferred through the cleanup queue"
     );
     let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter");
-    let _ = writeln!(
-        out,
-        "telemt_ip_tracker_cleanup_total{{path=\"direct\"}} {}",
-        ip_memory.cleanup_direct_releases
-    );
     let _ = writeln!(
         out,
         "telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}",

View File

@@ -73,8 +73,7 @@ impl Drop for UserConnectionReservation {
         self.active = false;
         self.stats.decrement_user_curr_connects(&self.user);
         if self.tracks_ip {
-            self.ip_tracker
-                .release_or_enqueue_cleanup(self.user.clone(), self.ip);
+            self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip);
         }
     }
 }

View File

@@ -88,10 +88,7 @@ impl BeobachtenStore {
         let mut grouped = BTreeMap::<String, Vec<(IpAddr, u64)>>::new();
         for (class, ip, tries) in entries {
-            grouped
-                .entry(class)
-                .or_default()
-                .push((ip, tries));
+            grouped.entry(class).or_default().push((ip, tries));
         }
         if grouped.is_empty() {

View File

@@ -3,8 +3,8 @@ use std::collections::hash_map::DefaultHasher;
 use std::hash::{Hash, Hasher};
 use std::net::IpAddr;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
@@ -156,9 +156,7 @@ impl TlsFrontCache {
         let should_sweep = self
             .full_cert_sent_last_sweep_epoch_secs
             .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |last_sweep| {
-                if now_epoch_secs.saturating_sub(last_sweep)
-                    >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS
-                {
+                if now_epoch_secs.saturating_sub(last_sweep) >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS {
                     Some(now_epoch_secs)
                 } else {
                     None