mirror of https://github.com/telemt/telemt.git
add global fallback for per-user tcp connection limits
This commit is contained in:
parent
e35d69c61f
commit
a9dc56c565
|
|
@ -390,7 +390,8 @@ If your backend or network is very bandwidth-constrained, reduce cap first. If p
|
||||||
|---|---|---|---|---|---|
|
|---|---|---|---|---|---|
|
||||||
| users | `Map<String, String>` | `{"default": "000…000"}` | Secret must be 32 hex characters. | `[access.users]`<br>`user = "32-hex secret"`<br>`user2 = "32-hex secret"` | User credentials map used for client authentication. |
|
| users | `Map<String, String>` | `{"default": "000…000"}` | Secret must be 32 hex characters. | `[access.users]`<br>`user = "32-hex secret"`<br>`user2 = "32-hex secret"` | User credentials map used for client authentication. |
|
||||||
| user_ad_tags | `Map<String, String>` | `{}` | Every value must be exactly 32 hex characters. | `[access.user_ad_tags]`<br>`user = "32-hex ad_tag"` | Per-user ad tags used as override over `general.ad_tag`. |
|
| user_ad_tags | `Map<String, String>` | `{}` | Every value must be exactly 32 hex characters. | `[access.user_ad_tags]`<br>`user = "32-hex ad_tag"` | Per-user ad tags used as override over `general.ad_tag`. |
|
||||||
| user_max_tcp_conns | `Map<String, usize>` | `{}` | — | `[access.user_max_tcp_conns]`<br>`user = 500` | Per-user maximum concurrent TCP connections. |
|
| user_max_tcp_conns | `Map<String, usize>` | `{}` | — | `[access.user_max_tcp_conns]`<br>`user = 500` | Per-user override for the maximum concurrent TCP connections. |
|
||||||
|
| user_max_tcp_conns_global_each | `usize` | `0` | — | `user_max_tcp_conns_global_each = 0` | Global fallback used when `[access.user_max_tcp_conns]` has no per-user override. |
|
||||||
| user_expirations | `Map<String, DateTime<Utc>>` | `{}` | Timestamp must be valid RFC3339/ISO-8601 datetime. | `[access.user_expirations]`<br>`user = "2026-12-31T23:59:59Z"` | Per-user account expiration timestamps. |
|
| user_expirations | `Map<String, DateTime<Utc>>` | `{}` | Timestamp must be valid RFC3339/ISO-8601 datetime. | `[access.user_expirations]`<br>`user = "2026-12-31T23:59:59Z"` | Per-user account expiration timestamps. |
|
||||||
| user_data_quota | `Map<String, u64>` | `{}` | — | `[access.user_data_quota]`<br>`user = 1073741824` | Per-user traffic quota in bytes. |
|
| user_data_quota | `Map<String, u64>` | `{}` | — | `[access.user_data_quota]`<br>`user = 1073741824` | Per-user traffic quota in bytes. |
|
||||||
| user_max_unique_ips | `Map<String, usize>` | `{}` | — | `[access.user_max_unique_ips]`<br>`user = 16` | Per-user unique source IP limits. |
|
| user_max_unique_ips | `Map<String, usize>` | `{}` | — | `[access.user_max_unique_ips]`<br>`user = 16` | Per-user unique source IP limits. |
|
||||||
|
|
|
||||||
|
|
@ -144,7 +144,13 @@ pub(super) async fn create_user(
|
||||||
.unwrap_or(UserInfo {
|
.unwrap_or(UserInfo {
|
||||||
username: body.username.clone(),
|
username: body.username.clone(),
|
||||||
user_ad_tag: None,
|
user_ad_tag: None,
|
||||||
max_tcp_conns: None,
|
max_tcp_conns: cfg
|
||||||
|
.access
|
||||||
|
.user_max_tcp_conns
|
||||||
|
.get(&body.username)
|
||||||
|
.copied()
|
||||||
|
.or((cfg.access.user_max_tcp_conns_global_each > 0)
|
||||||
|
.then_some(cfg.access.user_max_tcp_conns_global_each)),
|
||||||
expiration_rfc3339: None,
|
expiration_rfc3339: None,
|
||||||
data_quota_bytes: None,
|
data_quota_bytes: None,
|
||||||
max_unique_ips: updated_limit,
|
max_unique_ips: updated_limit,
|
||||||
|
|
@ -395,7 +401,13 @@ pub(super) async fn users_from_config(
|
||||||
});
|
});
|
||||||
users.push(UserInfo {
|
users.push(UserInfo {
|
||||||
user_ad_tag: cfg.access.user_ad_tags.get(&username).cloned(),
|
user_ad_tag: cfg.access.user_ad_tags.get(&username).cloned(),
|
||||||
max_tcp_conns: cfg.access.user_max_tcp_conns.get(&username).copied(),
|
max_tcp_conns: cfg
|
||||||
|
.access
|
||||||
|
.user_max_tcp_conns
|
||||||
|
.get(&username)
|
||||||
|
.copied()
|
||||||
|
.or((cfg.access.user_max_tcp_conns_global_each > 0)
|
||||||
|
.then_some(cfg.access.user_max_tcp_conns_global_each)),
|
||||||
expiration_rfc3339: cfg
|
expiration_rfc3339: cfg
|
||||||
.access
|
.access
|
||||||
.user_expirations
|
.user_expirations
|
||||||
|
|
|
||||||
|
|
@ -761,6 +761,10 @@ pub(crate) fn default_access_users() -> HashMap<String, String> {
|
||||||
)])
|
)])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_user_max_tcp_conns_global_each() -> usize {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 {
|
pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 {
|
||||||
DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS
|
DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -117,6 +117,7 @@ pub struct HotFields {
|
||||||
pub users: std::collections::HashMap<String, String>,
|
pub users: std::collections::HashMap<String, String>,
|
||||||
pub user_ad_tags: std::collections::HashMap<String, String>,
|
pub user_ad_tags: std::collections::HashMap<String, String>,
|
||||||
pub user_max_tcp_conns: std::collections::HashMap<String, usize>,
|
pub user_max_tcp_conns: std::collections::HashMap<String, usize>,
|
||||||
|
pub user_max_tcp_conns_global_each: usize,
|
||||||
pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
|
pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
|
||||||
pub user_data_quota: std::collections::HashMap<String, u64>,
|
pub user_data_quota: std::collections::HashMap<String, u64>,
|
||||||
pub user_max_unique_ips: std::collections::HashMap<String, usize>,
|
pub user_max_unique_ips: std::collections::HashMap<String, usize>,
|
||||||
|
|
@ -238,6 +239,7 @@ impl HotFields {
|
||||||
users: cfg.access.users.clone(),
|
users: cfg.access.users.clone(),
|
||||||
user_ad_tags: cfg.access.user_ad_tags.clone(),
|
user_ad_tags: cfg.access.user_ad_tags.clone(),
|
||||||
user_max_tcp_conns: cfg.access.user_max_tcp_conns.clone(),
|
user_max_tcp_conns: cfg.access.user_max_tcp_conns.clone(),
|
||||||
|
user_max_tcp_conns_global_each: cfg.access.user_max_tcp_conns_global_each,
|
||||||
user_expirations: cfg.access.user_expirations.clone(),
|
user_expirations: cfg.access.user_expirations.clone(),
|
||||||
user_data_quota: cfg.access.user_data_quota.clone(),
|
user_data_quota: cfg.access.user_data_quota.clone(),
|
||||||
user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
|
user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
|
||||||
|
|
@ -528,6 +530,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
||||||
cfg.access.users = new.access.users.clone();
|
cfg.access.users = new.access.users.clone();
|
||||||
cfg.access.user_ad_tags = new.access.user_ad_tags.clone();
|
cfg.access.user_ad_tags = new.access.user_ad_tags.clone();
|
||||||
cfg.access.user_max_tcp_conns = new.access.user_max_tcp_conns.clone();
|
cfg.access.user_max_tcp_conns = new.access.user_max_tcp_conns.clone();
|
||||||
|
cfg.access.user_max_tcp_conns_global_each = new.access.user_max_tcp_conns_global_each;
|
||||||
cfg.access.user_expirations = new.access.user_expirations.clone();
|
cfg.access.user_expirations = new.access.user_expirations.clone();
|
||||||
cfg.access.user_data_quota = new.access.user_data_quota.clone();
|
cfg.access.user_data_quota = new.access.user_data_quota.clone();
|
||||||
cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
|
cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
|
||||||
|
|
@ -1133,6 +1136,12 @@ fn log_changes(
|
||||||
new_hot.user_max_tcp_conns.len()
|
new_hot.user_max_tcp_conns.len()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
if old_hot.user_max_tcp_conns_global_each != new_hot.user_max_tcp_conns_global_each {
|
||||||
|
info!(
|
||||||
|
"config reload: user_max_tcp_conns policy global_each={}",
|
||||||
|
new_hot.user_max_tcp_conns_global_each
|
||||||
|
);
|
||||||
|
}
|
||||||
if old_hot.user_expirations != new_hot.user_expirations {
|
if old_hot.user_expirations != new_hot.user_expirations {
|
||||||
info!(
|
info!(
|
||||||
"config reload: user_expirations updated ({} entries)",
|
"config reload: user_expirations updated ({} entries)",
|
||||||
|
|
|
||||||
|
|
@ -1507,6 +1507,11 @@ pub struct AccessConfig {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub user_max_tcp_conns: HashMap<String, usize>,
|
pub user_max_tcp_conns: HashMap<String, usize>,
|
||||||
|
|
||||||
|
/// Global per-user TCP connection limit applied when a user has no individual override.
|
||||||
|
/// `0` disables the inherited limit.
|
||||||
|
#[serde(default = "default_user_max_tcp_conns_global_each")]
|
||||||
|
pub user_max_tcp_conns_global_each: usize,
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub user_expirations: HashMap<String, DateTime<Utc>>,
|
pub user_expirations: HashMap<String, DateTime<Utc>>,
|
||||||
|
|
||||||
|
|
@ -1543,6 +1548,7 @@ impl Default for AccessConfig {
|
||||||
users: default_access_users(),
|
users: default_access_users(),
|
||||||
user_ad_tags: HashMap::new(),
|
user_ad_tags: HashMap::new(),
|
||||||
user_max_tcp_conns: HashMap::new(),
|
user_max_tcp_conns: HashMap::new(),
|
||||||
|
user_max_tcp_conns_global_each: default_user_max_tcp_conns_global_each(),
|
||||||
user_expirations: HashMap::new(),
|
user_expirations: HashMap::new(),
|
||||||
user_data_quota: HashMap::new(),
|
user_data_quota: HashMap::new(),
|
||||||
user_max_unique_ips: HashMap::new(),
|
user_max_unique_ips: HashMap::new(),
|
||||||
|
|
|
||||||
|
|
@ -1164,7 +1164,10 @@ impl RunningClientHandler {
|
||||||
.access
|
.access
|
||||||
.user_max_tcp_conns
|
.user_max_tcp_conns
|
||||||
.get(user)
|
.get(user)
|
||||||
.map(|v| *v as u64);
|
.copied()
|
||||||
|
.or((config.access.user_max_tcp_conns_global_each > 0)
|
||||||
|
.then_some(config.access.user_max_tcp_conns_global_each))
|
||||||
|
.map(|v| v as u64);
|
||||||
if !stats.try_acquire_user_curr_connects(user, limit) {
|
if !stats.try_acquire_user_curr_connects(user, limit) {
|
||||||
return Err(ProxyError::ConnectionLimitExceeded {
|
return Err(ProxyError::ConnectionLimitExceeded {
|
||||||
user: user.to_string(),
|
user: user.to_string(),
|
||||||
|
|
@ -1223,7 +1226,10 @@ impl RunningClientHandler {
|
||||||
.access
|
.access
|
||||||
.user_max_tcp_conns
|
.user_max_tcp_conns
|
||||||
.get(user)
|
.get(user)
|
||||||
.map(|v| *v as u64);
|
.copied()
|
||||||
|
.or((config.access.user_max_tcp_conns_global_each > 0)
|
||||||
|
.then_some(config.access.user_max_tcp_conns_global_each))
|
||||||
|
.map(|v| v as u64);
|
||||||
if !stats.try_acquire_user_curr_connects(user, limit) {
|
if !stats.try_acquire_user_curr_connects(user, limit) {
|
||||||
return Err(ProxyError::ConnectionLimitExceeded {
|
return Err(ProxyError::ConnectionLimitExceeded {
|
||||||
user: user.to_string(),
|
user: user.to_string(),
|
||||||
|
|
|
||||||
|
|
@ -714,6 +714,101 @@ async fn reservation_limit_failure_does_not_leak_curr_connects_counter() {
|
||||||
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn global_tcp_limit_applies_without_user_override() {
|
||||||
|
let user = "global-limit-user";
|
||||||
|
let mut config = crate::config::ProxyConfig::default();
|
||||||
|
config.access.user_max_tcp_conns_global_each = 1;
|
||||||
|
|
||||||
|
let stats = Arc::new(crate::stats::Stats::new());
|
||||||
|
let ip_tracker = Arc::new(crate::ip_tracker::UserIpTracker::new());
|
||||||
|
ip_tracker.set_user_limit(user, 8).await;
|
||||||
|
|
||||||
|
let first_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 201, 1)), 50001);
|
||||||
|
let first = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
first_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("first reservation must succeed under inherited tcp-conns limit");
|
||||||
|
|
||||||
|
let second_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 201, 2)), 50002);
|
||||||
|
let second = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
second_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
matches!(second, Err(crate::error::ProxyError::ConnectionLimitExceeded { user: denied }) if denied == user),
|
||||||
|
"second reservation must be rejected at the inherited tcp-conns limit"
|
||||||
|
);
|
||||||
|
|
||||||
|
first.release().await;
|
||||||
|
ip_tracker.drain_cleanup_queue().await;
|
||||||
|
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn per_user_tcp_limit_override_wins_over_global_limit() {
|
||||||
|
let user = "override-limit-user";
|
||||||
|
let mut config = crate::config::ProxyConfig::default();
|
||||||
|
config.access.user_max_tcp_conns_global_each = 1;
|
||||||
|
config.access.user_max_tcp_conns.insert(user.to_string(), 2);
|
||||||
|
|
||||||
|
let stats = Arc::new(crate::stats::Stats::new());
|
||||||
|
let ip_tracker = Arc::new(crate::ip_tracker::UserIpTracker::new());
|
||||||
|
ip_tracker.set_user_limit(user, 8).await;
|
||||||
|
|
||||||
|
let first_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 202, 1)), 50001);
|
||||||
|
let first = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
first_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("first reservation must succeed under per-user override");
|
||||||
|
|
||||||
|
let second_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 202, 2)), 50002);
|
||||||
|
let second = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
second_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("second reservation must succeed under per-user override");
|
||||||
|
|
||||||
|
let third_peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 202, 3)), 50003);
|
||||||
|
let third = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
third_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
matches!(third, Err(crate::error::ProxyError::ConnectionLimitExceeded { user: denied }) if denied == user),
|
||||||
|
"third reservation must be rejected once the per-user override is reached"
|
||||||
|
);
|
||||||
|
|
||||||
|
first.release().await;
|
||||||
|
second.release().await;
|
||||||
|
ip_tracker.drain_cleanup_queue().await;
|
||||||
|
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn short_tls_probe_is_masked_through_client_pipeline() {
|
async fn short_tls_probe_is_masked_through_client_pipeline() {
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue