Bound ME route fairness and the IP-cleanup backlog

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com>
This commit is contained in:
Alexey
2026-04-25 13:09:10 +03:00
parent 8494429690
commit 1df668144c
3 changed files with 50 additions and 8 deletions

View File

@@ -17,6 +17,20 @@ fn remove_temp_config(path: &PathBuf) {
let _ = fs::remove_file(path);
}
#[test]
fn defaults_enable_byte_bounded_route_fairness() {
    // Both D2C route-fairness knobs must be on out of the box so that queued
    // bytes stay bounded without any operator configuration.
    let defaults = ProxyConfig::default();
    let general = &defaults.general;
    assert!(
        general.me_route_fairshare_enabled,
        "D2C route fairness must be enabled by default to bound queued bytes"
    );
    assert!(
        general.me_route_backpressure_enabled,
        "D2C route backpressure must be enabled by default to shed under sustained pressure"
    );
}
#[test]
fn load_rejects_writer_cmd_capacity_above_upper_bound() {
let path = write_temp_config(

View File

@@ -22,7 +22,7 @@ pub struct UserIpTracker {
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
limit_window: Arc<RwLock<Duration>>,
last_compact_epoch_secs: Arc<AtomicU64>,
cleanup_queue: Arc<Mutex<Vec<(String, IpAddr)>>>,
cleanup_queue: Arc<Mutex<HashMap<(String, IpAddr), usize>>>,
cleanup_drain_lock: Arc<AsyncMutex<()>>,
}
@@ -45,17 +45,21 @@ impl UserIpTracker {
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
limit_window: Arc::new(RwLock::new(Duration::from_secs(30))),
last_compact_epoch_secs: Arc::new(AtomicU64::new(0)),
cleanup_queue: Arc::new(Mutex::new(Vec::new())),
cleanup_queue: Arc::new(Mutex::new(HashMap::new())),
cleanup_drain_lock: Arc::new(AsyncMutex::new(())),
}
}
pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
match self.cleanup_queue.lock() {
Ok(mut queue) => queue.push((user, ip)),
Ok(mut queue) => {
let count = queue.entry((user, ip)).or_insert(0);
*count = count.saturating_add(1);
}
Err(poisoned) => {
let mut queue = poisoned.into_inner();
queue.push((user.clone(), ip));
let count = queue.entry((user.clone(), ip)).or_insert(0);
*count = count.saturating_add(1);
self.cleanup_queue.clear_poison();
tracing::warn!(
"UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})",
@@ -75,7 +79,9 @@ impl UserIpTracker {
}
#[cfg(test)]
pub(crate) fn cleanup_queue_mutex_for_tests(&self) -> Arc<Mutex<Vec<(String, IpAddr)>>> {
/// Test-only handle to the cleanup queue so tests can inspect pending
/// `(user, ip)` entries together with their coalesced pending counts.
pub(crate) fn cleanup_queue_mutex_for_tests(
    &self,
) -> Arc<Mutex<HashMap<(String, IpAddr), usize>>> {
    self.cleanup_queue.clone()
}
@@ -105,11 +111,14 @@ impl UserIpTracker {
};
let mut active_ips = self.active_ips.write().await;
for (user, ip) in to_remove {
for ((user, ip), pending_count) in to_remove {
if pending_count == 0 {
continue;
}
if let Some(user_ips) = active_ips.get_mut(&user) {
if let Some(count) = user_ips.get_mut(&ip) {
if *count > 1 {
*count -= 1;
if *count > pending_count {
*count -= pending_count;
} else {
user_ips.remove(&ip);
}

View File

@@ -649,6 +649,25 @@ async fn duplicate_cleanup_entries_do_not_break_future_admission() {
);
}
#[tokio::test]
async fn duplicate_cleanup_entries_are_coalesced_until_drain() {
    // Re-enqueueing the same (user, ip) pair must coalesce into a single
    // queue slot — the pending count grows, not the map.
    let tracker = UserIpTracker::new();
    let addr = ip_from_idx(7150);
    for _ in 0..3 {
        tracker.enqueue_cleanup("coalesced-cleanup".to_string(), addr);
    }
    assert_eq!(
        tracker.cleanup_queue_len_for_tests(),
        1,
        "duplicate queued cleanup entries must retain one allocation slot"
    );
    // Draining consumes the coalesced entry and empties the queue.
    tracker.drain_cleanup_queue().await;
    assert_eq!(tracker.cleanup_queue_len_for_tests(), 0);
}
#[tokio::test]
async fn stress_repeated_queue_poison_recovery_preserves_admission_progress() {
let tracker = UserIpTracker::new();