Enhance UserConnectionReservation: add runtime handle for cross-thread IP cleanup and implement tests for user expiration and connection limits

This commit is contained in:
David Osipov 2026-03-17 20:21:01 +04:00
parent 0284b9f9e3
commit 2c06288b40
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
2 changed files with 233 additions and 1 deletions

View File

@ -31,16 +31,19 @@ struct UserConnectionReservation {
user: String,
ip: IpAddr,
active: bool,
runtime_handle: Option<tokio::runtime::Handle>,
}
impl UserConnectionReservation {
fn new(stats: Arc<Stats>, ip_tracker: Arc<UserIpTracker>, user: String, ip: IpAddr) -> Self {
let runtime_handle = tokio::runtime::Handle::try_current().ok();
Self {
stats,
ip_tracker,
user,
ip,
active: true,
runtime_handle,
}
}
@ -62,7 +65,15 @@ impl Drop for UserConnectionReservation {
self.active = false;
self.stats.decrement_user_curr_connects(&self.user);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
if let Some(handle) = &self.runtime_handle {
let ip_tracker = self.ip_tracker.clone();
let user = self.user.clone();
let ip = self.ip;
let handle = handle.clone();
handle.spawn(async move {
ip_tracker.remove_ip(&user, ip).await;
});
} else if let Ok(handle) = tokio::runtime::Handle::try_current() {
let ip_tracker = self.ip_tracker.clone();
let user = self.user.clone();
let ip = self.ip;

View File

@ -1888,6 +1888,227 @@ async fn quota_rejection_does_not_reserve_ip_or_trigger_rollback() {
);
}
#[tokio::test]
async fn expired_user_rejection_does_not_reserve_ip_or_increment_curr_connects() {
let mut config = ProxyConfig::default();
config
.access
.user_expirations
.insert("user".to_string(), chrono::Utc::now() - chrono::Duration::seconds(1));
let stats = Stats::new();
let ip_tracker = UserIpTracker::new();
let peer_addr: SocketAddr = "203.0.113.212:50002".parse().unwrap();
let result = RunningClientHandler::check_user_limits_static(
"user",
&config,
&stats,
peer_addr,
&ip_tracker,
)
.await;
assert!(matches!(
result,
Err(ProxyError::UserExpired { user }) if user == "user"
));
assert_eq!(stats.get_user_curr_connects("user"), 0);
assert_eq!(ip_tracker.get_active_ip_count("user").await, 0);
}
#[tokio::test]
async fn same_ip_second_reservation_succeeds_under_unique_ip_limit_one() {
let user = "same-ip-unique-limit-user";
let peer_addr: SocketAddr = "198.51.100.248:50010".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 8);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
ip_tracker.set_user_limit(user, 1).await;
let first = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker.clone(),
)
.await
.expect("first reservation must succeed");
let second = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker.clone(),
)
.await
.expect("second reservation from same IP must succeed under unique-ip limit=1");
assert_eq!(stats.get_user_curr_connects(user), 2);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
first.release().await;
second.release().await;
assert_eq!(stats.get_user_curr_connects(user), 0);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
}
#[tokio::test]
async fn second_distinct_ip_is_rejected_under_unique_ip_limit_one() {
let user = "distinct-ip-unique-limit-user";
let peer1: SocketAddr = "198.51.100.249:50011".parse().unwrap();
let peer2: SocketAddr = "198.51.100.250:50012".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 8);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
ip_tracker.set_user_limit(user, 1).await;
let first = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer1,
ip_tracker.clone(),
)
.await
.expect("first reservation must succeed");
let second = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer2,
ip_tracker.clone(),
)
.await;
assert!(matches!(
second,
Err(ProxyError::ConnectionLimitExceeded { user }) if user == "distinct-ip-unique-limit-user"
));
assert_eq!(stats.get_user_curr_connects(user), 1);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
first.release().await;
}
#[tokio::test]
async fn cross_thread_drop_uses_captured_runtime_for_ip_cleanup() {
let user = "cross-thread-drop-user";
let peer_addr: SocketAddr = "198.51.100.251:50013".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 8);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker.clone(),
)
.await
.expect("reservation acquisition must succeed");
assert_eq!(stats.get_user_curr_connects(user), 1);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
std::thread::spawn(move || {
drop(reservation);
})
.join()
.expect("drop thread must not panic");
tokio::time::timeout(Duration::from_secs(1), async {
loop {
if stats.get_user_curr_connects(user) == 0
&& ip_tracker.get_active_ip_count(user).await == 0
{
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("cross-thread drop must still converge to zero user and IP footprint");
}
#[tokio::test]
async fn immediate_reacquire_after_cross_thread_drop_succeeds() {
let user = "cross-thread-reacquire-user";
let peer_addr: SocketAddr = "198.51.100.252:50014".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 1);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker.clone(),
)
.await
.expect("initial reservation must succeed");
std::thread::spawn(move || {
drop(reservation);
})
.join()
.expect("drop thread must not panic");
tokio::time::timeout(Duration::from_secs(1), async {
loop {
if stats.get_user_curr_connects(user) == 0
&& ip_tracker.get_active_ip_count(user).await == 0
{
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("cross-thread cleanup must settle before reacquire check");
let reacquire = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats,
peer_addr,
ip_tracker,
)
.await;
assert!(
reacquire.is_ok(),
"reacquire must succeed after cross-thread drop cleanup"
);
}
#[tokio::test]
async fn concurrent_limit_rejections_from_mixed_ips_leave_no_ip_footprint() {
const PARALLEL_IPS: usize = 64;