mirror of
https://github.com/telemt/telemt.git
synced 2026-05-22 19:51:43 +03:00
Compare commits
9 Commits
2f2fe9d5d3
...
bdfa641843
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bdfa641843 | ||
|
|
007fc86189 | ||
|
|
10c9bcd97d | ||
|
|
8ab9405dca | ||
|
|
9412f089c0 | ||
|
|
4e57cee9b9 | ||
|
|
e217371dc8 | ||
|
|
d567dfe40b | ||
|
|
37c916056a |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
||||
|
||||
[[package]]
|
||||
name = "telemt"
|
||||
version = "3.4.6"
|
||||
version = "3.4.8"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"anyhow",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "telemt"
|
||||
version = "3.4.6"
|
||||
version = "3.4.8"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
|
||||
@@ -278,9 +278,11 @@ impl UserIpTracker {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let is_new_ip = !user_recent.contains_key(&ip);
|
||||
|
||||
if let Some(limit) = limit {
|
||||
let active_limit_reached = user_active.len() >= limit;
|
||||
let recent_limit_reached = user_recent.len() >= limit;
|
||||
let recent_limit_reached = user_recent.len() >= limit && is_new_ip;
|
||||
let deny = match mode {
|
||||
UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached,
|
||||
UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached,
|
||||
@@ -860,4 +862,19 @@ mod tests {
|
||||
.unwrap_or(false);
|
||||
assert!(!stale_exists);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_time_window_allows_same_ip_reconnect() {
|
||||
let tracker = UserIpTracker::new();
|
||||
tracker.set_user_limit("test_user", 1).await;
|
||||
tracker
|
||||
.set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 1)
|
||||
.await;
|
||||
|
||||
let ip1 = test_ipv4(10, 4, 0, 1);
|
||||
|
||||
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||
tracker.remove_ip("test_user", ip1).await;
|
||||
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1612,22 +1612,19 @@ impl RunningClientHandler {
|
||||
});
|
||||
}
|
||||
|
||||
let tracks_ip = ip_tracker.get_user_limit(user).await.is_some();
|
||||
if tracks_ip {
|
||||
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||
Ok(()) => {}
|
||||
Err(reason) => {
|
||||
stats.decrement_user_curr_connects(user);
|
||||
warn!(
|
||||
user = %user,
|
||||
ip = %peer_addr.ip(),
|
||||
reason = %reason,
|
||||
"IP limit exceeded"
|
||||
);
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
}
|
||||
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||
Ok(()) => {}
|
||||
Err(reason) => {
|
||||
stats.decrement_user_curr_connects(user);
|
||||
warn!(
|
||||
user = %user,
|
||||
ip = %peer_addr.ip(),
|
||||
reason = %reason,
|
||||
"IP limit exceeded"
|
||||
);
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1636,7 +1633,7 @@ impl RunningClientHandler {
|
||||
ip_tracker,
|
||||
user.to_string(),
|
||||
peer_addr.ip(),
|
||||
tracks_ip,
|
||||
true,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -1679,23 +1676,21 @@ impl RunningClientHandler {
|
||||
});
|
||||
}
|
||||
|
||||
if ip_tracker.get_user_limit(user).await.is_some() {
|
||||
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||
Ok(()) => {
|
||||
ip_tracker.remove_ip(user, peer_addr.ip()).await;
|
||||
}
|
||||
Err(reason) => {
|
||||
stats.decrement_user_curr_connects(user);
|
||||
warn!(
|
||||
user = %user,
|
||||
ip = %peer_addr.ip(),
|
||||
reason = %reason,
|
||||
"IP limit exceeded"
|
||||
);
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
}
|
||||
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||
Ok(()) => {
|
||||
ip_tracker.remove_ip(user, peer_addr.ip()).await;
|
||||
}
|
||||
Err(reason) => {
|
||||
stats.decrement_user_curr_connects(user);
|
||||
warn!(
|
||||
user = %user,
|
||||
ip = %peer_addr.ip(),
|
||||
reason = %reason,
|
||||
"IP limit exceeded"
|
||||
);
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -281,13 +281,8 @@ async fn user_connection_reservation_drop_enqueues_cleanup_synchronously() {
|
||||
assert_eq!(ip_tracker.get_active_ip_count(&user).await, 1);
|
||||
assert_eq!(stats.get_user_curr_connects(&user), 1);
|
||||
|
||||
let reservation = UserConnectionReservation::new(
|
||||
stats.clone(),
|
||||
ip_tracker.clone(),
|
||||
user.clone(),
|
||||
ip,
|
||||
true,
|
||||
);
|
||||
let reservation =
|
||||
UserConnectionReservation::new(stats.clone(), ip_tracker.clone(), user.clone(), ip, true);
|
||||
|
||||
// Drop the reservation synchronously without any tokio::spawn/await yielding!
|
||||
drop(reservation);
|
||||
@@ -965,6 +960,36 @@ async fn reservation_limit_failure_does_not_leak_curr_connects_counter() {
|
||||
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unlimited_unique_ip_user_is_still_visible_in_active_ip_tracker() {
|
||||
let user = "active-ip-observed-user";
|
||||
let config = crate::config::ProxyConfig::default();
|
||||
let stats = Arc::new(crate::stats::Stats::new());
|
||||
let ip_tracker = Arc::new(crate::ip_tracker::UserIpTracker::new());
|
||||
let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 200, 17)), 50017);
|
||||
|
||||
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||
user,
|
||||
&config,
|
||||
stats.clone(),
|
||||
peer,
|
||||
ip_tracker.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("reservation without unique-IP limit must succeed");
|
||||
|
||||
assert_eq!(stats.get_user_curr_connects(user), 1);
|
||||
assert_eq!(
|
||||
ip_tracker.get_active_ip_count(user).await,
|
||||
1,
|
||||
"active IP observability must not depend on unique-IP limit enforcement"
|
||||
);
|
||||
|
||||
reservation.release().await;
|
||||
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn short_tls_probe_is_masked_through_client_pipeline() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
|
||||
@@ -8,8 +8,8 @@ use dashmap::DashMap;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::{Mutex, Semaphore, mpsc};
|
||||
|
||||
use super::{MeResponse, RouteBytePermit};
|
||||
use super::codec::WriterCommand;
|
||||
use super::{MeResponse, RouteBytePermit};
|
||||
|
||||
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
|
||||
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
|
||||
@@ -129,7 +129,10 @@ impl ConnRegistry {
|
||||
)
|
||||
}
|
||||
|
||||
fn with_route_limits(route_channel_capacity: usize, route_byte_permits_per_conn: usize) -> Self {
|
||||
fn with_route_limits(
|
||||
route_channel_capacity: usize,
|
||||
route_byte_permits_per_conn: usize,
|
||||
) -> Self {
|
||||
let start = rand::random::<u64>() | 1;
|
||||
let route_channel_capacity = route_channel_capacity.max(1);
|
||||
Self {
|
||||
@@ -209,9 +212,10 @@ impl ConnRegistry {
|
||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||
let (tx, rx) = mpsc::channel(self.route_channel_capacity);
|
||||
self.routing.map.insert(id, tx);
|
||||
self.routing
|
||||
.byte_budget
|
||||
.insert(id, Arc::new(Semaphore::new(self.route_byte_permits_per_conn)));
|
||||
self.routing.byte_budget.insert(
|
||||
id,
|
||||
Arc::new(Semaphore::new(self.route_byte_permits_per_conn)),
|
||||
);
|
||||
(id, rx)
|
||||
}
|
||||
|
||||
@@ -287,8 +291,7 @@ impl ConnRegistry {
|
||||
.map_err(|_| RouteResult::QueueFullHigh)?,
|
||||
Some(timeout_ms) => {
|
||||
let acquire = semaphore.acquire_many_owned(permits);
|
||||
match tokio::time::timeout(Duration::from_millis(timeout_ms.max(1)), acquire)
|
||||
.await
|
||||
match tokio::time::timeout(Duration::from_millis(timeout_ms.max(1)), acquire).await
|
||||
{
|
||||
Ok(Ok(permit)) => permit,
|
||||
Ok(Err(_)) => return Err(RouteResult::ChannelClosed),
|
||||
|
||||
Reference in New Issue
Block a user