Enhance UserConnectionReservation management: add active state and release method, improve cleanup on drop, and implement tests for immediate release and concurrent handling

This commit is contained in:
David Osipov 2026-03-17 19:39:29 +04:00
parent c540a6657f
commit d81140ccec
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
2 changed files with 238 additions and 1 deletions

View File

@ -29,6 +29,7 @@ struct UserConnectionReservation {
ip_tracker: Arc<UserIpTracker>,
user: String,
ip: IpAddr,
active: bool,
}
impl UserConnectionReservation {
@ -38,12 +39,26 @@ impl UserConnectionReservation {
ip_tracker,
user,
ip,
active: true,
}
}
async fn release(mut self) {
if !self.active {
return;
}
self.active = false;
self.stats.decrement_user_curr_connects(&self.user);
self.ip_tracker.remove_ip(&self.user, self.ip).await;
}
}
impl Drop for UserConnectionReservation {
fn drop(&mut self) {
if !self.active {
return;
}
self.active = false;
self.stats.decrement_user_curr_connects(&self.user);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
@ -53,6 +68,12 @@ impl Drop for UserConnectionReservation {
handle.spawn(async move {
ip_tracker.remove_ip(&user, ip).await;
});
} else {
warn!(
user = %self.user,
ip = %self.ip,
"UserConnectionReservation dropped without Tokio runtime; IP reservation cleanup skipped"
);
}
}
}
@ -833,7 +854,7 @@ impl RunningClientHandler {
{
let user = success.user.clone();
let _user_limit_reservation =
let user_limit_reservation =
match Self::acquire_user_connection_reservation_static(
&user,
&config,
@ -905,6 +926,7 @@ impl RunningClientHandler {
)
.await
};
user_limit_reservation.release().await;
relay_result
}

View File

@ -1420,6 +1420,221 @@ async fn tcp_limit_rejection_does_not_reserve_ip_or_trigger_rollback() {
);
}
#[tokio::test]
async fn explicit_reservation_release_cleans_user_and_ip_immediately() {
let user = "release-user";
let peer_addr: SocketAddr = "198.51.100.240:50002".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 4);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker.clone(),
)
.await
.expect("reservation acquisition must succeed");
assert_eq!(stats.get_user_curr_connects(user), 1);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
reservation.release().await;
assert_eq!(
stats.get_user_curr_connects(user),
0,
"explicit release must synchronously free user connection slot"
);
assert_eq!(
ip_tracker.get_active_ip_count(user).await,
0,
"explicit release must synchronously remove reserved user IP"
);
}
#[tokio::test]
async fn explicit_reservation_release_does_not_double_decrement_on_drop() {
let user = "release-once-user";
let peer_addr: SocketAddr = "198.51.100.241:50003".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 4);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker,
)
.await
.expect("reservation acquisition must succeed");
reservation.release().await;
assert_eq!(
stats.get_user_curr_connects(user),
0,
"release must disarm drop and prevent double decrement"
);
}
#[tokio::test]
async fn drop_fallback_eventually_cleans_user_and_ip_reservation() {
let user = "drop-fallback-user";
let peer_addr: SocketAddr = "198.51.100.242:50004".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 4);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer_addr,
ip_tracker.clone(),
)
.await
.expect("reservation acquisition must succeed");
assert_eq!(stats.get_user_curr_connects(user), 1);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 1);
drop(reservation);
tokio::time::timeout(Duration::from_secs(1), async {
loop {
if stats.get_user_curr_connects(user) == 0
&& ip_tracker.get_active_ip_count(user).await == 0
{
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("drop fallback must eventually clean both user slot and active IP");
}
#[tokio::test]
async fn explicit_release_allows_immediate_cross_ip_reacquire_under_limit() {
let user = "cross-ip-user";
let peer1: SocketAddr = "198.51.100.243:50005".parse().unwrap();
let peer2: SocketAddr = "198.51.100.244:50006".parse().unwrap();
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 4);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
ip_tracker.set_user_limit(user, 1).await;
let first = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer1,
ip_tracker.clone(),
)
.await
.expect("first reservation must succeed");
first.release().await;
let second = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer2,
ip_tracker.clone(),
)
.await
.expect("second reservation must succeed immediately after explicit release");
second.release().await;
assert_eq!(stats.get_user_curr_connects(user), 0);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
}
#[tokio::test]
async fn concurrent_release_storm_leaves_zero_user_and_ip_footprint() {
const RESERVATIONS: usize = 64;
let user = "release-storm-user";
let mut config = ProxyConfig::default();
config
.access
.user_max_tcp_conns
.insert(user.to_string(), RESERVATIONS + 8);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let mut reservations = Vec::with_capacity(RESERVATIONS);
for idx in 0..RESERVATIONS {
let ip = std::net::Ipv4Addr::new(203, 0, 113, (idx + 1) as u8);
let peer = SocketAddr::new(IpAddr::V4(ip), 51000 + idx as u16);
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer,
ip_tracker.clone(),
)
.await
.expect("reservation acquisition in storm must succeed");
reservations.push(reservation);
}
assert_eq!(stats.get_user_curr_connects(user), RESERVATIONS as u64);
assert_eq!(ip_tracker.get_active_ip_count(user).await, RESERVATIONS);
let mut tasks = tokio::task::JoinSet::new();
for reservation in reservations {
tasks.spawn(async move {
reservation.release().await;
});
}
while let Some(result) = tasks.join_next().await {
result.expect("release task must not panic");
}
assert_eq!(
stats.get_user_curr_connects(user),
0,
"release storm must drain user current-connection counter to zero"
);
assert_eq!(
ip_tracker.get_active_ip_count(user).await,
0,
"release storm must clear all active IP entries"
);
}
#[tokio::test]
async fn quota_rejection_does_not_reserve_ip_or_trigger_rollback() {
let mut config = ProxyConfig::default();