diff --git a/Cargo.lock b/Cargo.lock index 3f837a2..b4cfbca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2093,7 +2093,7 @@ dependencies = [ [[package]] name = "telemt" -version = "3.3.17" +version = "3.3.18" dependencies = [ "aes", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 51060d2..66a80c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.17" +version = "3.3.18" edition = "2024" [dependencies] diff --git a/README.md b/README.md index 1990109..2102a3a 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,9 @@ ### πŸ‡·πŸ‡Ί RU -#### Π Π΅Π»ΠΈΠ· 3.3.16 +#### Π Π΅Π»ΠΈΠ· 3.3.15 Semistable -[3.3.16](https://github.com/telemt/telemt/releases/tag/3.3.16)! +[3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) ΠΏΠΎ ΠΈΡ‚ΠΎΠ³Π°ΠΌ Ρ€Π°Π±ΠΎΡ‚Ρ‹ Π² ΠΏΡ€ΠΎΠ΄Π°ΠΊΡˆΠ½ ΠΏΡ€ΠΈΠ·Π½Π°Π½ ΠΎΠ΄Π½ΠΈΠΌ ΠΈΠ· самых ΡΡ‚Π°Π±ΠΈΠ»ΡŒΠ½Ρ‹Ρ… ΠΈ рСкомСндуСтся ΠΊ использованию, ΠΊΠΎΠ³Π΄Π° cutting-edge Ρ„ΠΈΡ‡ΠΈ Π½Π΅ΠΊΡ€ΠΈΡ‚ΠΈΡ‡Π½Ρ‹! Π‘ΡƒΠ΄Π΅ΠΌ Ρ€Π°Π΄Ρ‹ Π²Π°ΡˆΠ΅ΠΌΡƒ Ρ„ΠΈΠ΄Π±Π΅ΠΊΡƒ ΠΈ прСдлоТСниям ΠΏΠΎ ΡƒΠ»ΡƒΡ‡ΡˆΠ΅Π½ΠΈΡŽ β€” особСнно Π² части **API**, **статистики**, **UX** @@ -40,9 +40,9 @@ ### πŸ‡¬πŸ‡§ EN -#### Release 3.3.16 +#### Release 3.3.15 Semistable -[3.3.16](https://github.com/telemt/telemt/releases/tag/3.3.16) +[3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) is, based on the results of its work in production, recognized as one of the most stable and recommended for use when cutting-edge features are not so necessary! We are looking forward to your feedback and improvement proposals β€” especially regarding **API**, **statistics**, **UX** diff --git a/docs/FAQ.en.md b/docs/FAQ.en.md index 49d1592..e3a6519 100644 --- a/docs/FAQ.en.md +++ b/docs/FAQ.en.md @@ -55,7 +55,10 @@ user2 = "00000000000000000000000000000002" user3 = "00000000000000000000000000000003" ``` 4. Save the config. Ctrl+S -> Ctrl+X. You don't need to restart telemt. -5. 
Get the links via `journalctl -u telemt -n -g "links" --no-pager -o cat | tac` +5. Get the links via +```bash +curl -s http://127.0.0.1:9091/v1/users | jq +``` ## How to view metrics diff --git a/docs/FAQ.ru.md b/docs/FAQ.ru.md index a2fcf1a..cb5db7f 100644 --- a/docs/FAQ.ru.md +++ b/docs/FAQ.ru.md @@ -55,7 +55,10 @@ user2 = "00000000000000000000000000000002" user3 = "00000000000000000000000000000003" ``` 4. Π‘ΠΎΡ…Ρ€Π°Π½ΠΈΡ‚ΡŒ ΠΊΠΎΠ½Ρ„ΠΈΠ³. Ctrl+S -> Ctrl+X. ΠŸΠ΅Ρ€Π΅Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ telemt Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ. -5. ΠŸΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ ссылки Ρ‡Π΅Ρ€Π΅Π· `journalctl -u telemt -n -g "links" --no-pager -o cat | tac` +5. ΠŸΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ ссылки Ρ‡Π΅Ρ€Π΅Π· +```bash +curl -s http://127.0.0.1:9091/v1/users | jq +``` ## Как ΠΏΠΎΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊΠΈ diff --git a/src/api/model.rs b/src/api/model.rs index 0bc52de..31233d7 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -236,6 +236,8 @@ pub(super) struct MeWritersSummary { pub(super) required_writers: usize, pub(super) alive_writers: usize, pub(super) coverage_pct: f64, + pub(super) fresh_alive_writers: usize, + pub(super) fresh_coverage_pct: f64, } #[derive(Serialize, Clone)] @@ -250,6 +252,12 @@ pub(super) struct MeWriterStatus { pub(super) bound_clients: usize, pub(super) idle_for_secs: Option, pub(super) rtt_ema_ms: Option, + pub(super) matches_active_generation: bool, + pub(super) in_desired_map: bool, + pub(super) allow_drain_fallback: bool, + pub(super) drain_started_at_epoch_secs: Option, + pub(super) drain_deadline_epoch_secs: Option, + pub(super) drain_over_ttl: bool, } #[derive(Serialize, Clone)] @@ -276,6 +284,8 @@ pub(super) struct DcStatus { pub(super) floor_capped: bool, pub(super) alive_writers: usize, pub(super) coverage_pct: f64, + pub(super) fresh_alive_writers: usize, + pub(super) fresh_coverage_pct: f64, pub(super) rtt_ms: Option, pub(super) load: usize, } diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 139a4c5..9260c40 100644 --- a/src/api/runtime_stats.rs +++ 
b/src/api/runtime_stats.rs @@ -314,6 +314,8 @@ async fn get_minimal_payload_cached( required_writers: status.required_writers, alive_writers: status.alive_writers, coverage_pct: status.coverage_pct, + fresh_alive_writers: status.fresh_alive_writers, + fresh_coverage_pct: status.fresh_coverage_pct, }, writers: status .writers @@ -329,6 +331,12 @@ async fn get_minimal_payload_cached( bound_clients: entry.bound_clients, idle_for_secs: entry.idle_for_secs, rtt_ema_ms: entry.rtt_ema_ms, + matches_active_generation: entry.matches_active_generation, + in_desired_map: entry.in_desired_map, + allow_drain_fallback: entry.allow_drain_fallback, + drain_started_at_epoch_secs: entry.drain_started_at_epoch_secs, + drain_deadline_epoch_secs: entry.drain_deadline_epoch_secs, + drain_over_ttl: entry.drain_over_ttl, }) .collect(), }; @@ -363,6 +371,8 @@ async fn get_minimal_payload_cached( floor_capped: entry.floor_capped, alive_writers: entry.alive_writers, coverage_pct: entry.coverage_pct, + fresh_alive_writers: entry.fresh_alive_writers, + fresh_coverage_pct: entry.fresh_coverage_pct, rtt_ms: entry.rtt_ms, load: entry.load, }) @@ -486,6 +496,8 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa required_writers: 0, alive_writers: 0, coverage_pct: 0.0, + fresh_alive_writers: 0, + fresh_coverage_pct: 0.0, }, writers: Vec::new(), } diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 93e3931..ba89302 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -90,6 +90,7 @@ pub(super) struct EffectiveMiddleProxyLimits { #[derive(Serialize)] pub(super) struct EffectiveUserIpPolicyLimits { + pub(super) global_each: usize, pub(super) mode: &'static str, pub(super) window_secs: u64, } @@ -262,6 +263,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD me2dc_fallback: cfg.general.me2dc_fallback, }, user_ip_policy: EffectiveUserIpPolicyLimits { + global_each: 
cfg.access.user_max_unique_ips_global_each, mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode), window_secs: cfg.access.user_max_unique_ips_window_secs, }, diff --git a/src/api/users.rs b/src/api/users.rs index da360c7..f339806 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -386,7 +386,16 @@ pub(super) async fn users_from_config( .get(&username) .map(chrono::DateTime::::to_rfc3339), data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), - max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), + max_unique_ips: cfg + .access + .user_max_unique_ips + .get(&username) + .copied() + .filter(|limit| *limit > 0) + .or( + (cfg.access.user_max_unique_ips_global_each > 0) + .then_some(cfg.access.user_max_unique_ips_global_each), + ), current_connections: stats.get_user_curr_connects(&username), active_unique_ips: active_ip_list.len(), active_unique_ips_list: active_ip_list, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 76479cf..82ec0b3 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -584,6 +584,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 { 90 } +pub(crate) fn default_me_pool_drain_threshold() -> u64 { + 128 +} + pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 { default_me_pool_drain_ttl_secs() } @@ -635,6 +639,10 @@ pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 { DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS } +pub(crate) fn default_user_max_unique_ips_global_each() -> usize { + 0 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index a375899..6f07a4b 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -55,6 +55,7 @@ pub struct HotFields { pub me_reinit_coalesce_window_ms: u64, pub hardswap: bool, pub me_pool_drain_ttl_secs: u64, + pub me_pool_drain_threshold: u64, pub me_pool_min_fresh_ratio: f32, pub me_reinit_drain_timeout_secs: u64, pub 
me_hardswap_warmup_delay_min_ms: u64, @@ -118,6 +119,7 @@ pub struct HotFields { pub user_expirations: std::collections::HashMap>, pub user_data_quota: std::collections::HashMap, pub user_max_unique_ips: std::collections::HashMap, + pub user_max_unique_ips_global_each: usize, pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode, pub user_max_unique_ips_window_secs: u64, } @@ -135,6 +137,7 @@ impl HotFields { me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, hardswap: cfg.general.hardswap, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, + me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms, @@ -232,6 +235,7 @@ impl HotFields { user_expirations: cfg.access.user_expirations.clone(), user_data_quota: cfg.access.user_data_quota.clone(), user_max_unique_ips: cfg.access.user_max_unique_ips.clone(), + user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each, user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode, user_max_unique_ips_window_secs: cfg.access.user_max_unique_ips_window_secs, } @@ -450,6 +454,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.hardswap = new.general.hardswap; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; + cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio; cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs; cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms; @@ -532,6 +537,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) 
-> ProxyConfig { cfg.access.user_expirations = new.access.user_expirations.clone(); cfg.access.user_data_quota = new.access.user_data_quota.clone(); cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone(); + cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each; cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode; cfg.access.user_max_unique_ips_window_secs = new.access.user_max_unique_ips_window_secs; @@ -823,6 +829,13 @@ fn log_changes( ); } + if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold { + info!( + "config reload: me_pool_drain_threshold: {} β†’ {}", + old_hot.me_pool_drain_threshold, new_hot.me_pool_drain_threshold, + ); + } + if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON { info!( "config reload: me_pool_min_fresh_ratio: {:.3} β†’ {:.3}", @@ -1099,12 +1112,14 @@ fn log_changes( new_hot.user_max_unique_ips.len() ); } - if old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode + if old_hot.user_max_unique_ips_global_each != new_hot.user_max_unique_ips_global_each + || old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode || old_hot.user_max_unique_ips_window_secs != new_hot.user_max_unique_ips_window_secs { info!( - "config reload: user_max_unique_ips policy mode={:?} window={}s", + "config reload: user_max_unique_ips policy global_each={} mode={:?} window={}s", + new_hot.user_max_unique_ips_global_each, new_hot.user_max_unique_ips_mode, new_hot.user_max_unique_ips_window_secs ); diff --git a/src/config/types.rs b/src/config/types.rs index 24626a4..04a22ce 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -3,6 +3,7 @@ use ipnetwork::IpNetwork; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::net::IpAddr; +use std::path::PathBuf; use super::defaults::*; @@ -356,6 +357,9 @@ impl Default for NetworkConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub 
struct GeneralConfig { + #[serde(default)] + pub data_path: Option, + #[serde(default)] pub modes: ProxyModes, @@ -794,6 +798,11 @@ pub struct GeneralConfig { #[serde(default = "default_me_pool_drain_ttl_secs")] pub me_pool_drain_ttl_secs: u64, + /// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches. + /// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior. + #[serde(default = "default_me_pool_drain_threshold")] + pub me_pool_drain_threshold: u64, + /// Policy for new binds on stale draining writers. #[serde(default)] pub me_bind_stale_mode: MeBindStaleMode, @@ -866,6 +875,7 @@ pub struct GeneralConfig { impl Default for GeneralConfig { fn default() -> Self { Self { + data_path: None, modes: ProxyModes::default(), prefer_ipv6: false, fast_mode: default_true(), @@ -973,6 +983,7 @@ impl Default for GeneralConfig { me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), proxy_secret_len_max: default_proxy_secret_len_max(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), + me_pool_drain_threshold: default_me_pool_drain_threshold(), me_bind_stale_mode: MeBindStaleMode::default(), me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(), me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(), @@ -1317,6 +1328,11 @@ pub struct AccessConfig { #[serde(default)] pub user_max_unique_ips: HashMap, + /// Global per-user unique IP limit applied when a user has no individual override. + /// `0` disables the inherited limit. 
+ #[serde(default = "default_user_max_unique_ips_global_each")] + pub user_max_unique_ips_global_each: usize, + #[serde(default)] pub user_max_unique_ips_mode: UserMaxUniqueIpsMode, @@ -1342,6 +1358,7 @@ impl Default for AccessConfig { user_expirations: HashMap::new(), user_data_quota: HashMap::new(), user_max_unique_ips: HashMap::new(), + user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(), user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(), user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(), replay_check_len: default_replay_check_len(), diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index d406d51..fce20b6 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -17,6 +17,7 @@ pub struct UserIpTracker { active_ips: Arc>>>, recent_ips: Arc>>>, max_ips: Arc>>, + default_max_ips: Arc>, limit_mode: Arc>, limit_window: Arc>, last_compact_epoch_secs: Arc, @@ -28,6 +29,7 @@ impl UserIpTracker { active_ips: Arc::new(RwLock::new(HashMap::new())), recent_ips: Arc::new(RwLock::new(HashMap::new())), max_ips: Arc::new(RwLock::new(HashMap::new())), + default_max_ips: Arc::new(RwLock::new(0)), limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), limit_window: Arc::new(RwLock::new(Duration::from_secs(30))), last_compact_epoch_secs: Arc::new(AtomicU64::new(0)), @@ -100,7 +102,10 @@ impl UserIpTracker { limits.remove(username); } - pub async fn load_limits(&self, limits: &HashMap) { + pub async fn load_limits(&self, default_limit: usize, limits: &HashMap) { + let mut default_max_ips = self.default_max_ips.write().await; + *default_max_ips = default_limit; + drop(default_max_ips); let mut max_ips = self.max_ips.write().await; max_ips.clone_from(limits); } @@ -114,9 +119,14 @@ impl UserIpTracker { pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { self.maybe_compact_empty_users().await; + let default_max_ips = *self.default_max_ips.read().await; let limit = { let max_ips = 
self.max_ips.read().await; - max_ips.get(username).copied() + max_ips + .get(username) + .copied() + .filter(|limit| *limit > 0) + .or((default_max_ips > 0).then_some(default_max_ips)) }; let mode = *self.limit_mode.read().await; let window = *self.limit_window.read().await; @@ -255,10 +265,16 @@ impl UserIpTracker { pub async fn get_stats(&self) -> Vec<(String, usize, usize)> { let active_ips = self.active_ips.read().await; let max_ips = self.max_ips.read().await; + let default_max_ips = *self.default_max_ips.read().await; let mut stats = Vec::new(); for (username, user_ips) in active_ips.iter() { - let limit = max_ips.get(username).copied().unwrap_or(0); + let limit = max_ips + .get(username) + .copied() + .filter(|limit| *limit > 0) + .or((default_max_ips > 0).then_some(default_max_ips)) + .unwrap_or(0); stats.push((username.clone(), user_ips.len(), limit)); } @@ -293,8 +309,13 @@ impl UserIpTracker { } pub async fn get_user_limit(&self, username: &str) -> Option { + let default_max_ips = *self.default_max_ips.read().await; let max_ips = self.max_ips.read().await; - max_ips.get(username).copied() + max_ips + .get(username) + .copied() + .filter(|limit| *limit > 0) + .or((default_max_ips > 0).then_some(default_max_ips)) } pub async fn format_stats(&self) -> String { @@ -546,7 +567,7 @@ mod tests { config_limits.insert("user1".to_string(), 5); config_limits.insert("user2".to_string(), 3); - tracker.load_limits(&config_limits).await; + tracker.load_limits(0, &config_limits).await; assert_eq!(tracker.get_user_limit("user1").await, Some(5)); assert_eq!(tracker.get_user_limit("user2").await, Some(3)); @@ -560,16 +581,46 @@ mod tests { let mut first = HashMap::new(); first.insert("user1".to_string(), 2); first.insert("user2".to_string(), 3); - tracker.load_limits(&first).await; + tracker.load_limits(0, &first).await; let mut second = HashMap::new(); second.insert("user2".to_string(), 5); - tracker.load_limits(&second).await; + tracker.load_limits(0, &second).await; 
assert_eq!(tracker.get_user_limit("user1").await, None); assert_eq!(tracker.get_user_limit("user2").await, Some(5)); } + #[tokio::test] + async fn test_global_each_limit_applies_without_user_override() { + let tracker = UserIpTracker::new(); + tracker.load_limits(2, &HashMap::new()).await; + + let ip1 = test_ipv4(172, 16, 0, 1); + let ip2 = test_ipv4(172, 16, 0, 2); + let ip3 = test_ipv4(172, 16, 0, 3); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip3).await.is_err()); + assert_eq!(tracker.get_user_limit("test_user").await, Some(2)); + } + + #[tokio::test] + async fn test_user_override_wins_over_global_each_limit() { + let tracker = UserIpTracker::new(); + let mut limits = HashMap::new(); + limits.insert("test_user".to_string(), 1); + tracker.load_limits(3, &limits).await; + + let ip1 = test_ipv4(172, 17, 0, 1); + let ip2 = test_ipv4(172, 17, 0, 2); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + assert_eq!(tracker.get_user_limit("test_user").await, Some(1)); + } + #[tokio::test] async fn test_time_window_mode_blocks_recent_ip_churn() { let tracker = UserIpTracker::new(); diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs index acaecdd..78f3ec4 100644 --- a/src/maestro/helpers.rs +++ b/src/maestro/helpers.rs @@ -1,4 +1,5 @@ use std::time::Duration; +use std::path::PathBuf; use tokio::sync::watch; use tracing::{debug, error, info, warn}; @@ -9,8 +10,9 @@ use crate::transport::middle_proxy::{ ProxyConfigData, fetch_proxy_config_with_raw, load_proxy_config_cache, save_proxy_config_cache, }; -pub(crate) fn parse_cli() -> (String, bool, Option) { +pub(crate) fn parse_cli() -> (String, Option, bool, Option) { let mut config_path = "config.toml".to_string(); + let mut data_path: Option = None; let mut silent = false; let mut log_level: Option 
= None; @@ -28,6 +30,18 @@ pub(crate) fn parse_cli() -> (String, bool, Option) { let mut i = 0; while i < args.len() { match args[i].as_str() { + "--data-path" => { + i += 1; + if i < args.len() { + data_path = Some(PathBuf::from(args[i].clone())); + } else { + eprintln!("Missing value for --data-path"); + std::process::exit(0); + } + } + s if s.starts_with("--data-path=") => { + data_path = Some(PathBuf::from(s.trim_start_matches("--data-path=").to_string())); + } "--silent" | "-s" => { silent = true; } @@ -44,6 +58,7 @@ pub(crate) fn parse_cli() -> (String, bool, Option) { eprintln!("Usage: telemt [config.toml] [OPTIONS]"); eprintln!(); eprintln!("Options:"); + eprintln!(" --data-path Set data directory (absolute path; overrides config value)"); eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --log-level debug|verbose|normal|silent"); eprintln!(" --help, -h Show this help"); @@ -78,7 +93,7 @@ pub(crate) fn parse_cli() -> (String, bool, Option) { i += 1; } - (config_path, silent, log_level) + (config_path, data_path, silent, log_level) } pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 72fdd40..245c7a9 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool( config.general.me_adaptive_floor_max_warm_writers_global, config.general.hardswap, config.general.me_pool_drain_ttl_secs, + config.general.me_pool_drain_threshold, config.general.effective_me_pool_force_close_secs(), config.general.me_pool_min_fresh_ratio, config.general.me_hardswap_warmup_delay_min_ms, diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 5f6c70a..047c204 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -58,7 +58,7 @@ pub async fn run() -> std::result::Result<(), Box> { startup_tracker .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string())) .await; - let 
(config_path, cli_silent, cli_log_level) = parse_cli(); + let (config_path, data_path, cli_silent, cli_log_level) = parse_cli(); let mut config = match ProxyConfig::load(&config_path) { Ok(c) => c, @@ -80,6 +80,34 @@ pub async fn run() -> std::result::Result<(), Box> { std::process::exit(1); } + if let Some(p) = data_path { + config.general.data_path = Some(p); + } + + if let Some(ref data_path) = config.general.data_path { + if !data_path.is_absolute() { + eprintln!("[telemt] data_path must be absolute: {}", data_path.display()); + std::process::exit(1); + } + + if data_path.exists() { + if !data_path.is_dir() { + eprintln!("[telemt] data_path exists but is not a directory: {}", data_path.display()); + std::process::exit(1); + } + } else { + if let Err(e) = std::fs::create_dir_all(data_path) { + eprintln!("[telemt] Can't create data_path {}: {}", data_path.display(), e); + std::process::exit(1); + } + } + + if let Err(e) = std::env::set_current_dir(data_path) { + eprintln!("[telemt] Can't use data_path {}: {}", data_path.display(), e); + std::process::exit(1); + } + } + if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) { eprintln!("[telemt] Invalid network.dns_overrides: {}", e); std::process::exit(1); @@ -168,17 +196,24 @@ pub async fn run() -> std::result::Result<(), Box> { stats.clone(), )); let ip_tracker = Arc::new(UserIpTracker::new()); - ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + ip_tracker + .load_limits( + config.access.user_max_unique_ips_global_each, + &config.access.user_max_unique_ips, + ) + .await; ip_tracker .set_limit_policy( config.access.user_max_unique_ips_mode, config.access.user_max_unique_ips_window_secs, ) .await; - if !config.access.user_max_unique_ips.is_empty() { + if config.access.user_max_unique_ips_global_each > 0 || !config.access.user_max_unique_ips.is_empty() + { info!( - "IP limits configured for {} users", - config.access.user_max_unique_ips.len() + 
global_each_limit = config.access.user_max_unique_ips_global_each, + explicit_user_limits = config.access.user_max_unique_ips.len(), + "User unique IP limits configured" ); } if !config.network.dns_overrides.is_empty() { diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs index c8aa534..329e267 100644 --- a/src/maestro/runtime_tasks.rs +++ b/src/maestro/runtime_tasks.rs @@ -131,6 +131,10 @@ pub(crate) async fn spawn_runtime_tasks( let mut config_rx_ip_limits = config_rx.clone(); tokio::spawn(async move { let mut prev_limits = config_rx_ip_limits.borrow().access.user_max_unique_ips.clone(); + let mut prev_global_each = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_global_each; let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode; let mut prev_window = config_rx_ip_limits .borrow() @@ -143,9 +147,17 @@ pub(crate) async fn spawn_runtime_tasks( } let cfg = config_rx_ip_limits.borrow_and_update().clone(); - if prev_limits != cfg.access.user_max_unique_ips { - ip_tracker_policy.load_limits(&cfg.access.user_max_unique_ips).await; + if prev_limits != cfg.access.user_max_unique_ips + || prev_global_each != cfg.access.user_max_unique_ips_global_each + { + ip_tracker_policy + .load_limits( + cfg.access.user_max_unique_ips_global_each, + &cfg.access.user_max_unique_ips, + ) + .await; prev_limits = cfg.access.user_max_unique_ips.clone(); + prev_global_each = cfg.access.user_max_unique_ips_global_each; } if prev_mode != cfg.access.user_max_unique_ips_mode diff --git a/src/metrics.rs b/src/metrics.rs index c24dc54..02edfd7 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1774,14 +1774,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp "# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window" ); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge"); - let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit 
Per-user configured unique IP limit (0 means unlimited)"); + let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Effective per-user unique IP limit (0 means unlimited)"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge"); for user in unique_users { let current = ip_counts.get(&user).copied().unwrap_or(0); - let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0); + let limit = config + .access + .user_max_unique_ips + .get(&user) + .copied() + .filter(|limit| *limit > 0) + .or( + (config.access.user_max_unique_ips_global_each > 0) + .then_some(config.access.user_max_unique_ips_global_each), + ) + .unwrap_or(0); let utilization = if limit > 0 { current as f64 / limit as f64 } else { @@ -1904,6 +1914,25 @@ mod tests { assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); } + #[tokio::test] + async fn test_render_uses_global_each_unique_ip_limit() { + let stats = Stats::new(); + stats.increment_user_connects("alice"); + stats.increment_user_curr_connects("alice"); + let tracker = UserIpTracker::new(); + tracker + .check_and_add("alice", "203.0.113.10".parse().unwrap()) + .await + .unwrap(); + let mut config = ProxyConfig::default(); + config.access.user_max_unique_ips_global_each = 2; + + let output = render_metrics(&stats, &config, &tracker).await; + + assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2")); + assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000")); + } + #[tokio::test] async fn test_render_has_type_annotations() { let stats = Stats::new(); diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 194da5b..b6a0160 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ 
b/src/transport/middle_proxy/config_updater.rs @@ -298,6 +298,7 @@ async fn run_update_cycle( pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, + cfg.general.me_pool_drain_threshold, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, cfg.general.me_hardswap_warmup_delay_min_ms, @@ -524,6 +525,7 @@ pub async fn me_config_updater( pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, + cfg.general.me_pool_drain_threshold, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, cfg.general.me_hardswap_warmup_delay_min_ms, diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index b422dc6..e5f4260 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -62,6 +62,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut drain_warn_next_allowed: HashMap = HashMap::new(); let mut degraded_interval = true; loop { let interval = if degraded_interval { @@ -71,7 +72,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c }; tokio::time::sleep(interval).await; pool.prune_closed_writers().await; - reap_draining_writers(&pool).await; + reap_draining_writers(&pool, &mut drain_warn_next_allowed).await; let v4_degraded = check_family( IpFamily::V4, &pool, @@ -110,17 +111,81 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c } } -async fn reap_draining_writers(pool: &Arc) { +async fn reap_draining_writers( + pool: &Arc, + warn_next_allowed: &mut HashMap, +) { let now_epoch_secs = MePool::now_epoch_secs(); + let now = Instant::now(); + let drain_ttl_secs = 
pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed); + let drain_threshold = pool + .me_pool_drain_threshold + .load(std::sync::atomic::Ordering::Relaxed); let writers = pool.writers.read().await.clone(); + let mut draining_writers = Vec::new(); for writer in writers { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { continue; } - if pool.registry.is_writer_empty(writer.id).await { + let is_empty = pool.registry.is_writer_empty(writer.id).await; + if is_empty { pool.remove_writer_and_close_clients(writer.id).await; continue; } + draining_writers.push(writer); + } + + if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { + draining_writers.sort_by(|left, right| { + let left_started = left + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + let right_started = right + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + left_started + .cmp(&right_started) + .then_with(|| left.created_at.cmp(&right.created_at)) + .then_with(|| left.id.cmp(&right.id)) + }); + let overflow = draining_writers.len().saturating_sub(drain_threshold as usize); + warn!( + draining_writers = draining_writers.len(), + me_pool_drain_threshold = drain_threshold, + removing_writers = overflow, + "ME draining writer threshold exceeded, force-closing oldest draining writers" + ); + for writer in draining_writers.drain(..overflow) { + pool.stats.increment_pool_force_close_total(); + pool.remove_writer_and_close_clients(writer.id).await; + } + } + + for writer in draining_writers { + let drain_started_at_epoch_secs = writer + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if drain_ttl_secs > 0 + && drain_started_at_epoch_secs != 0 + && now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs + && should_emit_writer_warn( + warn_next_allowed, + writer.id, + now, + pool.warn_rate_limit_duration(), + ) + { + warn!( + writer_id = 
writer.id, + writer_dc = writer.writer_dc, + endpoint = %writer.addr, + generation = writer.generation, + drain_ttl_secs, + force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed), + allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed), + "ME draining writer remains non-empty past drain TTL" + ); + } let deadline_epoch_secs = writer .drain_deadline_epoch_secs .load(std::sync::atomic::Ordering::Relaxed); @@ -132,6 +197,23 @@ async fn reap_draining_writers(pool: &Arc) { } } +fn should_emit_writer_warn( + next_allowed: &mut HashMap, + writer_id: u64, + now: Instant, + cooldown: Duration, +) -> bool { + let Some(ready_at) = next_allowed.get(&writer_id).copied() else { + next_allowed.insert(writer_id, now + cooldown); + return true; + }; + if now >= ready_at { + next_allowed.insert(writer_id, now + cooldown); + return true; + } + false +} + async fn check_family( family: IpFamily, pool: &Arc, @@ -1222,3 +1304,190 @@ async fn maybe_rotate_single_endpoint_shadow( "Single-endpoint shadow writer rotated" ); } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; + use std::time::{Duration, Instant}; + + use tokio::sync::mpsc; + use tokio_util::sync::CancellationToken; + + use super::reap_draining_writers; + use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; + use crate::crypto::SecureRandom; + use crate::network::probe::NetworkDecision; + use crate::stats::Stats; + use crate::transport::middle_proxy::codec::WriterCommand; + use crate::transport::middle_proxy::pool::{MePool, MeWriter, WriterContour}; + use crate::transport::middle_proxy::registry::ConnMeta; + + async fn make_pool(me_pool_drain_threshold: u64) -> Arc { + let general = GeneralConfig { + me_pool_drain_threshold, + 
..GeneralConfig::default() + }; + MePool::new( + None, + vec![1u8; 32], + None, + false, + None, + Vec::new(), + 1, + None, + 12, + 1200, + HashMap::new(), + HashMap::new(), + None, + NetworkDecision::default(), + None, + Arc::new(SecureRandom::new()), + Arc::new(Stats::default()), + general.me_keepalive_enabled, + general.me_keepalive_interval_secs, + general.me_keepalive_jitter_secs, + general.me_keepalive_payload_random, + general.rpc_proxy_req_every, + general.me_warmup_stagger_enabled, + general.me_warmup_step_delay_ms, + general.me_warmup_step_jitter_ms, + general.me_reconnect_max_concurrent_per_dc, + general.me_reconnect_backoff_base_ms, + general.me_reconnect_backoff_cap_ms, + general.me_reconnect_fast_retry_count, + general.me_single_endpoint_shadow_writers, + general.me_single_endpoint_outage_mode_enabled, + general.me_single_endpoint_outage_disable_quarantine, + general.me_single_endpoint_outage_backoff_min_ms, + general.me_single_endpoint_outage_backoff_max_ms, + general.me_single_endpoint_shadow_rotate_every_secs, + general.me_floor_mode, + general.me_adaptive_floor_idle_secs, + general.me_adaptive_floor_min_writers_single_endpoint, + general.me_adaptive_floor_min_writers_multi_endpoint, + general.me_adaptive_floor_recover_grace_secs, + general.me_adaptive_floor_writers_per_core_total, + general.me_adaptive_floor_cpu_cores_override, + general.me_adaptive_floor_max_extra_writers_single_per_core, + general.me_adaptive_floor_max_extra_writers_multi_per_core, + general.me_adaptive_floor_max_active_writers_per_core, + general.me_adaptive_floor_max_warm_writers_per_core, + general.me_adaptive_floor_max_active_writers_global, + general.me_adaptive_floor_max_warm_writers_global, + general.hardswap, + general.me_pool_drain_ttl_secs, + general.me_pool_drain_threshold, + general.effective_me_pool_force_close_secs(), + general.me_pool_min_fresh_ratio, + general.me_hardswap_warmup_delay_min_ms, + general.me_hardswap_warmup_delay_max_ms, + 
general.me_hardswap_warmup_extra_passes, + general.me_hardswap_warmup_pass_backoff_base_ms, + general.me_bind_stale_mode, + general.me_bind_stale_ttl_secs, + general.me_secret_atomic_snapshot, + general.me_deterministic_writer_sort, + MeWriterPickMode::default(), + general.me_writer_pick_sample_size, + MeSocksKdfPolicy::default(), + general.me_writer_cmd_channel_capacity, + general.me_route_channel_capacity, + general.me_route_backpressure_base_timeout_ms, + general.me_route_backpressure_high_timeout_ms, + general.me_route_backpressure_high_watermark_pct, + general.me_reader_route_data_wait_ms, + general.me_health_interval_ms_unhealthy, + general.me_health_interval_ms_healthy, + general.me_warn_rate_limit_ms, + MeRouteNoWriterMode::default(), + general.me_route_no_writer_wait_ms, + general.me_route_inline_recovery_attempts, + general.me_route_inline_recovery_wait_ms, + ) + } + + async fn insert_draining_writer( + pool: &Arc, + writer_id: u64, + drain_started_at_epoch_secs: u64, + ) -> u64 { + let (conn_id, _rx) = pool.registry.register().await; + let (tx, _writer_rx) = mpsc::channel::(8); + let writer = MeWriter { + id: writer_id, + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16), + source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), + writer_dc: 2, + generation: 1, + contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())), + created_at: Instant::now() - Duration::from_secs(writer_id), + tx: tx.clone(), + cancel: CancellationToken::new(), + degraded: Arc::new(AtomicBool::new(false)), + rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), + draining: Arc::new(AtomicBool::new(true)), + draining_started_at_epoch_secs: Arc::new(AtomicU64::new(drain_started_at_epoch_secs)), + drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)), + allow_drain_fallback: Arc::new(AtomicBool::new(false)), + }; + pool.writers.write().await.push(writer); + pool.registry.register_writer(writer_id, tx).await; + pool.conn_count.fetch_add(1, Ordering::Relaxed); + 
assert!( + pool.registry + .bind_writer( + conn_id, + writer_id, + ConnMeta { + target_dc: 2, + client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6000), + our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443), + proto_flags: 0, + }, + ) + .await + ); + conn_id + } + + #[tokio::test] + async fn reap_draining_writers_force_closes_oldest_over_threshold() { + let pool = make_pool(2).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await; + let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await; + let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + let writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); + assert_eq!(writer_ids, vec![20, 30]); + assert!(pool.registry.get_writer(conn_a).await.is_none()); + assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); + assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); + } + + #[tokio::test] + async fn reap_draining_writers_keeps_timeout_only_behavior_when_threshold_disabled() { + let pool = make_pool(0).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await; + let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await; + let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + let writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); + assert_eq!(writer_ids, vec![10, 20, 30]); + assert_eq!(pool.registry.get_writer(conn_a).await.unwrap().writer_id, 10); + 
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); + assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); + } +} diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 42cba81..2a65160 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -171,6 +171,7 @@ pub struct MePool { pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, + pub(super) me_pool_drain_threshold: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, @@ -271,6 +272,7 @@ impl MePool { me_adaptive_floor_max_warm_writers_global: u32, hardswap: bool, me_pool_drain_ttl_secs: u64, + me_pool_drain_threshold: u64, me_pool_force_close_secs: u64, me_pool_min_fresh_ratio: f32, me_hardswap_warmup_delay_min_ms: u64, @@ -446,6 +448,7 @@ impl MePool { endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), + me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio, @@ -492,6 +495,7 @@ impl MePool { &self, hardswap: bool, drain_ttl_secs: u64, + pool_drain_threshold: u64, force_close_secs: u64, min_fresh_ratio: f32, hardswap_warmup_delay_min_ms: u64, @@ -530,6 +534,8 @@ impl MePool { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs .store(drain_ttl_secs, Ordering::Relaxed); + self.me_pool_drain_threshold + .store(pool_drain_threshold, Ordering::Relaxed); self.me_pool_force_close_secs .store(force_close_secs, Ordering::Relaxed); self.me_pool_min_fresh_ratio_permille diff --git 
a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 6673cf2..99070a8 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -19,6 +19,12 @@ pub(crate) struct MeApiWriterStatusSnapshot { pub bound_clients: usize, pub idle_for_secs: Option, pub rtt_ema_ms: Option, + pub matches_active_generation: bool, + pub in_desired_map: bool, + pub allow_drain_fallback: bool, + pub drain_started_at_epoch_secs: Option, + pub drain_deadline_epoch_secs: Option, + pub drain_over_ttl: bool, } #[derive(Clone, Debug)] @@ -35,6 +41,8 @@ pub(crate) struct MeApiDcStatusSnapshot { pub floor_capped: bool, pub alive_writers: usize, pub coverage_pct: f64, + pub fresh_alive_writers: usize, + pub fresh_coverage_pct: f64, pub rtt_ms: Option, pub load: usize, } @@ -55,6 +63,8 @@ pub(crate) struct MeApiStatusSnapshot { pub required_writers: usize, pub alive_writers: usize, pub coverage_pct: f64, + pub fresh_alive_writers: usize, + pub fresh_coverage_pct: f64, pub writers: Vec, pub dcs: Vec, } @@ -213,6 +223,8 @@ impl MePool { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { let now_epoch_secs = Self::now_epoch_secs(); + let active_generation = self.current_generation(); + let drain_ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed); let mut endpoints_by_dc = BTreeMap::>::new(); if self.decision.ipv4_me { @@ -239,6 +251,7 @@ impl MePool { let mut live_writers_by_dc_endpoint = HashMap::<(i16, SocketAddr), usize>::new(); let mut live_writers_by_dc = HashMap::::new(); + let mut fresh_writers_by_dc = HashMap::::new(); let mut dc_rtt_agg = HashMap::::new(); let mut writer_rows = Vec::::with_capacity(writers.len()); @@ -247,6 +260,10 @@ impl MePool { let dc = i16::try_from(writer.writer_dc).ok(); let draining = writer.draining.load(Ordering::Relaxed); let degraded = writer.degraded.load(Ordering::Relaxed); + let matches_active_generation = writer.generation == active_generation; 
+ let in_desired_map = dc + .and_then(|dc_idx| endpoints_by_dc.get(&dc_idx)) + .is_some_and(|endpoints| endpoints.contains(&endpoint)); let bound_clients = activity .bound_clients_by_writer .get(&writer.id) @@ -256,6 +273,21 @@ impl MePool { .get(&writer.id) .map(|idle_ts| now_epoch_secs.saturating_sub(*idle_ts)); let rtt_ema_ms = rtt.get(&writer.id).map(|(_, ema)| *ema); + let allow_drain_fallback = writer.allow_drain_fallback.load(Ordering::Relaxed); + let drain_started_at_epoch_secs = writer + .draining_started_at_epoch_secs + .load(Ordering::Relaxed); + let drain_deadline_epoch_secs = writer + .drain_deadline_epoch_secs + .load(Ordering::Relaxed); + let drain_started_at_epoch_secs = + (drain_started_at_epoch_secs != 0).then_some(drain_started_at_epoch_secs); + let drain_deadline_epoch_secs = + (drain_deadline_epoch_secs != 0).then_some(drain_deadline_epoch_secs); + let drain_over_ttl = draining + && drain_ttl_secs > 0 + && drain_started_at_epoch_secs + .is_some_and(|started| now_epoch_secs.saturating_sub(started) > drain_ttl_secs); let state = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) { WriterContour::Warm => "warm", WriterContour::Active => "active", @@ -273,6 +305,9 @@ impl MePool { entry.0 += ema_ms; entry.1 += 1; } + if matches_active_generation && in_desired_map { + *fresh_writers_by_dc.entry(dc_idx).or_insert(0) += 1; + } } } @@ -287,6 +322,12 @@ impl MePool { bound_clients, idle_for_secs, rtt_ema_ms, + matches_active_generation, + in_desired_map, + allow_drain_fallback, + drain_started_at_epoch_secs, + drain_deadline_epoch_secs, + drain_over_ttl, }); } @@ -295,6 +336,7 @@ impl MePool { let mut dcs = Vec::::with_capacity(endpoints_by_dc.len()); let mut available_endpoints = 0usize; let mut alive_writers = 0usize; + let mut fresh_alive_writers = 0usize; let floor_mode = self.floor_mode(); let adaptive_cpu_cores = (self .me_adaptive_floor_cpu_cores_effective @@ -333,6 +375,7 @@ impl MePool { let floor_capped = matches!(floor_mode, 
MeFloorMode::Adaptive) && dc_required_writers < base_required; let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0); + let dc_fresh_alive_writers = fresh_writers_by_dc.get(&dc).copied().unwrap_or(0); let dc_load = activity .active_sessions_by_target_dc .get(&dc) @@ -344,6 +387,7 @@ impl MePool { available_endpoints += dc_available_endpoints; alive_writers += dc_alive_writers; + fresh_alive_writers += dc_fresh_alive_writers; dcs.push(MeApiDcStatusSnapshot { dc, @@ -367,6 +411,8 @@ impl MePool { floor_capped, alive_writers: dc_alive_writers, coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers), + fresh_alive_writers: dc_fresh_alive_writers, + fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers), rtt_ms: dc_rtt_ms, load: dc_load, }); @@ -381,6 +427,8 @@ impl MePool { required_writers, alive_writers, coverage_pct: ratio_pct(alive_writers, required_writers), + fresh_alive_writers, + fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers), writers: writer_rows, dcs, } diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 64fb700..8ce3de3 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -178,6 +178,7 @@ impl MePool { allow_drain_fallback: allow_drain_fallback.clone(), }; self.writers.write().await.push(writer.clone()); + self.registry.register_writer(writer_id, tx.clone()).await; self.registry.mark_writer_idle(writer_id).await; self.conn_count.fetch_add(1, Ordering::Relaxed); self.writer_available.notify_one(); @@ -414,9 +415,15 @@ impl MePool { }; let (conn_id, mut service_rx) = pool.registry.register().await; - pool.registry - .bind_writer(conn_id, writer_id, tx_signal.clone(), meta.clone()) - .await; + if !pool + .registry + .bind_writer(conn_id, writer_id, meta.clone()) + .await + { + let _ = pool.registry.unregister(conn_id).await; + 
stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total(); + continue; + } let payload = build_proxy_req_payload( conn_id, @@ -521,6 +528,12 @@ impl MePool { self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + let conns = self.registry.writer_lost(writer_id).await; + { + let mut tracker = self.ping_tracker.lock().await; + tracker.retain(|_, (_, wid)| *wid != writer_id); + } + self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { let _ = tx.send(WriterCommand::Close).await; } @@ -533,8 +546,7 @@ impl MePool { } self.trigger_immediate_refill_for_dc(addr, writer_dc); } - self.rtt_stats.lock().await.remove(&writer_id); - self.registry.writer_lost(writer_id).await + conns } pub(crate) async fn mark_writer_draining_with_timeout( diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index ee04969..cc3028b 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -138,6 +138,15 @@ impl ConnRegistry { (id, rx) } + pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender) { + let mut inner = self.inner.write().await; + inner.writers.insert(writer_id, tx); + inner + .conns_for_writer + .entry(writer_id) + .or_insert_with(HashSet::new); + } + /// Unregister connection, returning associated writer_id if any. 
pub async fn unregister(&self, id: u64) -> Option { let mut inner = self.inner.write().await; @@ -282,24 +291,39 @@ impl ConnRegistry { } } - pub async fn bind_writer( - &self, - conn_id: u64, - writer_id: u64, - tx: mpsc::Sender, - meta: ConnMeta, - ) { + pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool { let mut inner = self.inner.write().await; - inner.meta.entry(conn_id).or_insert(meta.clone()); - inner.writer_for_conn.insert(conn_id, writer_id); + if !inner.writers.contains_key(&writer_id) { + return false; + } + + let previous_writer_id = inner.writer_for_conn.insert(conn_id, writer_id); + if let Some(previous_writer_id) = previous_writer_id + && previous_writer_id != writer_id + { + let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id) + { + set.remove(&conn_id); + set.is_empty() + } else { + false + }; + if became_empty { + inner + .writer_idle_since_epoch_secs + .insert(previous_writer_id, Self::now_epoch_secs()); + } + } + + inner.meta.insert(conn_id, meta.clone()); inner.last_meta_for_writer.insert(writer_id, meta); inner.writer_idle_since_epoch_secs.remove(&writer_id); - inner.writers.entry(writer_id).or_insert_with(|| tx.clone()); inner .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new) .insert(conn_id); + true } pub async fn mark_writer_idle(&self, writer_id: u64) { @@ -384,6 +408,9 @@ impl ConnRegistry { let mut out = Vec::new(); for conn_id in conns { + if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { + continue; + } inner.writer_for_conn.remove(&conn_id); if let Some(m) = inner.meta.get(&conn_id) { out.push(BoundConn { @@ -427,47 +454,52 @@ mod tests { let (conn_c, _rx_c) = registry.register().await; let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a.clone()).await; + registry.register_writer(20, 
writer_tx_b.clone()).await; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); - registry - .bind_writer( - conn_a, - 10, - writer_tx_a.clone(), - ConnMeta { - target_dc: 2, - client_addr: addr, - our_addr: addr, - proto_flags: 0, - }, - ) - .await; - registry - .bind_writer( - conn_b, - 10, - writer_tx_a, - ConnMeta { - target_dc: -2, - client_addr: addr, - our_addr: addr, - proto_flags: 0, - }, - ) - .await; - registry - .bind_writer( - conn_c, - 20, - writer_tx_b, - ConnMeta { - target_dc: 4, - client_addr: addr, - our_addr: addr, - proto_flags: 0, - }, - ) - .await; + assert!( + registry + .bind_writer( + conn_a, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_b, + 10, + ConnMeta { + target_dc: -2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_c, + 20, + ConnMeta { + target_dc: 4, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); let snapshot = registry.writer_activity_snapshot().await; assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); @@ -476,4 +508,130 @@ mod tests { assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); } + + #[tokio::test] + async fn bind_writer_rebinds_conn_atomically() { + let registry = ConnRegistry::new(); + let (conn_id, _rx) = registry.register().await; + let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); + let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a).await; + registry.register_writer(20, writer_tx_b).await; + + let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + let first_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 443); + let second_our_addr = 
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)), 443); + + assert!( + registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr, + our_addr: first_our_addr, + proto_flags: 1, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_id, + 20, + ConnMeta { + target_dc: 2, + client_addr, + our_addr: second_our_addr, + proto_flags: 2, + }, + ) + .await + ); + + let writer = registry.get_writer(conn_id).await.expect("writer binding"); + assert_eq!(writer.writer_id, 20); + + let meta = registry.get_meta(conn_id).await.expect("conn meta"); + assert_eq!(meta.our_addr, second_our_addr); + assert_eq!(meta.proto_flags, 2); + + let snapshot = registry.writer_activity_snapshot().await; + assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&0)); + assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1)); + assert!(registry.writer_idle_since_snapshot().await.contains_key(&10)); + } + + #[tokio::test] + async fn writer_lost_does_not_drop_rebound_conn() { + let registry = ConnRegistry::new(); + let (conn_id, _rx) = registry.register().await; + let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); + let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a).await; + registry.register_writer(20, writer_tx_b).await; + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + assert!( + registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_id, + 20, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 1, + }, + ) + .await + ); + + let lost = registry.writer_lost(10).await; + assert!(lost.is_empty()); + assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20); + + let removed_writer = registry.unregister(conn_id).await; + assert_eq!(removed_writer, Some(20)); + 
assert!(registry.is_writer_empty(20).await); + } + + #[tokio::test] + async fn bind_writer_rejects_unregistered_writer() { + let registry = ConnRegistry::new(); + let (conn_id, _rx) = registry.register().await; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + + assert!( + !registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!(registry.get_writer(conn_id).await.is_none()); + } } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index f63662b..0f9fed6 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -375,9 +375,14 @@ impl MePool { match w.tx.try_send(WriterCommand::Data(payload.clone())) { Ok(()) => { self.stats.increment_me_writer_pick_success_try_total(pick_mode); - self.registry - .bind_writer(conn_id, w.id, w.tx.clone(), meta) - .await; + if !self.registry.bind_writer(conn_id, w.id, meta).await { + debug!( + conn_id, + writer_id = w.id, + "ME writer disappeared before bind commit, retrying" + ); + continue; + } if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); debug!( @@ -421,9 +426,14 @@ impl MePool { Ok(()) => { self.stats .increment_me_writer_pick_success_fallback_total(pick_mode); - self.registry - .bind_writer(conn_id, w.id, w.tx.clone(), meta) - .await; + if !self.registry.bind_writer(conn_id, w.id, meta).await { + debug!( + conn_id, + writer_id = w.id, + "ME writer disappeared before fallback bind commit, retrying" + ); + continue; + } if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); }