chore: merge upstream/main (92972ab) into pr-sec-1

This commit is contained in:
David Osipov 2026-03-16 13:50:46 +04:00
commit f10ca192fa
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
25 changed files with 819 additions and 92 deletions

2
Cargo.lock generated
View File

@ -2093,7 +2093,7 @@ dependencies = [
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.3.17" version = "3.3.18"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.17" version = "3.3.18"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -19,9 +19,9 @@
### 🇷🇺 RU ### 🇷🇺 RU
#### Релиз 3.3.16 #### Релиз 3.3.15 Semistable
[3.3.16](https://github.com/telemt/telemt/releases/tag/3.3.16)! [3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) по итогам работы в продакшн признан одним из самых стабильных и рекомендуется к использованию, когда cutting-edge фичи некритичны!
Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **API**, **статистики**, **UX** Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **API**, **статистики**, **UX**
@ -40,9 +40,9 @@
### 🇬🇧 EN ### 🇬🇧 EN
#### Release 3.3.16 #### Release 3.3.15 Semistable
[3.3.16](https://github.com/telemt/telemt/releases/tag/3.3.16) [3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) is, based on the results of his work in production, recognized as one of the most stable and recommended for use when cutting-edge features are not so necessary!
We are looking forward to your feedback and improvement proposals — especially regarding **API**, **statistics**, **UX** We are looking forward to your feedback and improvement proposals — especially regarding **API**, **statistics**, **UX**

View File

@ -55,7 +55,10 @@ user2 = "00000000000000000000000000000002"
user3 = "00000000000000000000000000000003" user3 = "00000000000000000000000000000003"
``` ```
4. Save the config. Ctrl+S -> Ctrl+X. You don't need to restart telemt. 4. Save the config. Ctrl+S -> Ctrl+X. You don't need to restart telemt.
5. Get the links via `journalctl -u telemt -n -g "links" --no-pager -o cat | tac` 5. Get the links via
```bash
curl -s http://127.0.0.1:9091/v1/users | jq
```
## How to view metrics ## How to view metrics

View File

@ -55,7 +55,10 @@ user2 = "00000000000000000000000000000002"
user3 = "00000000000000000000000000000003" user3 = "00000000000000000000000000000003"
``` ```
4. Сохранить конфиг. Ctrl+S -> Ctrl+X. Перезапускать telemt не нужно. 4. Сохранить конфиг. Ctrl+S -> Ctrl+X. Перезапускать telemt не нужно.
5. Получить ссылки через `journalctl -u telemt -n -g "links" --no-pager -o cat | tac` 5. Получить ссылки через
```bash
curl -s http://127.0.0.1:9091/v1/users | jq
```
## Как посмотреть метрики ## Как посмотреть метрики

View File

@ -236,6 +236,8 @@ pub(super) struct MeWritersSummary {
pub(super) required_writers: usize, pub(super) required_writers: usize,
pub(super) alive_writers: usize, pub(super) alive_writers: usize,
pub(super) coverage_pct: f64, pub(super) coverage_pct: f64,
pub(super) fresh_alive_writers: usize,
pub(super) fresh_coverage_pct: f64,
} }
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]
@ -250,6 +252,12 @@ pub(super) struct MeWriterStatus {
pub(super) bound_clients: usize, pub(super) bound_clients: usize,
pub(super) idle_for_secs: Option<u64>, pub(super) idle_for_secs: Option<u64>,
pub(super) rtt_ema_ms: Option<f64>, pub(super) rtt_ema_ms: Option<f64>,
pub(super) matches_active_generation: bool,
pub(super) in_desired_map: bool,
pub(super) allow_drain_fallback: bool,
pub(super) drain_started_at_epoch_secs: Option<u64>,
pub(super) drain_deadline_epoch_secs: Option<u64>,
pub(super) drain_over_ttl: bool,
} }
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]
@ -276,6 +284,8 @@ pub(super) struct DcStatus {
pub(super) floor_capped: bool, pub(super) floor_capped: bool,
pub(super) alive_writers: usize, pub(super) alive_writers: usize,
pub(super) coverage_pct: f64, pub(super) coverage_pct: f64,
pub(super) fresh_alive_writers: usize,
pub(super) fresh_coverage_pct: f64,
pub(super) rtt_ms: Option<f64>, pub(super) rtt_ms: Option<f64>,
pub(super) load: usize, pub(super) load: usize,
} }

View File

@ -314,6 +314,8 @@ async fn get_minimal_payload_cached(
required_writers: status.required_writers, required_writers: status.required_writers,
alive_writers: status.alive_writers, alive_writers: status.alive_writers,
coverage_pct: status.coverage_pct, coverage_pct: status.coverage_pct,
fresh_alive_writers: status.fresh_alive_writers,
fresh_coverage_pct: status.fresh_coverage_pct,
}, },
writers: status writers: status
.writers .writers
@ -329,6 +331,12 @@ async fn get_minimal_payload_cached(
bound_clients: entry.bound_clients, bound_clients: entry.bound_clients,
idle_for_secs: entry.idle_for_secs, idle_for_secs: entry.idle_for_secs,
rtt_ema_ms: entry.rtt_ema_ms, rtt_ema_ms: entry.rtt_ema_ms,
matches_active_generation: entry.matches_active_generation,
in_desired_map: entry.in_desired_map,
allow_drain_fallback: entry.allow_drain_fallback,
drain_started_at_epoch_secs: entry.drain_started_at_epoch_secs,
drain_deadline_epoch_secs: entry.drain_deadline_epoch_secs,
drain_over_ttl: entry.drain_over_ttl,
}) })
.collect(), .collect(),
}; };
@ -363,6 +371,8 @@ async fn get_minimal_payload_cached(
floor_capped: entry.floor_capped, floor_capped: entry.floor_capped,
alive_writers: entry.alive_writers, alive_writers: entry.alive_writers,
coverage_pct: entry.coverage_pct, coverage_pct: entry.coverage_pct,
fresh_alive_writers: entry.fresh_alive_writers,
fresh_coverage_pct: entry.fresh_coverage_pct,
rtt_ms: entry.rtt_ms, rtt_ms: entry.rtt_ms,
load: entry.load, load: entry.load,
}) })
@ -486,6 +496,8 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa
required_writers: 0, required_writers: 0,
alive_writers: 0, alive_writers: 0,
coverage_pct: 0.0, coverage_pct: 0.0,
fresh_alive_writers: 0,
fresh_coverage_pct: 0.0,
}, },
writers: Vec::new(), writers: Vec::new(),
} }

View File

@ -90,6 +90,7 @@ pub(super) struct EffectiveMiddleProxyLimits {
#[derive(Serialize)] #[derive(Serialize)]
pub(super) struct EffectiveUserIpPolicyLimits { pub(super) struct EffectiveUserIpPolicyLimits {
pub(super) global_each: usize,
pub(super) mode: &'static str, pub(super) mode: &'static str,
pub(super) window_secs: u64, pub(super) window_secs: u64,
} }
@ -262,6 +263,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
me2dc_fallback: cfg.general.me2dc_fallback, me2dc_fallback: cfg.general.me2dc_fallback,
}, },
user_ip_policy: EffectiveUserIpPolicyLimits { user_ip_policy: EffectiveUserIpPolicyLimits {
global_each: cfg.access.user_max_unique_ips_global_each,
mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode), mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode),
window_secs: cfg.access.user_max_unique_ips_window_secs, window_secs: cfg.access.user_max_unique_ips_window_secs,
}, },

View File

@ -386,7 +386,16 @@ pub(super) async fn users_from_config(
.get(&username) .get(&username)
.map(chrono::DateTime::<chrono::Utc>::to_rfc3339), .map(chrono::DateTime::<chrono::Utc>::to_rfc3339),
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), max_unique_ips: cfg
.access
.user_max_unique_ips
.get(&username)
.copied()
.filter(|limit| *limit > 0)
.or(
(cfg.access.user_max_unique_ips_global_each > 0)
.then_some(cfg.access.user_max_unique_ips_global_each),
),
current_connections: stats.get_user_curr_connects(&username), current_connections: stats.get_user_curr_connects(&username),
active_unique_ips: active_ip_list.len(), active_unique_ips: active_ip_list.len(),
active_unique_ips_list: active_ip_list, active_unique_ips_list: active_ip_list,

View File

@ -584,6 +584,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
90 90
} }
pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128
}
pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 { pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 {
default_me_pool_drain_ttl_secs() default_me_pool_drain_ttl_secs()
} }
@ -635,6 +639,10 @@ pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 {
DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS
} }
pub(crate) fn default_user_max_unique_ips_global_each() -> usize {
0
}
// Custom deserializer helpers // Custom deserializer helpers
#[derive(Deserialize)] #[derive(Deserialize)]

View File

