Compare commits

...

6 Commits

Author SHA1 Message Date
Alexey
10c9bcd97d Merge pull request #747 from telemt/flow
Restore active IP observability for users without unique-IP limits
2026-04-25 18:11:30 +03:00
Alexey
8ab9405dca Bump 2026-04-25 18:05:22 +03:00
Alexey
9412f089c0 Restore active IP observability for users without unique-IP limits 2026-04-25 15:49:28 +03:00
Alexey
4e57cee9b9 Merge pull request #745 from telemt/flow
API PATCH fixes + No IP tracking with disabled unique-IP limits + Bound hot-path pressure in ME Relay and Handshake + Bounded ME Route fairness and IP-Cleanup-Backlog + Bound relay queues by bytes
2026-04-25 14:45:34 +03:00
Alexey
e217371dc8 Bump 2026-04-25 14:36:51 +03:00
Alexey
37c916056a Rustfmt 2026-04-25 14:35:35 +03:00
5 changed files with 73 additions and 50 deletions

2
Cargo.lock generated
View File

@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.4.6" version = "3.4.8"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.4.6" version = "3.4.8"
edition = "2024" edition = "2024"
[features] [features]

View File

@@ -1612,8 +1612,6 @@ impl RunningClientHandler {
}); });
} }
let tracks_ip = ip_tracker.get_user_limit(user).await.is_some();
if tracks_ip {
match ip_tracker.check_and_add(user, peer_addr.ip()).await { match ip_tracker.check_and_add(user, peer_addr.ip()).await {
Ok(()) => {} Ok(()) => {}
Err(reason) => { Err(reason) => {
@@ -1629,14 +1627,13 @@ impl RunningClientHandler {
}); });
} }
} }
}
Ok(UserConnectionReservation::new( Ok(UserConnectionReservation::new(
stats, stats,
ip_tracker, ip_tracker,
user.to_string(), user.to_string(),
peer_addr.ip(), peer_addr.ip(),
tracks_ip, true,
)) ))
} }
@@ -1679,7 +1676,6 @@ impl RunningClientHandler {
}); });
} }
if ip_tracker.get_user_limit(user).await.is_some() {
match ip_tracker.check_and_add(user, peer_addr.ip()).await { match ip_tracker.check_and_add(user, peer_addr.ip()).await {
Ok(()) => { Ok(()) => {
ip_tracker.remove_ip(user, peer_addr.ip()).await; ip_tracker.remove_ip(user, peer_addr.ip()).await;
@@ -1697,7 +1693,6 @@ impl RunningClientHandler {
}); });
} }
} }
}
stats.decrement_user_curr_connects(user); stats.decrement_user_curr_connects(user);
Ok(()) Ok(())

View File

@@ -281,13 +281,8 @@ async fn user_connection_reservation_drop_enqueues_cleanup_synchronously() {
assert_eq!(ip_tracker.get_active_ip_count(&user).await, 1); assert_eq!(ip_tracker.get_active_ip_count(&user).await, 1);
assert_eq!(stats.get_user_curr_connects(&user), 1); assert_eq!(stats.get_user_curr_connects(&user), 1);
let reservation = UserConnectionReservation::new( let reservation =
stats.clone(), UserConnectionReservation::new(stats.clone(), ip_tracker.clone(), user.clone(), ip, true);
ip_tracker.clone(),
user.clone(),
ip,
true,
);
// Drop the reservation synchronously without any tokio::spawn/await yielding! // Drop the reservation synchronously without any tokio::spawn/await yielding!
drop(reservation); drop(reservation);
@@ -965,6 +960,36 @@ async fn reservation_limit_failure_does_not_leak_curr_connects_counter() {
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0); assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
} }
#[tokio::test]
async fn unlimited_unique_ip_user_is_still_visible_in_active_ip_tracker() {
let user = "active-ip-observed-user";
let config = crate::config::ProxyConfig::default();
let stats = Arc::new(crate::stats::Stats::new());
let ip_tracker = Arc::new(crate::ip_tracker::UserIpTracker::new());
let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 200, 17)), 50017);
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats.clone(),
peer,
ip_tracker.clone(),
)
.await
.expect("reservation without unique-IP limit must succeed");
assert_eq!(stats.get_user_curr_connects(user), 1);
assert_eq!(
ip_tracker.get_active_ip_count(user).await,
1,
"active IP observability must not depend on unique-IP limit enforcement"
);
reservation.release().await;
assert_eq!(stats.get_user_curr_connects(user), 0);
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
}
#[tokio::test] #[tokio::test]
async fn short_tls_probe_is_masked_through_client_pipeline() { async fn short_tls_probe_is_masked_through_client_pipeline() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

View File

@@ -8,8 +8,8 @@ use dashmap::DashMap;
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{Mutex, Semaphore, mpsc}; use tokio::sync::{Mutex, Semaphore, mpsc};
use super::{MeResponse, RouteBytePermit};
use super::codec::WriterCommand; use super::codec::WriterCommand;
use super::{MeResponse, RouteBytePermit};
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25; const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120; const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
@@ -129,7 +129,10 @@ impl ConnRegistry {
) )
} }
fn with_route_limits(route_channel_capacity: usize, route_byte_permits_per_conn: usize) -> Self { fn with_route_limits(
route_channel_capacity: usize,
route_byte_permits_per_conn: usize,
) -> Self {
let start = rand::random::<u64>() | 1; let start = rand::random::<u64>() | 1;
let route_channel_capacity = route_channel_capacity.max(1); let route_channel_capacity = route_channel_capacity.max(1);
Self { Self {
@@ -209,9 +212,10 @@ impl ConnRegistry {
let id = self.next_id.fetch_add(1, Ordering::Relaxed); let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(self.route_channel_capacity); let (tx, rx) = mpsc::channel(self.route_channel_capacity);
self.routing.map.insert(id, tx); self.routing.map.insert(id, tx);
self.routing self.routing.byte_budget.insert(
.byte_budget id,
.insert(id, Arc::new(Semaphore::new(self.route_byte_permits_per_conn))); Arc::new(Semaphore::new(self.route_byte_permits_per_conn)),
);
(id, rx) (id, rx)
} }
@@ -287,8 +291,7 @@ impl ConnRegistry {
.map_err(|_| RouteResult::QueueFullHigh)?, .map_err(|_| RouteResult::QueueFullHigh)?,
Some(timeout_ms) => { Some(timeout_ms) => {
let acquire = semaphore.acquire_many_owned(permits); let acquire = semaphore.acquire_many_owned(permits);
match tokio::time::timeout(Duration::from_millis(timeout_ms.max(1)), acquire) match tokio::time::timeout(Duration::from_millis(timeout_ms.max(1)), acquire).await
.await
{ {
Ok(Ok(permit)) => permit, Ok(Ok(permit)) => permit,
Ok(Err(_)) => return Err(RouteResult::ChannelClosed), Ok(Err(_)) => return Err(RouteResult::ChannelClosed),