From 1df668144c392052c737c665675c9b9f1612a8ba Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 25 Apr 2026 13:09:10 +0300 Subject: [PATCH] Bounded ME Route fairness and IP-Cleanup-Backlog Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com> --- .../tests/load_memory_envelope_tests.rs | 14 +++++++++++ src/ip_tracker.rs | 25 +++++++++++++------ src/tests/ip_tracker_regression_tests.rs | 19 ++++++++++++++ 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/config/tests/load_memory_envelope_tests.rs b/src/config/tests/load_memory_envelope_tests.rs index ea78498..1c201cc 100644 --- a/src/config/tests/load_memory_envelope_tests.rs +++ b/src/config/tests/load_memory_envelope_tests.rs @@ -17,6 +17,20 @@ fn remove_temp_config(path: &PathBuf) { let _ = fs::remove_file(path); } +#[test] +fn defaults_enable_byte_bounded_route_fairness() { + let cfg = ProxyConfig::default(); + + assert!( + cfg.general.me_route_fairshare_enabled, + "D2C route fairness must be enabled by default to bound queued bytes" + ); + assert!( + cfg.general.me_route_backpressure_enabled, + "D2C route backpressure must be enabled by default to shed under sustained pressure" + ); +} + #[test] fn load_rejects_writer_cmd_capacity_above_upper_bound() { let path = write_temp_config( diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index b4d934f..e3993f1 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -22,7 +22,7 @@ pub struct UserIpTracker { limit_mode: Arc>, limit_window: Arc>, last_compact_epoch_secs: Arc, - cleanup_queue: Arc>>, + cleanup_queue: Arc>>, cleanup_drain_lock: Arc>, } @@ -45,17 +45,21 @@ impl UserIpTracker { limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), limit_window: Arc::new(RwLock::new(Duration::from_secs(30))), last_compact_epoch_secs: Arc::new(AtomicU64::new(0)), - cleanup_queue: Arc::new(Mutex::new(Vec::new())), + cleanup_queue: Arc::new(Mutex::new(HashMap::new())), cleanup_drain_lock: Arc::new(AsyncMutex::new(())), } } pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) { match self.cleanup_queue.lock() { - Ok(mut queue) => queue.push((user, ip)), + Ok(mut queue) => { + let count = queue.entry((user, ip)).or_insert(0); + *count = count.saturating_add(1); + } Err(poisoned) => { let mut queue = poisoned.into_inner(); - queue.push((user.clone(), ip)); + let count = queue.entry((user.clone(), ip)).or_insert(0); + *count = count.saturating_add(1); self.cleanup_queue.clear_poison(); tracing::warn!( "UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})", @@ -75,7 +79,9 @@ impl UserIpTracker { } #[cfg(test)] - pub(crate) fn cleanup_queue_mutex_for_tests(&self) -> Arc>> { + pub(crate) fn cleanup_queue_mutex_for_tests( + &self, + ) -> Arc>> { Arc::clone(&self.cleanup_queue) } @@ -105,11 +111,14 @@ impl UserIpTracker { }; let mut active_ips = self.active_ips.write().await; - for (user, ip) in to_remove { + for ((user, ip), pending_count) in to_remove { + if pending_count == 0 { + continue; + } if let Some(user_ips) = active_ips.get_mut(&user) { if let Some(count) = user_ips.get_mut(&ip) { - if *count > 1 { - *count -= 1; + if *count > pending_count { + *count -= pending_count; } else { user_ips.remove(&ip); } diff --git a/src/tests/ip_tracker_regression_tests.rs b/src/tests/ip_tracker_regression_tests.rs index 0e6656e..193c9c3 100644 --- a/src/tests/ip_tracker_regression_tests.rs +++ b/src/tests/ip_tracker_regression_tests.rs @@ -649,6 +649,25 @@ async fn duplicate_cleanup_entries_do_not_break_future_admission() { ); } +#[tokio::test] +async fn duplicate_cleanup_entries_are_coalesced_until_drain() { + let tracker = UserIpTracker::new(); + let ip = ip_from_idx(7150); + + tracker.enqueue_cleanup("coalesced-cleanup".to_string(), ip); + tracker.enqueue_cleanup("coalesced-cleanup".to_string(), ip); + tracker.enqueue_cleanup("coalesced-cleanup".to_string(), ip); + + assert_eq!( + tracker.cleanup_queue_len_for_tests(), + 1, + "duplicate queued cleanup entries must retain one allocation slot" + ); + + tracker.drain_cleanup_queue().await; + assert_eq!(tracker.cleanup_queue_len_for_tests(), 0); +} + #[tokio::test] async fn stress_repeated_queue_poison_recovery_preserves_admission_progress() { let tracker = UserIpTracker::new();