@ -55,6 +55,7 @@ pub struct HotFields {
pub me_reinit_coalesce_window_ms: u64, pub me_reinit_coalesce_window_ms: u64,
pub hardswap: bool, pub hardswap: bool,
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
pub me_pool_drain_threshold: u64,
pub me_pool_min_fresh_ratio: f32, pub me_pool_min_fresh_ratio: f32,
pub me_reinit_drain_timeout_secs: u64, pub me_reinit_drain_timeout_secs: u64,
pub me_hardswap_warmup_delay_min_ms: u64, pub me_hardswap_warmup_delay_min_ms: u64,
@ -118,6 +119,7 @@ pub struct HotFields {
pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>, pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
pub user_data_quota: std::collections::HashMap<String, u64>, pub user_data_quota: std::collections::HashMap<String, u64>,
pub user_max_unique_ips: std::collections::HashMap<String, usize>, pub user_max_unique_ips: std::collections::HashMap<String, usize>,
pub user_max_unique_ips_global_each: usize,
pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode, pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode,
pub user_max_unique_ips_window_secs: u64, pub user_max_unique_ips_window_secs: u64,
} }
@ -135,6 +137,7 @@ impl HotFields {
me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms,
hardswap: cfg.general.hardswap, hardswap: cfg.general.hardswap,
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
me_pool_drain_threshold: cfg.general.me_pool_drain_threshold,
me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio, me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio,
me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms, me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms,
@ -232,6 +235,7 @@ impl HotFields {
user_expirations: cfg.access.user_expirations.clone(), user_expirations: cfg.access.user_expirations.clone(),
user_data_quota: cfg.access.user_data_quota.clone(), user_data_quota: cfg.access.user_data_quota.clone(),
user_max_unique_ips: cfg.access.user_max_unique_ips.clone(), user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each,
user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode, user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode,
user_max_unique_ips_window_secs: cfg.access.user_max_unique_ips_window_secs, user_max_unique_ips_window_secs: cfg.access.user_max_unique_ips_window_secs,
} }
@ -450,6 +454,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms;
cfg.general.hardswap = new.general.hardswap; cfg.general.hardswap = new.general.hardswap;
cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs;
cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold;
cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio; cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio;
cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs; cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs;
cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms; cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms;
@ -532,6 +537,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.access.user_expirations = new.access.user_expirations.clone(); cfg.access.user_expirations = new.access.user_expirations.clone();
cfg.access.user_data_quota = new.access.user_data_quota.clone(); cfg.access.user_data_quota = new.access.user_data_quota.clone();
cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone(); cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each;
cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode; cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode;
cfg.access.user_max_unique_ips_window_secs = new.access.user_max_unique_ips_window_secs; cfg.access.user_max_unique_ips_window_secs = new.access.user_max_unique_ips_window_secs;
@ -823,6 +829,13 @@ fn log_changes(
); );
} }
if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold {
info!(
"config reload: me_pool_drain_threshold: {} → {}",
old_hot.me_pool_drain_threshold, new_hot.me_pool_drain_threshold,
);
}
if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON { if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON {
info!( info!(
"config reload: me_pool_min_fresh_ratio: {:.3} → {:.3}", "config reload: me_pool_min_fresh_ratio: {:.3} → {:.3}",
@ -1099,12 +1112,14 @@ fn log_changes(
new_hot.user_max_unique_ips.len() new_hot.user_max_unique_ips.len()
); );
} }
if old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode if old_hot.user_max_unique_ips_global_each != new_hot.user_max_unique_ips_global_each
|| old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode
|| old_hot.user_max_unique_ips_window_secs || old_hot.user_max_unique_ips_window_secs
!= new_hot.user_max_unique_ips_window_secs != new_hot.user_max_unique_ips_window_secs
{ {
info!( info!(
"config reload: user_max_unique_ips policy mode={:?} window={}s", "config reload: user_max_unique_ips policy global_each={} mode={:?} window={}s",
new_hot.user_max_unique_ips_global_each,
new_hot.user_max_unique_ips_mode, new_hot.user_max_unique_ips_mode,
new_hot.user_max_unique_ips_window_secs new_hot.user_max_unique_ips_window_secs
); );

View File

@ -3,6 +3,7 @@ use ipnetwork::IpNetwork;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::path::PathBuf;
use super::defaults::*; use super::defaults::*;
@ -356,6 +357,9 @@ impl Default for NetworkConfig {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeneralConfig { pub struct GeneralConfig {
#[serde(default)]
pub data_path: Option<PathBuf>,
#[serde(default)] #[serde(default)]
pub modes: ProxyModes, pub modes: ProxyModes,
@ -794,6 +798,11 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_ttl_secs")] #[serde(default = "default_me_pool_drain_ttl_secs")]
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
/// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches.
/// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior.
#[serde(default = "default_me_pool_drain_threshold")]
pub me_pool_drain_threshold: u64,
/// Policy for new binds on stale draining writers. /// Policy for new binds on stale draining writers.
#[serde(default)] #[serde(default)]
pub me_bind_stale_mode: MeBindStaleMode, pub me_bind_stale_mode: MeBindStaleMode,
@ -866,6 +875,7 @@ pub struct GeneralConfig {
impl Default for GeneralConfig { impl Default for GeneralConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
data_path: None,
modes: ProxyModes::default(), modes: ProxyModes::default(),
prefer_ipv6: false, prefer_ipv6: false,
fast_mode: default_true(), fast_mode: default_true(),
@ -973,6 +983,7 @@ impl Default for GeneralConfig {
me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(),
proxy_secret_len_max: default_proxy_secret_len_max(), proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_pool_drain_threshold: default_me_pool_drain_threshold(),
me_bind_stale_mode: MeBindStaleMode::default(), me_bind_stale_mode: MeBindStaleMode::default(),
me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(), me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(),
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(), me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),
@ -1317,6 +1328,11 @@ pub struct AccessConfig {
#[serde(default)] #[serde(default)]
pub user_max_unique_ips: HashMap<String, usize>, pub user_max_unique_ips: HashMap<String, usize>,
/// Global per-user unique IP limit applied when a user has no individual override.
/// `0` disables the inherited limit.
#[serde(default = "default_user_max_unique_ips_global_each")]
pub user_max_unique_ips_global_each: usize,
#[serde(default)] #[serde(default)]
pub user_max_unique_ips_mode: UserMaxUniqueIpsMode, pub user_max_unique_ips_mode: UserMaxUniqueIpsMode,
@ -1342,6 +1358,7 @@ impl Default for AccessConfig {
user_expirations: HashMap::new(), user_expirations: HashMap::new(),
user_data_quota: HashMap::new(), user_data_quota: HashMap::new(),
user_max_unique_ips: HashMap::new(), user_max_unique_ips: HashMap::new(),
user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(),
user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(), user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(),
user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(), user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(),
replay_check_len: default_replay_check_len(), replay_check_len: default_replay_check_len(),

View File

@ -17,6 +17,7 @@ pub struct UserIpTracker {
active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>, active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>, recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
max_ips: Arc<RwLock<HashMap<String, usize>>>, max_ips: Arc<RwLock<HashMap<String, usize>>>,
default_max_ips: Arc<RwLock<usize>>,
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>, limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
limit_window: Arc<RwLock<Duration>>, limit_window: Arc<RwLock<Duration>>,
last_compact_epoch_secs: Arc<AtomicU64>, last_compact_epoch_secs: Arc<AtomicU64>,
@ -28,6 +29,7 @@ impl UserIpTracker {
active_ips: Arc::new(RwLock::new(HashMap::new())), active_ips: Arc::new(RwLock::new(HashMap::new())),
recent_ips: Arc::new(RwLock::new(HashMap::new())), recent_ips: Arc::new(RwLock::new(HashMap::new())),
max_ips: Arc::new(RwLock::new(HashMap::new())), max_ips: Arc::new(RwLock::new(HashMap::new())),
default_max_ips: Arc::new(RwLock::new(0)),
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
limit_window: Arc::new(RwLock::new(Duration::from_secs(30))), limit_window: Arc::new(RwLock::new(Duration::from_secs(30))),
last_compact_epoch_secs: Arc::new(AtomicU64::new(0)), last_compact_epoch_secs: Arc::new(AtomicU64::new(0)),
@ -100,7 +102,10 @@ impl UserIpTracker {
limits.remove(username); limits.remove(username);
} }
pub async fn load_limits(&self, limits: &HashMap<String, usize>) { pub async fn load_limits(&self, default_limit: usize, limits: &HashMap<String, usize>) {
let mut default_max_ips = self.default_max_ips.write().await;
*default_max_ips = default_limit;
drop(default_max_ips);
let mut max_ips = self.max_ips.write().await; let mut max_ips = self.max_ips.write().await;
max_ips.clone_from(limits); max_ips.clone_from(limits);
} }
@ -114,9 +119,14 @@ impl UserIpTracker {
pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
self.maybe_compact_empty_users().await; self.maybe_compact_empty_users().await;
let default_max_ips = *self.default_max_ips.read().await;
let limit = { let limit = {
let max_ips = self.max_ips.read().await; let max_ips = self.max_ips.read().await;
max_ips.get(username).copied() max_ips
.get(username)
.copied()
.filter(|limit| *limit > 0)
.or((default_max_ips > 0).then_some(default_max_ips))
}; };
let mode = *self.limit_mode.read().await; let mode = *self.limit_mode.read().await;
let window = *self.limit_window.read().await; let window = *self.limit_window.read().await;
@ -255,10 +265,16 @@ impl UserIpTracker {
pub async fn get_stats(&self) -> Vec<(String, usize, usize)> { pub async fn get_stats(&self) -> Vec<(String, usize, usize)> {
let active_ips = self.active_ips.read().await; let active_ips = self.active_ips.read().await;
let max_ips = self.max_ips.read().await; let max_ips = self.max_ips.read().await;
let default_max_ips = *self.default_max_ips.read().await;
let mut stats = Vec::new(); let mut stats = Vec::new();
for (username, user_ips) in active_ips.iter() { for (username, user_ips) in active_ips.iter() {
let limit = max_ips.get(username).copied().unwrap_or(0); let limit = max_ips
.get(username)
.copied()
.filter(|limit| *limit > 0)
.or((default_max_ips > 0).then_some(default_max_ips))
.unwrap_or(0);
stats.push((username.clone(), user_ips.len(), limit)); stats.push((username.clone(), user_ips.len(), limit));
} }
@ -293,8 +309,13 @@ impl UserIpTracker {
} }
pub async fn get_user_limit(&self, username: &str) -> Option<usize> { pub async fn get_user_limit(&self, username: &str) -> Option<usize> {
let default_max_ips = *self.default_max_ips.read().await;
let max_ips = self.max_ips.read().await; let max_ips = self.max_ips.read().await;
max_ips.get(username).copied() max_ips
.get(username)
.copied()
.filter(|limit| *limit > 0)
.or((default_max_ips > 0).then_some(default_max_ips))
} }
pub async fn format_stats(&self) -> String { pub async fn format_stats(&self) -> String {
@ -546,7 +567,7 @@ mod tests {
config_limits.insert("user1".to_string(), 5); config_limits.insert("user1".to_string(), 5);
config_limits.insert("user2".to_string(), 3); config_limits.insert("user2".to_string(), 3);
tracker.load_limits(&config_limits).await; tracker.load_limits(0, &config_limits).await;
assert_eq!(tracker.get_user_limit("user1").await, Some(5)); assert_eq!(tracker.get_user_limit("user1").await, Some(5));
assert_eq!(tracker.get_user_limit("user2").await, Some(3)); assert_eq!(tracker.get_user_limit("user2").await, Some(3));
@ -560,16 +581,46 @@ mod tests {
let mut first = HashMap::new(); let mut first = HashMap::new();
first.insert("user1".to_string(), 2); first.insert("user1".to_string(), 2);
first.insert("user2".to_string(), 3); first.insert("user2".to_string(), 3);
tracker.load_limits(&first).await; tracker.load_limits(0, &first).await;
let mut second = HashMap::new(); let mut second = HashMap::new();
second.insert("user2".to_string(), 5); second.insert("user2".to_string(), 5);
tracker.load_limits(&second).await; tracker.load_limits(0, &second).await;
assert_eq!(tracker.get_user_limit("user1").await, None); assert_eq!(tracker.get_user_limit("user1").await, None);
assert_eq!(tracker.get_user_limit("user2").await, Some(5)); assert_eq!(tracker.get_user_limit("user2").await, Some(5));
} }
#[tokio::test]
async fn test_global_each_limit_applies_without_user_override() {
let tracker = UserIpTracker::new();
tracker.load_limits(2, &HashMap::new()).await;
let ip1 = test_ipv4(172, 16, 0, 1);
let ip2 = test_ipv4(172, 16, 0, 2);
let ip3 = test_ipv4(172, 16, 0, 3);
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
assert!(tracker.check_and_add("test_user", ip3).await.is_err());
assert_eq!(tracker.get_user_limit("test_user").await, Some(2));
}
#[tokio::test]
async fn test_user_override_wins_over_global_each_limit() {
let tracker = UserIpTracker::new();
let mut limits = HashMap::new();
limits.insert("test_user".to_string(), 1);
tracker.load_limits(3, &limits).await;
let ip1 = test_ipv4(172, 17, 0, 1);
let ip2 = test_ipv4(172, 17, 0, 2);
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
assert!(tracker.check_and_add("test_user", ip2).await.is_err());
assert_eq!(tracker.get_user_limit("test_user").await, Some(1));
}
#[tokio::test] #[tokio::test]
async fn test_time_window_mode_blocks_recent_ip_churn() { async fn test_time_window_mode_blocks_recent_ip_churn() {
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();

View File

@ -1,4 +1,5 @@
use std::time::Duration; use std::time::Duration;
use std::path::PathBuf;
use tokio::sync::watch; use tokio::sync::watch;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
@ -9,8 +10,9 @@ use crate::transport::middle_proxy::{
ProxyConfigData, fetch_proxy_config_with_raw, load_proxy_config_cache, save_proxy_config_cache, ProxyConfigData, fetch_proxy_config_with_raw, load_proxy_config_cache, save_proxy_config_cache,
}; };
pub(crate) fn parse_cli() -> (String, bool, Option<String>) { pub(crate) fn parse_cli() -> (String, Option<PathBuf>, bool, Option<String>) {
let mut config_path = "config.toml".to_string(); let mut config_path = "config.toml".to_string();
let mut data_path: Option<PathBuf> = None;
let mut silent = false; let mut silent = false;
let mut log_level: Option<String> = None; let mut log_level: Option<String> = None;
@ -28,6 +30,18 @@ pub(crate) fn parse_cli() -> (String, bool, Option<String>) {
let mut i = 0; let mut i = 0;
while i < args.len() { while i < args.len() {
match args[i].as_str() { match args[i].as_str() {
"--data-path" => {
i += 1;
if i < args.len() {
data_path = Some(PathBuf::from(args[i].clone()));
} else {
eprintln!("Missing value for --data-path");
std::process::exit(0);
}
}
s if s.starts_with("--data-path=") => {
data_path = Some(PathBuf::from(s.trim_start_matches("--data-path=").to_string()));
}
"--silent" | "-s" => { "--silent" | "-s" => {
silent = true; silent = true;
} }
@ -44,6 +58,7 @@ pub(crate) fn parse_cli() -> (String, bool, Option<String>) {
eprintln!("Usage: telemt [config.toml] [OPTIONS]"); eprintln!("Usage: telemt [config.toml] [OPTIONS]");
eprintln!(); eprintln!();
eprintln!("Options:"); eprintln!("Options:");
eprintln!(" --data-path <DIR> Set data directory (absolute path; overrides config value)");
eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --silent, -s Suppress info logs");
eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent"); eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent");
eprintln!(" --help, -h Show this help"); eprintln!(" --help, -h Show this help");
@ -78,7 +93,7 @@ pub(crate) fn parse_cli() -> (String, bool, Option<String>) {
i += 1; i += 1;
} }
(config_path, silent, log_level) (config_path, data_path, silent, log_level)
} }
pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {

View File

@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool(
config.general.me_adaptive_floor_max_warm_writers_global, config.general.me_adaptive_floor_max_warm_writers_global,
config.general.hardswap, config.general.hardswap,
config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_ttl_secs,
config.general.me_pool_drain_threshold,
config.general.effective_me_pool_force_close_secs(), config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio, config.general.me_pool_min_fresh_ratio,
config.general.me_hardswap_warmup_delay_min_ms, config.general.me_hardswap_warmup_delay_min_ms,

View File

@ -58,7 +58,7 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
startup_tracker startup_tracker
.start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string())) .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string()))
.await; .await;
let (config_path, cli_silent, cli_log_level) = parse_cli(); let (config_path, data_path, cli_silent, cli_log_level) = parse_cli();
let mut config = match ProxyConfig::load(&config_path) { let mut config = match ProxyConfig::load(&config_path) {
Ok(c) => c, Ok(c) => c,
@ -80,6 +80,34 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
std::process::exit(1); std::process::exit(1);
} }
if let Some(p) = data_path {
config.general.data_path = Some(p);
}
if let Some(ref data_path) = config.general.data_path {
if !data_path.is_absolute() {
eprintln!("[telemt] data_path must be absolute: {}", data_path.display());
std::process::exit(1);
}
if data_path.exists() {
if !data_path.is_dir() {
eprintln!("[telemt] data_path exists but is not a directory: {}", data_path.display());
std::process::exit(1);
}
} else {
if let Err(e) = std::fs::create_dir_all(data_path) {
eprintln!("[telemt] Can't create data_path {}: {}", data_path.display(), e);
std::process::exit(1);
}
}
if let Err(e) = std::env::set_current_dir(data_path) {
eprintln!("[telemt] Can't use data_path {}: {}", data_path.display(), e);
std::process::exit(1);
}
}
if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) { if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) {
eprintln!("[telemt] Invalid network.dns_overrides: {}", e); eprintln!("[telemt] Invalid network.dns_overrides: {}", e);
std::process::exit(1); std::process::exit(1);
@ -168,17 +196,24 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
stats.clone(), stats.clone(),
)); ));
let ip_tracker = Arc::new(UserIpTracker::new()); let ip_tracker = Arc::new(UserIpTracker::new());
ip_tracker.load_limits(&config.access.user_max_unique_ips).await; ip_tracker
.load_limits(
config.access.user_max_unique_ips_global_each,
&config.access.user_max_unique_ips,
)
.await;
ip_tracker ip_tracker
.set_limit_policy( .set_limit_policy(
config.access.user_max_unique_ips_mode, config.access.user_max_unique_ips_mode,
config.access.user_max_unique_ips_window_secs, config.access.user_max_unique_ips_window_secs,
) )
.await; .await;
if !config.access.user_max_unique_ips.is_empty() { if config.access.user_max_unique_ips_global_each > 0 || !config.access.user_max_unique_ips.is_empty()
{
info!( info!(
"IP limits configured for {} users", global_each_limit = config.access.user_max_unique_ips_global_each,
config.access.user_max_unique_ips.len() explicit_user_limits = config.access.user_max_unique_ips.len(),
"User unique IP limits configured"
); );
} }
if !config.network.dns_overrides.is_empty() { if !config.network.dns_overrides.is_empty() {

View File

@ -131,6 +131,10 @@ pub(crate) async fn spawn_runtime_tasks(
let mut config_rx_ip_limits = config_rx.clone(); let mut config_rx_ip_limits = config_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut prev_limits = config_rx_ip_limits.borrow().access.user_max_unique_ips.clone(); let mut prev_limits = config_rx_ip_limits.borrow().access.user_max_unique_ips.clone();
let mut prev_global_each = config_rx_ip_limits
.borrow()
.access
.user_max_unique_ips_global_each;
let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode; let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode;
let mut prev_window = config_rx_ip_limits let mut prev_window = config_rx_ip_limits
.borrow() .borrow()
@ -143,9 +147,17 @@ pub(crate) async fn spawn_runtime_tasks(
} }
let cfg = config_rx_ip_limits.borrow_and_update().clone(); let cfg = config_rx_ip_limits.borrow_and_update().clone();
if prev_limits != cfg.access.user_max_unique_ips { if prev_limits != cfg.access.user_max_unique_ips
ip_tracker_policy.load_limits(&cfg.access.user_max_unique_ips).await; || prev_global_each != cfg.access.user_max_unique_ips_global_each
{
ip_tracker_policy
.load_limits(
cfg.access.user_max_unique_ips_global_each,
&cfg.access.user_max_unique_ips,
)
.await;
prev_limits = cfg.access.user_max_unique_ips.clone(); prev_limits = cfg.access.user_max_unique_ips.clone();
prev_global_each = cfg.access.user_max_unique_ips_global_each;
} }
if prev_mode != cfg.access.user_max_unique_ips_mode if prev_mode != cfg.access.user_max_unique_ips_mode

View File

@ -1774,14 +1774,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
"# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window" "# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window"
); );
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Effective per-user unique IP limit (0 means unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
for user in unique_users { for user in unique_users {
let current = ip_counts.get(&user).copied().unwrap_or(0); let current = ip_counts.get(&user).copied().unwrap_or(0);
let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0); let limit = config
.access
.user_max_unique_ips
.get(&user)
.copied()
.filter(|limit| *limit > 0)
.or(
(config.access.user_max_unique_ips_global_each > 0)
.then_some(config.access.user_max_unique_ips_global_each),
)
.unwrap_or(0);
let utilization = if limit > 0 { let utilization = if limit > 0 {
current as f64 / limit as f64 current as f64 / limit as f64
} else { } else {
@ -1904,6 +1914,25 @@ mod tests {
assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
} }
#[tokio::test]
async fn test_render_uses_global_each_unique_ip_limit() {
let stats = Stats::new();
stats.increment_user_connects("alice");
stats.increment_user_curr_connects("alice");
let tracker = UserIpTracker::new();
tracker
.check_and_add("alice", "203.0.113.10".parse().unwrap())
.await
.unwrap();
let mut config = ProxyConfig::default();
config.access.user_max_unique_ips_global_each = 2;
let output = render_metrics(&stats, &config, &tracker).await;
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000"));
}
#[tokio::test] #[tokio::test]
async fn test_render_has_type_annotations() { async fn test_render_has_type_annotations() {
let stats = Stats::new(); let stats = Stats::new();

View File

@ -298,6 +298,7 @@ async fn run_update_cycle(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_pool_drain_threshold,
cfg.general.effective_me_pool_force_close_secs(), cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio, cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms, cfg.general.me_hardswap_warmup_delay_min_ms,
@ -524,6 +525,7 @@ pub async fn me_config_updater(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_pool_drain_threshold,
cfg.general.effective_me_pool_force_close_secs(), cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio, cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms, cfg.general.me_hardswap_warmup_delay_min_ms,

View File

@ -62,6 +62,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut degraded_interval = true; let mut degraded_interval = true;
loop { loop {
let interval = if degraded_interval { let interval = if degraded_interval {
@ -71,7 +72,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
}; };
tokio::time::sleep(interval).await; tokio::time::sleep(interval).await;
pool.prune_closed_writers().await; pool.prune_closed_writers().await;
reap_draining_writers(&pool).await; reap_draining_writers(&pool, &mut drain_warn_next_allowed).await;
let v4_degraded = check_family( let v4_degraded = check_family(
IpFamily::V4, IpFamily::V4,
&pool, &pool,
@ -110,17 +111,81 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
} }
} }
async fn reap_draining_writers(pool: &Arc<MePool>) { async fn reap_draining_writers(
pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>,
) {
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let now = Instant::now();
let drain_ttl_secs = pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed);
let drain_threshold = pool
.me_pool_drain_threshold
.load(std::sync::atomic::Ordering::Relaxed);
let writers = pool.writers.read().await.clone(); let writers = pool.writers.read().await.clone();
let mut draining_writers = Vec::new();
for writer in writers { for writer in writers {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue; continue;
} }
if pool.registry.is_writer_empty(writer.id).await { let is_empty = pool.registry.is_writer_empty(writer.id).await;
if is_empty {
pool.remove_writer_and_close_clients(writer.id).await; pool.remove_writer_and_close_clients(writer.id).await;
continue; continue;
} }
draining_writers.push(writer);
}
if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
draining_writers.sort_by(|left, right| {
let left_started = left
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
let right_started = right
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
left_started
.cmp(&right_started)
.then_with(|| left.created_at.cmp(&right.created_at))
.then_with(|| left.id.cmp(&right.id))
});
let overflow = draining_writers.len().saturating_sub(drain_threshold as usize);
warn!(
draining_writers = draining_writers.len(),
me_pool_drain_threshold = drain_threshold,
removing_writers = overflow,
"ME draining writer threshold exceeded, force-closing oldest draining writers"
);
for writer in draining_writers.drain(..overflow) {
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer.id).await;
}
}
for writer in draining_writers {
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if drain_ttl_secs > 0
&& drain_started_at_epoch_secs != 0
&& now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
&& should_emit_writer_warn(
warn_next_allowed,
writer.id,
now,
pool.warn_rate_limit_duration(),
)
{
warn!(
writer_id = writer.id,
writer_dc = writer.writer_dc,
endpoint = %writer.addr,
generation = writer.generation,
drain_ttl_secs,
force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed),
allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed),
"ME draining writer remains non-empty past drain TTL"
);
}
let deadline_epoch_secs = writer let deadline_epoch_secs = writer
.drain_deadline_epoch_secs .drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed); .load(std::sync::atomic::Ordering::Relaxed);
@ -132,6 +197,23 @@ async fn reap_draining_writers(pool: &Arc<MePool>) {
} }
} }
fn should_emit_writer_warn(
next_allowed: &mut HashMap<u64, Instant>,
writer_id: u64,
now: Instant,
cooldown: Duration,
) -> bool {
let Some(ready_at) = next_allowed.get(&writer_id).copied() else {
next_allowed.insert(writer_id, now + cooldown);
return true;
};
if now >= ready_at {
next_allowed.insert(writer_id, now + cooldown);
return true;
}
false
}
async fn check_family( async fn check_family(
family: IpFamily, family: IpFamily,
pool: &Arc<MePool>, pool: &Arc<MePool>,
@ -1222,3 +1304,190 @@ async fn maybe_rotate_single_endpoint_shadow(
"Single-endpoint shadow writer rotated" "Single-endpoint shadow writer rotated"
); );
} }
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::reap_draining_writers;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::crypto::SecureRandom;
use crate::network::probe::NetworkDecision;
use crate::stats::Stats;
use crate::transport::middle_proxy::codec::WriterCommand;
use crate::transport::middle_proxy::pool::{MePool, MeWriter, WriterContour};
use crate::transport::middle_proxy::registry::ConnMeta;
async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
let general = GeneralConfig {
me_pool_drain_threshold,
..GeneralConfig::default()
};
MePool::new(
None,
vec![1u8; 32],
None,
false,
None,
Vec::new(),
1,
None,
12,
1200,
HashMap::new(),
HashMap::new(),
None,
NetworkDecision::default(),
None,
Arc::new(SecureRandom::new()),
Arc::new(Stats::default()),
general.me_keepalive_enabled,
general.me_keepalive_interval_secs,
general.me_keepalive_jitter_secs,
general.me_keepalive_payload_random,
general.rpc_proxy_req_every,
general.me_warmup_stagger_enabled,
general.me_warmup_step_delay_ms,
general.me_warmup_step_jitter_ms,
general.me_reconnect_max_concurrent_per_dc,
general.me_reconnect_backoff_base_ms,
general.me_reconnect_backoff_cap_ms,
general.me_reconnect_fast_retry_count,
general.me_single_endpoint_shadow_writers,
general.me_single_endpoint_outage_mode_enabled,
general.me_single_endpoint_outage_disable_quarantine,
general.me_single_endpoint_outage_backoff_min_ms,
general.me_single_endpoint_outage_backoff_max_ms,
general.me_single_endpoint_shadow_rotate_every_secs,
general.me_floor_mode,
general.me_adaptive_floor_idle_secs,
general.me_adaptive_floor_min_writers_single_endpoint,
general.me_adaptive_floor_min_writers_multi_endpoint,
general.me_adaptive_floor_recover_grace_secs,
general.me_adaptive_floor_writers_per_core_total,
general.me_adaptive_floor_cpu_cores_override,
general.me_adaptive_floor_max_extra_writers_single_per_core,
general.me_adaptive_floor_max_extra_writers_multi_per_core,
general.me_adaptive_floor_max_active_writers_per_core,
general.me_adaptive_floor_max_warm_writers_per_core,
general.me_adaptive_floor_max_active_writers_global,
general.me_adaptive_floor_max_warm_writers_global,
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
general.me_hardswap_warmup_delay_max_ms,
general.me_hardswap_warmup_extra_passes,
general.me_hardswap_warmup_pass_backoff_base_ms,
general.me_bind_stale_mode,
general.me_bind_stale_ttl_secs,
general.me_secret_atomic_snapshot,
general.me_deterministic_writer_sort,
MeWriterPickMode::default(),
general.me_writer_pick_sample_size,
MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity,
general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct,
general.me_reader_route_data_wait_ms,
general.me_health_interval_ms_unhealthy,
general.me_health_interval_ms_healthy,
general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms,
general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms,
)
}
async fn insert_draining_writer(
pool: &Arc<MePool>,
writer_id: u64,
drain_started_at_epoch_secs: u64,
) -> u64 {
let (conn_id, _rx) = pool.registry.register().await;
let (tx, _writer_rx) = mpsc::channel::<WriterCommand>(8);
let writer = MeWriter {
id: writer_id,
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
writer_dc: 2,
generation: 1,
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
created_at: Instant::now() - Duration::from_secs(writer_id),
tx: tx.clone(),
cancel: CancellationToken::new(),
degraded: Arc::new(AtomicBool::new(false)),
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
draining: Arc::new(AtomicBool::new(true)),
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(drain_started_at_epoch_secs)),
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
};
pool.writers.write().await.push(writer);
pool.registry.register_writer(writer_id, tx).await;
pool.conn_count.fetch_add(1, Ordering::Relaxed);
assert!(
pool.registry
.bind_writer(
conn_id,
writer_id,
ConnMeta {
target_dc: 2,
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6000),
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
proto_flags: 0,
},
)
.await
);
conn_id
}
#[tokio::test]
async fn reap_draining_writers_force_closes_oldest_over_threshold() {
let pool = make_pool(2).await;
let now_epoch_secs = MePool::now_epoch_secs();
let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await;
let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await;
let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await;
let mut warn_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
assert_eq!(writer_ids, vec![20, 30]);
assert!(pool.registry.get_writer(conn_a).await.is_none());
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
}
#[tokio::test]
async fn reap_draining_writers_keeps_timeout_only_behavior_when_threshold_disabled() {
let pool = make_pool(0).await;
let now_epoch_secs = MePool::now_epoch_secs();
let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await;
let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await;
let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await;
let mut warn_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
assert_eq!(writer_ids, vec![10, 20, 30]);
assert_eq!(pool.registry.get_writer(conn_a).await.unwrap().writer_id, 10);
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
}
}

View File

@ -171,6 +171,7 @@ pub struct MePool {
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64,
@ -271,6 +272,7 @@ impl MePool {
me_adaptive_floor_max_warm_writers_global: u32, me_adaptive_floor_max_warm_writers_global: u32,
hardswap: bool, hardswap: bool,
me_pool_drain_ttl_secs: u64, me_pool_drain_ttl_secs: u64,
me_pool_drain_threshold: u64,
me_pool_force_close_secs: u64, me_pool_force_close_secs: u64,
me_pool_min_fresh_ratio: f32, me_pool_min_fresh_ratio: f32,
me_hardswap_warmup_delay_min_ms: u64, me_hardswap_warmup_delay_min_ms: u64,
@ -446,6 +448,7 @@ impl MePool {
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio, me_pool_min_fresh_ratio,
@ -492,6 +495,7 @@ impl MePool {
&self, &self,
hardswap: bool, hardswap: bool,
drain_ttl_secs: u64, drain_ttl_secs: u64,
pool_drain_threshold: u64,
force_close_secs: u64, force_close_secs: u64,
min_fresh_ratio: f32, min_fresh_ratio: f32,
hardswap_warmup_delay_min_ms: u64, hardswap_warmup_delay_min_ms: u64,
@ -530,6 +534,8 @@ impl MePool {
self.hardswap.store(hardswap, Ordering::Relaxed); self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.me_pool_drain_ttl_secs
.store(drain_ttl_secs, Ordering::Relaxed); .store(drain_ttl_secs, Ordering::Relaxed);
self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed);
self.me_pool_force_close_secs self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed); .store(force_close_secs, Ordering::Relaxed);
self.me_pool_min_fresh_ratio_permille self.me_pool_min_fresh_ratio_permille

View File

@ -19,6 +19,12 @@ pub(crate) struct MeApiWriterStatusSnapshot {
pub bound_clients: usize, pub bound_clients: usize,
pub idle_for_secs: Option<u64>, pub idle_for_secs: Option<u64>,
pub rtt_ema_ms: Option<f64>, pub rtt_ema_ms: Option<f64>,
pub matches_active_generation: bool,
pub in_desired_map: bool,
pub allow_drain_fallback: bool,
pub drain_started_at_epoch_secs: Option<u64>,
pub drain_deadline_epoch_secs: Option<u64>,
pub drain_over_ttl: bool,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -35,6 +41,8 @@ pub(crate) struct MeApiDcStatusSnapshot {
pub floor_capped: bool, pub floor_capped: bool,
pub alive_writers: usize, pub alive_writers: usize,
pub coverage_pct: f64, pub coverage_pct: f64,
pub fresh_alive_writers: usize,
pub fresh_coverage_pct: f64,
pub rtt_ms: Option<f64>, pub rtt_ms: Option<f64>,
pub load: usize, pub load: usize,
} }
@ -55,6 +63,8 @@ pub(crate) struct MeApiStatusSnapshot {
pub required_writers: usize, pub required_writers: usize,
pub alive_writers: usize, pub alive_writers: usize,
pub coverage_pct: f64, pub coverage_pct: f64,
pub fresh_alive_writers: usize,
pub fresh_coverage_pct: f64,
pub writers: Vec<MeApiWriterStatusSnapshot>, pub writers: Vec<MeApiWriterStatusSnapshot>,
pub dcs: Vec<MeApiDcStatusSnapshot>, pub dcs: Vec<MeApiDcStatusSnapshot>,
} }
@ -213,6 +223,8 @@ impl MePool {
pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot {
let now_epoch_secs = Self::now_epoch_secs(); let now_epoch_secs = Self::now_epoch_secs();
let active_generation = self.current_generation();
let drain_ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed);
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
@ -239,6 +251,7 @@ impl MePool {
let mut live_writers_by_dc_endpoint = HashMap::<(i16, SocketAddr), usize>::new(); let mut live_writers_by_dc_endpoint = HashMap::<(i16, SocketAddr), usize>::new();
let mut live_writers_by_dc = HashMap::<i16, usize>::new(); let mut live_writers_by_dc = HashMap::<i16, usize>::new();
let mut fresh_writers_by_dc = HashMap::<i16, usize>::new();
let mut dc_rtt_agg = HashMap::<i16, (f64, u64)>::new(); let mut dc_rtt_agg = HashMap::<i16, (f64, u64)>::new();
let mut writer_rows = Vec::<MeApiWriterStatusSnapshot>::with_capacity(writers.len()); let mut writer_rows = Vec::<MeApiWriterStatusSnapshot>::with_capacity(writers.len());
@ -247,6 +260,10 @@ impl MePool {
let dc = i16::try_from(writer.writer_dc).ok(); let dc = i16::try_from(writer.writer_dc).ok();
let draining = writer.draining.load(Ordering::Relaxed); let draining = writer.draining.load(Ordering::Relaxed);
let degraded = writer.degraded.load(Ordering::Relaxed); let degraded = writer.degraded.load(Ordering::Relaxed);
let matches_active_generation = writer.generation == active_generation;
let in_desired_map = dc
.and_then(|dc_idx| endpoints_by_dc.get(&dc_idx))
.is_some_and(|endpoints| endpoints.contains(&endpoint));
let bound_clients = activity let bound_clients = activity
.bound_clients_by_writer .bound_clients_by_writer
.get(&writer.id) .get(&writer.id)
@ -256,6 +273,21 @@ impl MePool {
.get(&writer.id) .get(&writer.id)
.map(|idle_ts| now_epoch_secs.saturating_sub(*idle_ts)); .map(|idle_ts| now_epoch_secs.saturating_sub(*idle_ts));
let rtt_ema_ms = rtt.get(&writer.id).map(|(_, ema)| *ema); let rtt_ema_ms = rtt.get(&writer.id).map(|(_, ema)| *ema);
let allow_drain_fallback = writer.allow_drain_fallback.load(Ordering::Relaxed);
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(Ordering::Relaxed);
let drain_deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(Ordering::Relaxed);
let drain_started_at_epoch_secs =
(drain_started_at_epoch_secs != 0).then_some(drain_started_at_epoch_secs);
let drain_deadline_epoch_secs =
(drain_deadline_epoch_secs != 0).then_some(drain_deadline_epoch_secs);
let drain_over_ttl = draining
&& drain_ttl_secs > 0
&& drain_started_at_epoch_secs
.is_some_and(|started| now_epoch_secs.saturating_sub(started) > drain_ttl_secs);
let state = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) { let state = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
WriterContour::Warm => "warm", WriterContour::Warm => "warm",
WriterContour::Active => "active", WriterContour::Active => "active",
@ -273,6 +305,9 @@ impl MePool {
entry.0 += ema_ms; entry.0 += ema_ms;
entry.1 += 1; entry.1 += 1;
} }
if matches_active_generation && in_desired_map {
*fresh_writers_by_dc.entry(dc_idx).or_insert(0) += 1;
}
} }
} }
@ -287,6 +322,12 @@ impl MePool {
bound_clients, bound_clients,
idle_for_secs, idle_for_secs,
rtt_ema_ms, rtt_ema_ms,
matches_active_generation,
in_desired_map,
allow_drain_fallback,
drain_started_at_epoch_secs,
drain_deadline_epoch_secs,
drain_over_ttl,
}); });
} }
@ -295,6 +336,7 @@ impl MePool {
let mut dcs = Vec::<MeApiDcStatusSnapshot>::with_capacity(endpoints_by_dc.len()); let mut dcs = Vec::<MeApiDcStatusSnapshot>::with_capacity(endpoints_by_dc.len());
let mut available_endpoints = 0usize; let mut available_endpoints = 0usize;
let mut alive_writers = 0usize; let mut alive_writers = 0usize;
let mut fresh_alive_writers = 0usize;
let floor_mode = self.floor_mode(); let floor_mode = self.floor_mode();
let adaptive_cpu_cores = (self let adaptive_cpu_cores = (self
.me_adaptive_floor_cpu_cores_effective .me_adaptive_floor_cpu_cores_effective
@ -333,6 +375,7 @@ impl MePool {
let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive) let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive)
&& dc_required_writers < base_required; && dc_required_writers < base_required;
let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0); let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0);
let dc_fresh_alive_writers = fresh_writers_by_dc.get(&dc).copied().unwrap_or(0);
let dc_load = activity let dc_load = activity
.active_sessions_by_target_dc .active_sessions_by_target_dc
.get(&dc) .get(&dc)
@ -344,6 +387,7 @@ impl MePool {
available_endpoints += dc_available_endpoints; available_endpoints += dc_available_endpoints;
alive_writers += dc_alive_writers; alive_writers += dc_alive_writers;
fresh_alive_writers += dc_fresh_alive_writers;
dcs.push(MeApiDcStatusSnapshot { dcs.push(MeApiDcStatusSnapshot {
dc, dc,
@ -367,6 +411,8 @@ impl MePool {
floor_capped, floor_capped,
alive_writers: dc_alive_writers, alive_writers: dc_alive_writers,
coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers), coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
fresh_alive_writers: dc_fresh_alive_writers,
fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers),
rtt_ms: dc_rtt_ms, rtt_ms: dc_rtt_ms,
load: dc_load, load: dc_load,
}); });
@ -381,6 +427,8 @@ impl MePool {
required_writers, required_writers,
alive_writers, alive_writers,
coverage_pct: ratio_pct(alive_writers, required_writers), coverage_pct: ratio_pct(alive_writers, required_writers),
fresh_alive_writers,
fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers),
writers: writer_rows, writers: writer_rows,
dcs, dcs,
} }

View File

@ -178,6 +178,7 @@ impl MePool {
allow_drain_fallback: allow_drain_fallback.clone(), allow_drain_fallback: allow_drain_fallback.clone(),
}; };
self.writers.write().await.push(writer.clone()); self.writers.write().await.push(writer.clone());
self.registry.register_writer(writer_id, tx.clone()).await;
self.registry.mark_writer_idle(writer_id).await; self.registry.mark_writer_idle(writer_id).await;
self.conn_count.fetch_add(1, Ordering::Relaxed); self.conn_count.fetch_add(1, Ordering::Relaxed);
self.writer_available.notify_one(); self.writer_available.notify_one();
@ -414,9 +415,15 @@ impl MePool {
}; };
let (conn_id, mut service_rx) = pool.registry.register().await; let (conn_id, mut service_rx) = pool.registry.register().await;
pool.registry if !pool
.bind_writer(conn_id, writer_id, tx_signal.clone(), meta.clone()) .registry
.await; .bind_writer(conn_id, writer_id, meta.clone())
.await
{
let _ = pool.registry.unregister(conn_id).await;
stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total();
continue;
}
let payload = build_proxy_req_payload( let payload = build_proxy_req_payload(
conn_id, conn_id,
@ -521,6 +528,12 @@ impl MePool {
self.conn_count.fetch_sub(1, Ordering::Relaxed); self.conn_count.fetch_sub(1, Ordering::Relaxed);
} }
} }
let conns = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
tracker.retain(|_, (_, wid)| *wid != writer_id);
}
self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx { if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await; let _ = tx.send(WriterCommand::Close).await;
} }
@ -533,8 +546,7 @@ impl MePool {
} }
self.trigger_immediate_refill_for_dc(addr, writer_dc); self.trigger_immediate_refill_for_dc(addr, writer_dc);
} }
self.rtt_stats.lock().await.remove(&writer_id); conns
self.registry.writer_lost(writer_id).await
} }
pub(crate) async fn mark_writer_draining_with_timeout( pub(crate) async fn mark_writer_draining_with_timeout(

View File

@ -138,6 +138,15 @@ impl ConnRegistry {
(id, rx) (id, rx)
} }
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
let mut inner = self.inner.write().await;
inner.writers.insert(writer_id, tx);
inner
.conns_for_writer
.entry(writer_id)
.or_insert_with(HashSet::new);
}
/// Unregister connection, returning associated writer_id if any. /// Unregister connection, returning associated writer_id if any.
pub async fn unregister(&self, id: u64) -> Option<u64> { pub async fn unregister(&self, id: u64) -> Option<u64> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
@ -282,24 +291,39 @@ impl ConnRegistry {
} }
} }
pub async fn bind_writer( pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool {
&self,
conn_id: u64,
writer_id: u64,
tx: mpsc::Sender<WriterCommand>,
meta: ConnMeta,
) {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.meta.entry(conn_id).or_insert(meta.clone()); if !inner.writers.contains_key(&writer_id) {
inner.writer_for_conn.insert(conn_id, writer_id); return false;
}
let previous_writer_id = inner.writer_for_conn.insert(conn_id, writer_id);
if let Some(previous_writer_id) = previous_writer_id
&& previous_writer_id != writer_id
{
let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id)
{
set.remove(&conn_id);
set.is_empty()
} else {
false
};
if became_empty {
inner
.writer_idle_since_epoch_secs
.insert(previous_writer_id, Self::now_epoch_secs());
}
}
inner.meta.insert(conn_id, meta.clone());
inner.last_meta_for_writer.insert(writer_id, meta); inner.last_meta_for_writer.insert(writer_id, meta);
inner.writer_idle_since_epoch_secs.remove(&writer_id); inner.writer_idle_since_epoch_secs.remove(&writer_id);
inner.writers.entry(writer_id).or_insert_with(|| tx.clone());
inner inner
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
.or_insert_with(HashSet::new) .or_insert_with(HashSet::new)
.insert(conn_id); .insert(conn_id);
true
} }
pub async fn mark_writer_idle(&self, writer_id: u64) { pub async fn mark_writer_idle(&self, writer_id: u64) {
@ -384,6 +408,9 @@ impl ConnRegistry {
let mut out = Vec::new(); let mut out = Vec::new();
for conn_id in conns { for conn_id in conns {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue;
}
inner.writer_for_conn.remove(&conn_id); inner.writer_for_conn.remove(&conn_id);
if let Some(m) = inner.meta.get(&conn_id) { if let Some(m) = inner.meta.get(&conn_id) {
out.push(BoundConn { out.push(BoundConn {
@ -427,13 +454,15 @@ mod tests {
let (conn_c, _rx_c) = registry.register().await; let (conn_c, _rx_c) = registry.register().await;
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx_a.clone()).await;
registry.register_writer(20, writer_tx_b.clone()).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry registry
.bind_writer( .bind_writer(
conn_a, conn_a,
10, 10,
writer_tx_a.clone(),
ConnMeta { ConnMeta {
target_dc: 2, target_dc: 2,
client_addr: addr, client_addr: addr,
@ -441,12 +470,13 @@ mod tests {
proto_flags: 0, proto_flags: 0,
}, },
) )
.await; .await
);
assert!(
registry registry
.bind_writer( .bind_writer(
conn_b, conn_b,
10, 10,
writer_tx_a,
ConnMeta { ConnMeta {
target_dc: -2, target_dc: -2,
client_addr: addr, client_addr: addr,
@ -454,12 +484,13 @@ mod tests {
proto_flags: 0, proto_flags: 0,
}, },
) )
.await; .await
);
assert!(
registry registry
.bind_writer( .bind_writer(
conn_c, conn_c,
20, 20,
writer_tx_b,
ConnMeta { ConnMeta {
target_dc: 4, target_dc: 4,
client_addr: addr, client_addr: addr,
@ -467,7 +498,8 @@ mod tests {
proto_flags: 0, proto_flags: 0,
}, },
) )
.await; .await
);
let snapshot = registry.writer_activity_snapshot().await; let snapshot = registry.writer_activity_snapshot().await;
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2));
@ -476,4 +508,130 @@ mod tests {
assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1));
} }
#[tokio::test]
async fn bind_writer_rebinds_conn_atomically() {
let registry = ConnRegistry::new();
let (conn_id, _rx) = registry.register().await;
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx_a).await;
registry.register_writer(20, writer_tx_b).await;
let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
let first_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 443);
let second_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr,
our_addr: first_our_addr,
proto_flags: 1,
},
)
.await
);
assert!(
registry
.bind_writer(
conn_id,
20,
ConnMeta {
target_dc: 2,
client_addr,
our_addr: second_our_addr,
proto_flags: 2,
},
)
.await
);
let writer = registry.get_writer(conn_id).await.expect("writer binding");
assert_eq!(writer.writer_id, 20);
let meta = registry.get_meta(conn_id).await.expect("conn meta");
assert_eq!(meta.our_addr, second_our_addr);
assert_eq!(meta.proto_flags, 2);
let snapshot = registry.writer_activity_snapshot().await;
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&0));
assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1));
assert!(registry.writer_idle_since_snapshot().await.contains_key(&10));
}
#[tokio::test]
async fn writer_lost_does_not_drop_rebound_conn() {
let registry = ConnRegistry::new();
let (conn_id, _rx) = registry.register().await;
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx_a).await;
registry.register_writer(20, writer_tx_b).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
assert!(
registry
.bind_writer(
conn_id,
20,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 1,
},
)
.await
);
let lost = registry.writer_lost(10).await;
assert!(lost.is_empty());
assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20);
let removed_writer = registry.unregister(conn_id).await;
assert_eq!(removed_writer, Some(20));
assert!(registry.is_writer_empty(20).await);
}
#[tokio::test]
async fn bind_writer_rejects_unregistered_writer() {
let registry = ConnRegistry::new();
let (conn_id, _rx) = registry.register().await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
!registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
assert!(registry.get_writer(conn_id).await.is_none());
}
} }

