diff --git a/src/proxy/client.rs b/src/proxy/client.rs index f80f74d..0077f72 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -24,6 +24,7 @@ enum HandshakeOutcome { Handled, } +#[must_use = "UserConnectionReservation must be kept alive to retain user/IP reservation until release or drop"] struct UserConnectionReservation { stats: Arc, ip_tracker: Arc, diff --git a/src/proxy/client_security_tests.rs b/src/proxy/client_security_tests.rs index 7047987..8058c38 100644 --- a/src/proxy/client_security_tests.rs +++ b/src/proxy/client_security_tests.rs @@ -1635,6 +1635,223 @@ async fn concurrent_release_storm_leaves_zero_user_and_ip_footprint() { ); } +#[tokio::test] +async fn relay_connect_error_releases_user_and_ip_before_return() { + let user = "relay-error-user"; + let peer_addr: SocketAddr = "198.51.100.245:50007".parse().unwrap(); + + let dead_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let dead_port = dead_listener.local_addr().unwrap().port(); + drop(dead_listener); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 1); + config + .dc_overrides + .insert("2".to_string(), vec![format!("127.0.0.1:{dead_port}")]); + let config = Arc::new(config); + + let upstream_manager = Arc::new(UpstreamManager::new( + vec![UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + }], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )); + + let buffer_pool = Arc::new(BufferPool::new()); + let rng = Arc::new(SecureRandom::new()); + let route_runtime = Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)); + + let (server_side, _client_side) = duplex(64 * 1024); + let (server_reader, server_writer) = tokio::io::split(server_side); + let client_reader = make_crypto_reader(server_reader); + let client_writer = make_crypto_writer(server_writer); + + let success = HandshakeSuccess { + user: user.to_string(), + dc_idx: 2, + proto_tag: ProtoTag::Intermediate, + dec_key: [0u8; 32], + dec_iv: 0, + enc_key: [0u8; 32], + enc_iv: 0, + peer: peer_addr, + is_tls: false, + }; + + let result = RunningClientHandler::handle_authenticated_static( + client_reader, + client_writer, + success, + upstream_manager, + stats.clone(), + config, + buffer_pool, + rng, + None, + route_runtime, + "127.0.0.1:443".parse().unwrap(), + peer_addr, + ip_tracker.clone(), + ) + .await; + + assert!(result.is_err(), "relay must fail when upstream DC is unreachable"); + assert_eq!( + stats.get_user_curr_connects(user), + 0, + "error return must release user slot before returning" + ); + assert_eq!( + ip_tracker.get_active_ip_count(user).await, + 0, + "error return must release user IP reservation before returning" + ); +} + +#[tokio::test] +async fn mixed_release_and_drop_same_ip_preserves_counter_correctness() { + let user = "same-ip-mixed-user"; + let peer_addr: SocketAddr = "198.51.100.246:50008".parse().unwrap(); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 8); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let reservation_a = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker.clone(), + ) + .await + .expect("first reservation must succeed"); + let reservation_b = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker.clone(), + ) + .await + .expect("second reservation must succeed"); + + assert_eq!(stats.get_user_curr_connects(user), 2); + assert_eq!(ip_tracker.get_active_ip_count(user).await, 1); + + reservation_a.release().await; + assert_eq!( + stats.get_user_curr_connects(user), + 1, + "explicit release must decrement only one active reservation" + ); + assert_eq!( + ip_tracker.get_active_ip_count(user).await, + 1, + "same IP must remain active while second reservation exists" + ); + + drop(reservation_b); + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if stats.get_user_curr_connects(user) == 0 + && ip_tracker.get_active_ip_count(user).await == 0 + { + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .expect("drop fallback must clear final same-IP reservation"); +} + +#[tokio::test] +async fn drop_one_of_two_same_ip_reservations_keeps_ip_active() { + let user = "same-ip-drop-one-user"; + let peer_addr: SocketAddr = "198.51.100.247:50009".parse().unwrap(); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 8); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let reservation_a = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker.clone(), + ) + .await + .expect("first reservation must succeed"); + let reservation_b = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker.clone(), + ) + .await + .expect("second reservation must succeed"); + + drop(reservation_a); + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if stats.get_user_curr_connects(user) == 1 + && ip_tracker.get_active_ip_count(user).await == 1 + { + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .expect("dropping one reservation must keep same-IP activity for remaining reservation"); + + reservation_b.release().await; + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if stats.get_user_curr_connects(user) == 0 + && ip_tracker.get_active_ip_count(user).await == 0 + { + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .expect("final release must converge to zero footprint after async fallback cleanup"); +} + #[tokio::test] async fn quota_rejection_does_not_reserve_ip_or_trigger_rollback() { let mut config = ProxyConfig::default(); diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 36241af..3ad361f 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -26,6 +26,7 @@ enum RouteConnectionGauge { Middle, } +#[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"] pub struct RouteConnectionLease { stats: Arc, gauge: RouteConnectionGauge,