diff --git a/src/proxy/client.rs b/src/proxy/client.rs
index 0077f72..849e409 100644
--- a/src/proxy/client.rs
+++ b/src/proxy/client.rs
@@ -31,16 +31,19 @@ struct UserConnectionReservation {
     user: String,
     ip: IpAddr,
     active: bool,
+    runtime_handle: Option<tokio::runtime::Handle>,
 }
 
 impl UserConnectionReservation {
     fn new(stats: Arc<Stats>, ip_tracker: Arc<UserIpTracker>, user: String, ip: IpAddr) -> Self {
+        let runtime_handle = tokio::runtime::Handle::try_current().ok();
         Self {
             stats,
             ip_tracker,
             user,
             ip,
             active: true,
+            runtime_handle,
         }
     }
 
@@ -62,7 +65,15 @@ impl Drop for UserConnectionReservation {
         self.active = false;
         self.stats.decrement_user_curr_connects(&self.user);
 
-        if let Ok(handle) = tokio::runtime::Handle::try_current() {
+        if let Some(handle) = &self.runtime_handle {
+            let ip_tracker = self.ip_tracker.clone();
+            let user = self.user.clone();
+            let ip = self.ip;
+            let handle = handle.clone();
+            handle.spawn(async move {
+                ip_tracker.remove_ip(&user, ip).await;
+            });
+        } else if let Ok(handle) = tokio::runtime::Handle::try_current() {
             let ip_tracker = self.ip_tracker.clone();
             let user = self.user.clone();
             let ip = self.ip;
diff --git a/src/proxy/client_security_tests.rs b/src/proxy/client_security_tests.rs
index 8058c38..8bdb234 100644
--- a/src/proxy/client_security_tests.rs
+++ b/src/proxy/client_security_tests.rs
@@ -1888,6 +1888,227 @@ async fn quota_rejection_does_not_reserve_ip_or_trigger_rollback() {
     );
 }
 
+#[tokio::test]
+async fn expired_user_rejection_does_not_reserve_ip_or_increment_curr_connects() {
+    let mut config = ProxyConfig::default();
+    config
+        .access
+        .user_expirations
+        .insert("user".to_string(), chrono::Utc::now() - chrono::Duration::seconds(1));
+
+    let stats = Stats::new();
+    let ip_tracker = UserIpTracker::new();
+    let peer_addr: SocketAddr = "203.0.113.212:50002".parse().unwrap();
+
+    let result = RunningClientHandler::check_user_limits_static(
+        "user",
+        &config,
+        &stats,
+        peer_addr,
+        &ip_tracker,
+    )
+    .await;
+
+    assert!(matches!(
+        result,
+        Err(ProxyError::UserExpired { user }) if user == "user"
+    ));
+    assert_eq!(stats.get_user_curr_connects("user"), 0);
+    assert_eq!(ip_tracker.get_active_ip_count("user").await, 0);
+}
+
+#[tokio::test]
+async fn same_ip_second_reservation_succeeds_under_unique_ip_limit_one() {
+    let user = "same-ip-unique-limit-user";
+    let peer_addr: SocketAddr = "198.51.100.248:50010".parse().unwrap();
+
+    let mut config = ProxyConfig::default();
+    config
+        .access
+        .user_max_tcp_conns
+        .insert(user.to_string(), 8);
+
+    let stats = Arc::new(Stats::new());
+    let ip_tracker = Arc::new(UserIpTracker::new());
+    ip_tracker.set_user_limit(user, 1).await;
+
+    let first = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats.clone(),
+        peer_addr,
+        ip_tracker.clone(),
+    )
+    .await
+    .expect("first reservation must succeed");
+    let second = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats.clone(),
+        peer_addr,
+        ip_tracker.clone(),
+    )
+    .await
+    .expect("second reservation from same IP must succeed under unique-ip limit=1");
+
+    assert_eq!(stats.get_user_curr_connects(user), 2);
+    assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
+
+    first.release().await;
+    second.release().await;
+    assert_eq!(stats.get_user_curr_connects(user), 0);
+    assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
+}
+
+#[tokio::test]
+async fn second_distinct_ip_is_rejected_under_unique_ip_limit_one() {
+    let user = "distinct-ip-unique-limit-user";
+    let peer1: SocketAddr = "198.51.100.249:50011".parse().unwrap();
+    let peer2: SocketAddr = "198.51.100.250:50012".parse().unwrap();
+
+    let mut config = ProxyConfig::default();
+    config
+        .access
+        .user_max_tcp_conns
+        .insert(user.to_string(), 8);
+
+    let stats = Arc::new(Stats::new());
+    let ip_tracker = Arc::new(UserIpTracker::new());
+    ip_tracker.set_user_limit(user, 1).await;
+
+    let first = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats.clone(),
+        peer1,
+        ip_tracker.clone(),
+    )
+    .await
+    .expect("first reservation must succeed");
+
+    let second = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats.clone(),
+        peer2,
+        ip_tracker.clone(),
+    )
+    .await;
+
+    assert!(matches!(
+        second,
+        Err(ProxyError::ConnectionLimitExceeded { user }) if user == "distinct-ip-unique-limit-user"
+    ));
+    assert_eq!(stats.get_user_curr_connects(user), 1);
+    assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
+
+    first.release().await;
+}
+
+#[tokio::test]
+async fn cross_thread_drop_uses_captured_runtime_for_ip_cleanup() {
+    let user = "cross-thread-drop-user";
+    let peer_addr: SocketAddr = "198.51.100.251:50013".parse().unwrap();
+
+    let mut config = ProxyConfig::default();
+    config
+        .access
+        .user_max_tcp_conns
+        .insert(user.to_string(), 8);
+
+    let stats = Arc::new(Stats::new());
+    let ip_tracker = Arc::new(UserIpTracker::new());
+
+    let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats.clone(),
+        peer_addr,
+        ip_tracker.clone(),
+    )
+    .await
+    .expect("reservation acquisition must succeed");
+
+    assert_eq!(stats.get_user_curr_connects(user), 1);
+    assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
+
+    std::thread::spawn(move || {
+        drop(reservation);
+    })
+    .join()
+    .expect("drop thread must not panic");
+
+    tokio::time::timeout(Duration::from_secs(1), async {
+        loop {
+            if stats.get_user_curr_connects(user) == 0
+                && ip_tracker.get_active_ip_count(user).await == 0
+            {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(5)).await;
+        }
+    })
+    .await
+    .expect("cross-thread drop must still converge to zero user and IP footprint");
+}
+
+#[tokio::test]
+async fn immediate_reacquire_after_cross_thread_drop_succeeds() {
+    let user = "cross-thread-reacquire-user";
+    let peer_addr: SocketAddr = "198.51.100.252:50014".parse().unwrap();
+
+    let mut config = ProxyConfig::default();
+    config
+        .access
+        .user_max_tcp_conns
+        .insert(user.to_string(), 1);
+
+    let stats = Arc::new(Stats::new());
+    let ip_tracker = Arc::new(UserIpTracker::new());
+
+    let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats.clone(),
+        peer_addr,
+        ip_tracker.clone(),
+    )
+    .await
+    .expect("initial reservation must succeed");
+
+    std::thread::spawn(move || {
+        drop(reservation);
+    })
+    .join()
+    .expect("drop thread must not panic");
+
+    tokio::time::timeout(Duration::from_secs(1), async {
+        loop {
+            if stats.get_user_curr_connects(user) == 0
+                && ip_tracker.get_active_ip_count(user).await == 0
+            {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(5)).await;
+        }
+    })
+    .await
+    .expect("cross-thread cleanup must settle before reacquire check");
+
+    let reacquire = RunningClientHandler::acquire_user_connection_reservation_static(
+        user,
+        &config,
+        stats,
+        peer_addr,
+        ip_tracker,
+    )
+    .await;
+    assert!(
+        reacquire.is_ok(),
+        "reacquire must succeed after cross-thread drop cleanup"
+    );
+}
+
 #[tokio::test]
 async fn concurrent_limit_rejections_from_mixed_ips_leave_no_ip_footprint() {
     const PARALLEL_IPS: usize = 64;