View File

@ -375,9 +375,14 @@ impl MePool {
match w.tx.try_send(WriterCommand::Data(payload.clone())) { match w.tx.try_send(WriterCommand::Data(payload.clone())) {
Ok(()) => { Ok(()) => {
self.stats.increment_me_writer_pick_success_try_total(pick_mode); self.stats.increment_me_writer_pick_success_try_total(pick_mode);
self.registry if !self.registry.bind_writer(conn_id, w.id, meta).await {
.bind_writer(conn_id, w.id, w.tx.clone(), meta) debug!(
.await; conn_id,
writer_id = w.id,
"ME writer disappeared before bind commit, retrying"
);
continue;
}
if w.generation < self.current_generation() { if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total(); self.stats.increment_pool_stale_pick_total();
debug!( debug!(
@ -421,9 +426,14 @@ impl MePool {
Ok(()) => { Ok(()) => {
self.stats self.stats
.increment_me_writer_pick_success_fallback_total(pick_mode); .increment_me_writer_pick_success_fallback_total(pick_mode);
self.registry if !self.registry.bind_writer(conn_id, w.id, meta).await {
.bind_writer(conn_id, w.id, w.tx.clone(), meta) debug!(
.await; conn_id,
writer_id = w.id,
"ME writer disappeared before fallback bind commit, retrying"
);
continue;
}
if w.generation < self.current_generation() { if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total(); self.stats.increment_pool_stale_pick_total();
} }