Compare commits

..

24 Commits
3.2.2 ... 3.3.2

Author SHA1 Message Date
Alexey
8f3bdaec2c Merge pull request #329 from telemt/bump
Update Cargo.toml
2026-03-05 23:23:40 +03:00
Alexey
69b02caf77 Update Cargo.toml 2026-03-05 23:23:24 +03:00
Alexey
3854955069 Merge pull request #328 from telemt/flow-mep
Secret Atomic Snapshot + KDF Fingerprint on RwLock
2026-03-05 23:23:01 +03:00
Alexey
9b84fc7a5b Secret Atomic Snapshot + KDF Fingerprint on RwLock
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-05 23:18:26 +03:00
Alexey
e7cb9238dc Merge pull request #327 from telemt/bump
Update Cargo.toml
2026-03-05 22:32:20 +03:00
Alexey
0e2cbe6178 Update Cargo.toml 2026-03-05 22:32:08 +03:00
Alexey
cd076aeeeb Merge pull request #326 from telemt/flow-noroute
HybridAsyncPersistent - new ME Route NoWriter Mode
2026-03-05 22:31:46 +03:00
Alexey
d683faf922 HybridAsyncPersistent - new ME Route NoWriter Mode
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-05 22:31:01 +03:00
Alexey
0494f8ac8b Merge pull request #325 from telemt/bump
Update Cargo.toml
2026-03-05 16:40:40 +03:00
Alexey
48ce59900e Update Cargo.toml 2026-03-05 16:40:28 +03:00
Alexey
84e95fd229 ME Pool Init fixes: merge pull request #324 from telemt/flow-fixes
ME Pool Init fixes
2026-03-05 16:35:00 +03:00
Alexey
a80be78345 DC writer floor is below required only in runtime 2026-03-05 16:32:31 +03:00
Alexey
64130dd02e MEP not ready only after 3 attempts 2026-03-05 16:13:40 +03:00
Alexey
d62a6e0417 Shutdown Timer fixes 2026-03-05 16:04:32 +03:00
Alexey
3260746785 Init + Uptime timers 2026-03-05 15:48:09 +03:00
Alexey
8066ea2163 ME Pool Init fixes 2026-03-05 15:31:36 +03:00
Alexey
813f1df63e Performance improvements: merge pull request #323 from telemt/flow-perf
Performance improvements
2026-03-05 14:43:10 +03:00
Alexey
09bdafa718 Performance improvements 2026-03-05 14:39:32 +03:00
Alexey
fb0f75df43 Merge pull request #322 from Dimasssss/patch-3
Update README.md
2026-03-05 14:10:01 +03:00
Alexey
39255df549 Unique IP always in Metrics+API: merge pull request #321 from telemt/flow-iplimit
Unique IP always in Metrics+API
2026-03-05 14:09:40 +03:00
Dimasssss
456495fd62 Update README.md 2026-03-05 13:59:58 +03:00
Alexey
83cadc0bf3 No lock-contention in ip-tracker 2026-03-05 13:52:27 +03:00
Alexey
0b1a8cd3f8 IP Limit fixes 2026-03-05 13:41:41 +03:00
Alexey
565b4ee923 Unique IP always in Metrics+API
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-05 13:21:11 +03:00
23 changed files with 1785 additions and 488 deletions

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.2.2" version = "3.3.2"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -118,8 +118,8 @@ We welcome ideas, architectural feedback, and pull requests.
## Quick Start Guide ## Quick Start Guide
### [Quick Start Guid RU](docs/QUICK_START_GUIDE.ru.md) ### [Quick Start Guide RU](docs/QUICK_START_GUIDE.ru.md)
### [Quick Start Guid EN](docs/QUICK_START_GUIDE.en.md) ### [Quick Start Guide EN](docs/QUICK_START_GUIDE.en.md)
### Advanced ### Advanced

View File

@@ -1,3 +1,5 @@
use std::net::IpAddr;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use hyper::StatusCode; use hyper::StatusCode;
use rand::Rng; use rand::Rng;
@@ -369,6 +371,9 @@ pub(super) struct UserInfo {
pub(super) max_unique_ips: Option<usize>, pub(super) max_unique_ips: Option<usize>,
pub(super) current_connections: u64, pub(super) current_connections: u64,
pub(super) active_unique_ips: usize, pub(super) active_unique_ips: usize,
pub(super) active_unique_ips_list: Vec<IpAddr>,
pub(super) recent_unique_ips: usize,
pub(super) recent_unique_ips_list: Vec<IpAddr>,
pub(super) total_octets: u64, pub(super) total_octets: u64,
pub(super) links: UserLinks, pub(super) links: UserLinks,
} }

View File

@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use hyper::StatusCode; use hyper::StatusCode;
@@ -112,6 +111,9 @@ pub(super) async fn create_user(
max_unique_ips: updated_limit, max_unique_ips: updated_limit,
current_connections: 0, current_connections: 0,
active_unique_ips: 0, active_unique_ips: 0,
active_unique_ips_list: Vec::new(),
recent_unique_ips: 0,
recent_unique_ips_list: Vec::new(),
total_octets: 0, total_octets: 0,
links: build_user_links( links: build_user_links(
&cfg, &cfg,
@@ -300,18 +302,21 @@ pub(super) async fn users_from_config(
startup_detected_ip_v4: Option<IpAddr>, startup_detected_ip_v4: Option<IpAddr>,
startup_detected_ip_v6: Option<IpAddr>, startup_detected_ip_v6: Option<IpAddr>,
) -> Vec<UserInfo> { ) -> Vec<UserInfo> {
let ip_counts = ip_tracker
.get_stats()
.await
.into_iter()
.map(|(user, count, _)| (user, count))
.collect::<HashMap<_, _>>();
let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>(); let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>();
names.sort(); names.sort();
let active_ip_lists = ip_tracker.get_active_ips_for_users(&names).await;
let recent_ip_lists = ip_tracker.get_recent_ips_for_users(&names).await;
let mut users = Vec::with_capacity(names.len()); let mut users = Vec::with_capacity(names.len());
for username in names { for username in names {
let active_ip_list = active_ip_lists
.get(&username)
.cloned()
.unwrap_or_else(Vec::new);
let recent_ip_list = recent_ip_lists
.get(&username)
.cloned()
.unwrap_or_else(Vec::new);
let links = cfg let links = cfg
.access .access
.users .users
@@ -340,7 +345,10 @@ pub(super) async fn users_from_config(
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(),
current_connections: stats.get_user_curr_connects(&username), current_connections: stats.get_user_curr_connects(&username),
active_unique_ips: ip_counts.get(&username).copied().unwrap_or(0), active_unique_ips: active_ip_list.len(),
active_unique_ips_list: active_ip_list,
recent_unique_ips: recent_ip_list.len(),
recent_unique_ips_list: recent_ip_list,
total_octets: stats.get_user_total_octets(&username), total_octets: stats.get_user_total_octets(&username),
links, links,
username, username,

View File

@@ -129,6 +129,10 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
Some("unknown-dc.txt".to_string()) Some("unknown-dc.txt".to_string())
} }
/// Default for `general.unknown_dc_file_log_enabled`: file logging of
/// unknown-DC events is opt-in, so it starts disabled.
pub(crate) fn default_unknown_dc_file_log_enabled() -> bool {
    false
}
pub(crate) fn default_pool_size() -> usize { pub(crate) fn default_pool_size() -> usize {
8 8
} }
@@ -137,6 +141,14 @@ pub(crate) fn default_proxy_secret_path() -> Option<String> {
Some("proxy-secret".to_string()) Some("proxy-secret".to_string())
} }
/// Default on-disk cache location for the raw getProxyConfig (IPv4)
/// snapshot used as a startup fallback.
pub(crate) fn default_proxy_config_v4_cache_path() -> Option<String> {
    Some(String::from("cache/proxy-config-v4.txt"))
}
/// Default on-disk cache location for the raw getProxyConfigV6
/// snapshot used as a startup fallback.
pub(crate) fn default_proxy_config_v6_cache_path() -> Option<String> {
    Some(String::from("cache/proxy-config-v6.txt"))
}
pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> { pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> {
None None
} }
@@ -273,6 +285,18 @@ pub(crate) fn default_me_route_backpressure_high_watermark_pct() -> u8 {
80 80
} }
/// Default maximum wait (milliseconds) for the async-recovery failfast
/// no-writer mode. Validation elsewhere constrains this to [10, 5000].
pub(crate) fn default_me_route_no_writer_wait_ms() -> u64 {
    250
}
/// Default number of inline recovery attempts for the legacy no-writer mode.
pub(crate) fn default_me_route_inline_recovery_attempts() -> u32 {
    3
}
/// Default maximum inline-recovery wait (milliseconds) for the legacy
/// no-writer mode. Validation elsewhere constrains this to [10, 30000].
pub(crate) fn default_me_route_inline_recovery_wait_ms() -> u64 {
    3000
}
pub(crate) fn default_beobachten_minutes() -> u64 { pub(crate) fn default_beobachten_minutes() -> u64 {
10 10
} }

View File

@@ -381,6 +381,22 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true; warned = true;
warn!("config reload: general.middle_proxy_pool_size changed; restart required"); warn!("config reload: general.middle_proxy_pool_size changed; restart required");
} }
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|| old.general.me_route_inline_recovery_attempts
!= new.general.me_route_inline_recovery_attempts
|| old.general.me_route_inline_recovery_wait_ms
!= new.general.me_route_inline_recovery_wait_ms
{
warned = true;
warn!("config reload: general.me_route_no_writer_* changed; restart required");
}
if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
|| old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
{
warned = true;
warn!("config reload: general.unknown_dc_* changed; restart required");
}
if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts { if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts {
warned = true; warned = true;
warn!("config reload: general.me_init_retry_attempts changed; restart required"); warn!("config reload: general.me_init_retry_attempts changed; restart required");
@@ -389,6 +405,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true; warned = true;
warn!("config reload: general.me2dc_fallback changed; restart required"); warn!("config reload: general.me2dc_fallback changed; restart required");
} }
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path
{
warned = true;
warn!("config reload: general.proxy_config_*_cache_path changed; restart required");
}
if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled
|| old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs || old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs
|| old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs || old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs

View File

@@ -203,6 +203,22 @@ impl ProxyConfig {
sanitize_ad_tag(&mut config.general.ad_tag); sanitize_ad_tag(&mut config.general.ad_tag);
if let Some(path) = &config.general.proxy_config_v4_cache_path
&& path.trim().is_empty()
{
return Err(ProxyError::Config(
"general.proxy_config_v4_cache_path cannot be empty when provided".to_string(),
));
}
if let Some(path) = &config.general.proxy_config_v6_cache_path
&& path.trim().is_empty()
{
return Err(ProxyError::Config(
"general.proxy_config_v6_cache_path cannot be empty when provided".to_string(),
));
}
if let Some(update_every) = config.general.update_every { if let Some(update_every) = config.general.update_every {
if update_every == 0 { if update_every == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@@ -410,6 +426,24 @@ impl ProxyConfig {
)); ));
} }
if !(10..=5000).contains(&config.general.me_route_no_writer_wait_ms) {
return Err(ProxyError::Config(
"general.me_route_no_writer_wait_ms must be within [10, 5000]".to_string(),
));
}
if config.general.me_route_inline_recovery_attempts == 0 {
return Err(ProxyError::Config(
"general.me_route_inline_recovery_attempts must be > 0".to_string(),
));
}
if !(10..=30000).contains(&config.general.me_route_inline_recovery_wait_ms) {
return Err(ProxyError::Config(
"general.me_route_inline_recovery_wait_ms must be within [10, 30000]".to_string(),
));
}
if config.server.api.request_body_limit_bytes == 0 { if config.server.api.request_body_limit_bytes == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"server.api.request_body_limit_bytes must be > 0".to_string(), "server.api.request_body_limit_bytes must be > 0".to_string(),
@@ -514,6 +548,12 @@ impl ProxyConfig {
config.general.middle_proxy_nat_probe = true; config.general.middle_proxy_nat_probe = true;
warn!("Auto-enabled middle_proxy_nat_probe for middle proxy mode"); warn!("Auto-enabled middle_proxy_nat_probe for middle proxy mode");
} }
if config.general.use_middle_proxy && !config.general.me_secret_atomic_snapshot {
config.general.me_secret_atomic_snapshot = true;
warn!(
"Auto-enabled me_secret_atomic_snapshot for middle proxy mode to keep KDF key_selector/secret coherent"
);
}
validate_network_cfg(&mut config.network)?; validate_network_cfg(&mut config.network)?;
crate::network::dns_overrides::validate_entries(&config.network.dns_overrides)?; crate::network::dns_overrides::validate_entries(&config.network.dns_overrides)?;
@@ -673,6 +713,14 @@ mod tests {
cfg.general.me2dc_fallback, cfg.general.me2dc_fallback,
default_me2dc_fallback() default_me2dc_fallback()
); );
assert_eq!(
cfg.general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path()
);
assert_eq!(
cfg.general.proxy_config_v6_cache_path,
default_proxy_config_v6_cache_path()
);
assert_eq!( assert_eq!(
cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers() default_me_single_endpoint_shadow_writers()
@@ -783,6 +831,14 @@ mod tests {
default_me_init_retry_attempts() default_me_init_retry_attempts()
); );
assert_eq!(general.me2dc_fallback, default_me2dc_fallback()); assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
assert_eq!(
general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path()
);
assert_eq!(
general.proxy_config_v6_cache_path,
default_proxy_config_v6_cache_path()
);
assert_eq!( assert_eq!(
general.me_single_endpoint_shadow_writers, general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers() default_me_single_endpoint_shadow_writers()
@@ -1206,6 +1262,85 @@ mod tests {
let _ = std::fs::remove_file(path_valid); let _ = std::fs::remove_file(path_valid);
} }
#[test]
// Verifies that a me_route_no_writer_wait_ms value outside [10, 5000]
// is rejected at config load time with a field-specific error message.
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_no_writer_wait_ms = 5
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
// NOTE(review): fixed temp-file name; two concurrent runs of this test
// binary could collide on the same path — consider a unique suffix.
let path = dir.join("telemt_me_route_no_writer_wait_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_no_writer_wait_ms must be within [10, 5000]"));
// Best-effort cleanup; ignore failure so it cannot mask the assertion.
let _ = std::fs::remove_file(path);
}
#[test]
// Ensures the snake_case TOML string "inline_recovery_legacy" deserializes
// into MeRouteNoWriterMode::InlineRecoveryLegacy.
fn me_route_no_writer_mode_is_parsed() {
let toml = r#"
[general]
me_route_no_writer_mode = "inline_recovery_legacy"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
// NOTE(review): fixed temp-file name; concurrent runs could collide.
let path = dir.join("telemt_me_route_no_writer_mode_parse_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(
cfg.general.me_route_no_writer_mode,
crate::config::MeRouteNoWriterMode::InlineRecoveryLegacy
);
// Best-effort cleanup; ignore failure so it cannot mask the assertion.
let _ = std::fs::remove_file(path);
}
#[test]
// A cache path that is present but empty (or whitespace-only) must be
// rejected; v4 and v6 paths are validated independently, so both are tried.
fn proxy_config_cache_paths_empty_are_rejected() {
// Whitespace-only v4 path: trimmed-empty values are treated as empty.
let toml = r#"
[general]
proxy_config_v4_cache_path = " "
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
// NOTE(review): fixed temp-file names; concurrent runs could collide.
let path = dir.join("telemt_proxy_config_v4_cache_path_empty_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.proxy_config_v4_cache_path cannot be empty"));
let _ = std::fs::remove_file(path);
// Literal empty string for the v6 path must fail the same way.
let toml_v6 = r#"
[general]
proxy_config_v6_cache_path = ""
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let path_v6 = dir.join("telemt_proxy_config_v6_cache_path_empty_test.toml");
std::fs::write(&path_v6, toml_v6).unwrap();
let err_v6 = ProxyConfig::load(&path_v6).unwrap_err().to_string();
assert!(err_v6.contains("general.proxy_config_v6_cache_path cannot be empty"));
let _ = std::fs::remove_file(path_v6);
}
#[test] #[test]
fn me_hardswap_warmup_defaults_are_set() { fn me_hardswap_warmup_defaults_are_set() {
let toml = r#" let toml = r#"

View File

@@ -183,6 +183,35 @@ impl MeFloorMode {
} }
} }
/// Middle-End route behavior when no writer is immediately available.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MeRouteNoWriterMode {
    AsyncRecoveryFailfast,
    InlineRecoveryLegacy,
    #[default]
    HybridAsyncPersistent,
}

impl MeRouteNoWriterMode {
    /// Stable numeric tag for this mode (compact form for persistence/metrics).
    pub fn as_u8(self) -> u8 {
        match self {
            Self::AsyncRecoveryFailfast => 0,
            Self::InlineRecoveryLegacy => 1,
            Self::HybridAsyncPersistent => 2,
        }
    }

    /// Inverse of `as_u8`; any unknown tag falls back to the default
    /// (`HybridAsyncPersistent`), same as tag 2.
    pub fn from_u8(raw: u8) -> Self {
        match raw {
            0 => Self::AsyncRecoveryFailfast,
            1 => Self::InlineRecoveryLegacy,
            _ => Self::HybridAsyncPersistent,
        }
    }
}
/// Per-user unique source IP limit mode. /// Per-user unique source IP limit mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
@@ -318,6 +347,14 @@ pub struct GeneralConfig {
#[serde(default = "default_proxy_secret_path")] #[serde(default = "default_proxy_secret_path")]
pub proxy_secret_path: Option<String>, pub proxy_secret_path: Option<String>,
/// Optional path to cache raw getProxyConfig (IPv4) snapshot for startup fallback.
#[serde(default = "default_proxy_config_v4_cache_path")]
pub proxy_config_v4_cache_path: Option<String>,
/// Optional path to cache raw getProxyConfigV6 snapshot for startup fallback.
#[serde(default = "default_proxy_config_v6_cache_path")]
pub proxy_config_v6_cache_path: Option<String>,
/// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags. /// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags.
#[serde(default)] #[serde(default)]
pub ad_tag: Option<String>, pub ad_tag: Option<String>,
@@ -511,6 +548,10 @@ pub struct GeneralConfig {
#[serde(default = "default_unknown_dc_log_path")] #[serde(default = "default_unknown_dc_log_path")]
pub unknown_dc_log_path: Option<String>, pub unknown_dc_log_path: Option<String>,
/// Enable unknown-DC file logging.
#[serde(default = "default_unknown_dc_file_log_enabled")]
pub unknown_dc_file_log_enabled: bool,
#[serde(default)] #[serde(default)]
pub log_level: LogLevel, pub log_level: LogLevel,
@@ -538,6 +579,22 @@ pub struct GeneralConfig {
#[serde(default = "default_me_route_backpressure_high_watermark_pct")] #[serde(default = "default_me_route_backpressure_high_watermark_pct")]
pub me_route_backpressure_high_watermark_pct: u8, pub me_route_backpressure_high_watermark_pct: u8,
/// ME route behavior when no writer is immediately available.
#[serde(default)]
pub me_route_no_writer_mode: MeRouteNoWriterMode,
/// Maximum wait time in milliseconds for async-recovery failfast mode.
#[serde(default = "default_me_route_no_writer_wait_ms")]
pub me_route_no_writer_wait_ms: u64,
/// Number of inline recovery attempts in legacy mode.
#[serde(default = "default_me_route_inline_recovery_attempts")]
pub me_route_inline_recovery_attempts: u32,
/// Maximum wait time in milliseconds for inline recovery in legacy mode.
#[serde(default = "default_me_route_inline_recovery_wait_ms")]
pub me_route_inline_recovery_wait_ms: u64,
/// [general.links] — proxy link generation overrides. /// [general.links] — proxy link generation overrides.
#[serde(default)] #[serde(default)]
pub links: LinksConfig, pub links: LinksConfig,
@@ -682,6 +739,8 @@ impl Default for GeneralConfig {
use_middle_proxy: default_true(), use_middle_proxy: default_true(),
ad_tag: None, ad_tag: None,
proxy_secret_path: default_proxy_secret_path(), proxy_secret_path: default_proxy_secret_path(),
proxy_config_v4_cache_path: default_proxy_config_v4_cache_path(),
proxy_config_v6_cache_path: default_proxy_config_v6_cache_path(),
middle_proxy_nat_ip: None, middle_proxy_nat_ip: None,
middle_proxy_nat_probe: default_true(), middle_proxy_nat_probe: default_true(),
middle_proxy_nat_stun: default_middle_proxy_nat_stun(), middle_proxy_nat_stun: default_middle_proxy_nat_stun(),
@@ -719,6 +778,7 @@ impl Default for GeneralConfig {
upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(), upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(),
stun_iface_mismatch_ignore: false, stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(), unknown_dc_log_path: default_unknown_dc_log_path(),
unknown_dc_file_log_enabled: default_unknown_dc_file_log_enabled(),
log_level: LogLevel::Normal, log_level: LogLevel::Normal,
disable_colors: false, disable_colors: false,
telemetry: TelemetryConfig::default(), telemetry: TelemetryConfig::default(),
@@ -726,6 +786,10 @@ impl Default for GeneralConfig {
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(), me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(), me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(), me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
links: LinksConfig::default(), links: LinksConfig::default(),
crypto_pending_buffer: default_crypto_pending_buffer(), crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(), max_client_frame: default_max_client_frame(),

View File

@@ -2,7 +2,7 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -13,7 +13,7 @@ use crate::config::UserMaxUniqueIpsMode;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UserIpTracker { pub struct UserIpTracker {
active_ips: Arc<RwLock<HashMap<String, HashSet<IpAddr>>>>, active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>, recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
max_ips: Arc<RwLock<HashMap<String, usize>>>, max_ips: Arc<RwLock<HashMap<String, usize>>>,
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>, limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
@@ -67,21 +67,14 @@ impl UserIpTracker {
let max_ips = self.max_ips.read().await; let max_ips = self.max_ips.read().await;
max_ips.get(username).copied() max_ips.get(username).copied()
}; };
let mode = *self.limit_mode.read().await;
let window = *self.limit_window.read().await;
let now = Instant::now();
let mut active_ips = self.active_ips.write().await; let mut active_ips = self.active_ips.write().await;
let user_active = active_ips let user_active = active_ips
.entry(username.to_string()) .entry(username.to_string())
.or_insert_with(HashSet::new); .or_insert_with(HashMap::new);
if limit.is_none() {
user_active.insert(ip);
return Ok(());
}
let limit = limit.unwrap_or_default();
let mode = *self.limit_mode.read().await;
let window = *self.limit_window.read().await;
let now = Instant::now();
let mut recent_ips = self.recent_ips.write().await; let mut recent_ips = self.recent_ips.write().await;
let user_recent = recent_ips let user_recent = recent_ips
@@ -89,32 +82,35 @@ impl UserIpTracker {
.or_insert_with(HashMap::new); .or_insert_with(HashMap::new);
Self::prune_recent(user_recent, now, window); Self::prune_recent(user_recent, now, window);
if user_active.contains(&ip) { if let Some(count) = user_active.get_mut(&ip) {
*count = count.saturating_add(1);
user_recent.insert(ip, now); user_recent.insert(ip, now);
return Ok(()); return Ok(());
} }
let active_limit_reached = user_active.len() >= limit; if let Some(limit) = limit {
let recent_limit_reached = user_recent.len() >= limit; let active_limit_reached = user_active.len() >= limit;
let deny = match mode { let recent_limit_reached = user_recent.len() >= limit;
UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached, let deny = match mode {
UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached, UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached,
UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached, UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached,
}; UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached,
};
if deny { if deny {
return Err(format!( return Err(format!(
"IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}", "IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}",
username, username,
user_active.len(), user_active.len(),
limit, limit,
user_recent.len(), user_recent.len(),
limit, limit,
mode mode
)); ));
}
} }
user_active.insert(ip); user_active.insert(ip, 1);
user_recent.insert(ip, now); user_recent.insert(ip, now);
Ok(()) Ok(())
} }
@@ -122,23 +118,73 @@ impl UserIpTracker {
pub async fn remove_ip(&self, username: &str, ip: IpAddr) { pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
let mut active_ips = self.active_ips.write().await; let mut active_ips = self.active_ips.write().await;
if let Some(user_ips) = active_ips.get_mut(username) { if let Some(user_ips) = active_ips.get_mut(username) {
user_ips.remove(&ip); if let Some(count) = user_ips.get_mut(&ip) {
if *count > 1 {
*count -= 1;
} else {
user_ips.remove(&ip);
}
}
if user_ips.is_empty() { if user_ips.is_empty() {
active_ips.remove(username); active_ips.remove(username);
} }
} }
drop(active_ips); }
let mode = *self.limit_mode.read().await; pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
if matches!(mode, UserMaxUniqueIpsMode::ActiveWindow) { let window = *self.limit_window.read().await;
let mut recent_ips = self.recent_ips.write().await; let now = Instant::now();
if let Some(user_recent) = recent_ips.get_mut(username) { let recent_ips = self.recent_ips.read().await;
user_recent.remove(&ip);
if user_recent.is_empty() { let mut counts = HashMap::with_capacity(users.len());
recent_ips.remove(username); for user in users {
} let count = if let Some(user_recent) = recent_ips.get(user) {
} user_recent
.values()
.filter(|seen_at| now.duration_since(**seen_at) <= window)
.count()
} else {
0
};
counts.insert(user.clone(), count);
} }
counts
}
/// Snapshot of the currently-active unique IPs for each requested user.
/// Every requested user appears in the result; users with no tracked IPs
/// map to an empty list. Lists are sorted for stable output.
pub async fn get_active_ips_for_users(&self, users: &[String]) -> HashMap<String, Vec<IpAddr>> {
    let guard = self.active_ips.read().await;
    users
        .iter()
        .map(|user| {
            let mut ips: Vec<IpAddr> = guard
                .get(user)
                .map(|per_ip| per_ip.keys().copied().collect())
                .unwrap_or_default();
            ips.sort();
            (user.clone(), ips)
        })
        .collect()
}
pub async fn get_recent_ips_for_users(&self, users: &[String]) -> HashMap<String, Vec<IpAddr>> {
let window = *self.limit_window.read().await;
let now = Instant::now();
let recent_ips = self.recent_ips.read().await;
let mut out = HashMap::with_capacity(users.len());
for user in users {
let mut ips = if let Some(user_recent) = recent_ips.get(user) {
user_recent
.iter()
.filter(|(_, seen_at)| now.duration_since(**seen_at) <= window)
.map(|(ip, _)| *ip)
.collect::<Vec<_>>()
} else {
Vec::new()
};
ips.sort();
out.insert(user.clone(), ips);
}
out
} }
pub async fn get_active_ip_count(&self, username: &str) -> usize { pub async fn get_active_ip_count(&self, username: &str) -> usize {
@@ -150,7 +196,7 @@ impl UserIpTracker {
let active_ips = self.active_ips.read().await; let active_ips = self.active_ips.read().await;
active_ips active_ips
.get(username) .get(username)
.map(|ips| ips.iter().copied().collect()) .map(|ips| ips.keys().copied().collect())
.unwrap_or_else(Vec::new) .unwrap_or_else(Vec::new)
} }
@@ -190,7 +236,7 @@ impl UserIpTracker {
let active_ips = self.active_ips.read().await; let active_ips = self.active_ips.read().await;
active_ips active_ips
.get(username) .get(username)
.map(|ips| ips.contains(&ip)) .map(|ips| ips.contains_key(&ip))
.unwrap_or(false) .unwrap_or(false)
} }
@@ -266,6 +312,26 @@ mod tests {
assert_eq!(tracker.get_active_ip_count("test_user").await, 2); assert_eq!(tracker.get_active_ip_count("test_user").await, 2);
} }
#[tokio::test]
// With limit 1 in ActiveWindow mode: the first IP is admitted, a second
// unique IP is denied, and the denial must not evict the existing session.
async fn test_active_window_rejects_new_ip_and_keeps_existing_session() {
let tracker = UserIpTracker::new();
tracker.set_user_limit("test_user", 1).await;
tracker
.set_limit_policy(UserMaxUniqueIpsMode::ActiveWindow, 30)
.await;
let ip1 = test_ipv4(10, 10, 10, 1);
let ip2 = test_ipv4(10, 10, 10, 2);
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
assert!(tracker.is_ip_active("test_user", ip1).await);
assert!(tracker.check_and_add("test_user", ip2).await.is_err());
// Existing session remains active; only new unique IP is denied.
assert!(tracker.is_ip_active("test_user", ip1).await);
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
}
#[tokio::test] #[tokio::test]
async fn test_reconnection_from_same_ip() { async fn test_reconnection_from_same_ip() {
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();
@@ -278,6 +344,24 @@ mod tests {
assert_eq!(tracker.get_active_ip_count("test_user").await, 1); assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
} }
#[tokio::test]
// Two sessions from the same IP count once toward the unique-IP set; the
// IP must stay active until the LAST session from it disconnects
// (per-IP session refcounting, not a plain set).
async fn test_same_ip_disconnect_keeps_active_while_other_session_alive() {
let tracker = UserIpTracker::new();
tracker.set_user_limit("test_user", 2).await;
let ip1 = test_ipv4(192, 168, 1, 1);
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
tracker.remove_ip("test_user", ip1).await;
// First disconnect only decrements the refcount; IP is still active.
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
tracker.remove_ip("test_user", ip1).await;
// Second disconnect drops the last session; IP is released.
assert_eq!(tracker.get_active_ip_count("test_user").await, 0);
}
#[tokio::test] #[tokio::test]
async fn test_ip_removal() { async fn test_ip_removal() {
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();

View File

@@ -4,11 +4,11 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::{Duration, Instant};
use rand::Rng; use rand::Rng;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::signal; use tokio::signal;
use tokio::sync::{Semaphore, mpsc}; use tokio::sync::{Semaphore, mpsc, watch};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
#[cfg(unix)] #[cfg(unix)]
@@ -41,8 +41,9 @@ use crate::stats::telemetry::TelemetryPolicy;
use crate::stats::{ReplayChecker, Stats}; use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool; use crate::stream::BufferPool;
use crate::transport::middle_proxy::{ use crate::transport::middle_proxy::{
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, MeReinitTrigger, format_sample_line, MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line,
format_me_route, load_proxy_config_cache, run_me_ping, save_proxy_config_cache, MePingFamily, MePingSample,
MeReinitTrigger,
}; };
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
use crate::tls_front::TlsFrontCache; use crate::tls_front::TlsFrontCache;
@@ -172,8 +173,202 @@ async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result
tokio::fs::write(path, payload).await tokio::fs::write(path, payload).await
} }
fn unit_label(value: u64, singular: &'static str, plural: &'static str) -> &'static str {
if value == 1 { singular } else { plural }
}
fn format_uptime(total_secs: u64) -> String {
const SECS_PER_MINUTE: u64 = 60;
const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;
const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR;
const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY;
const SECS_PER_YEAR: u64 = 12 * SECS_PER_MONTH;
let mut remaining = total_secs;
let years = remaining / SECS_PER_YEAR;
remaining %= SECS_PER_YEAR;
let months = remaining / SECS_PER_MONTH;
remaining %= SECS_PER_MONTH;
let days = remaining / SECS_PER_DAY;
remaining %= SECS_PER_DAY;
let hours = remaining / SECS_PER_HOUR;
remaining %= SECS_PER_HOUR;
let minutes = remaining / SECS_PER_MINUTE;
let seconds = remaining % SECS_PER_MINUTE;
let mut parts = Vec::new();
if total_secs > SECS_PER_YEAR {
parts.push(format!(
"{} {}",
years,
unit_label(years, "year", "years")
));
}
if total_secs > SECS_PER_MONTH {
parts.push(format!(
"{} {}",
months,
unit_label(months, "month", "months")
));
}
if total_secs > SECS_PER_DAY {
parts.push(format!(
"{} {}",
days,
unit_label(days, "day", "days")
));
}
if total_secs > SECS_PER_HOUR {
parts.push(format!(
"{} {}",
hours,
unit_label(hours, "hour", "hours")
));
}
if total_secs > SECS_PER_MINUTE {
parts.push(format!(
"{} {}",
minutes,
unit_label(minutes, "minute", "minutes")
));
}
parts.push(format!(
"{} {}",
seconds,
unit_label(seconds, "second", "seconds")
));
format!("{} / {} seconds", parts.join(", "), total_secs)
}
async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
loop {
if *admission_rx.borrow() {
return true;
}
if admission_rx.changed().await.is_err() {
return *admission_rx.borrow();
}
}
}
async fn load_startup_proxy_config_snapshot(
url: &str,
cache_path: Option<&str>,
me2dc_fallback: bool,
label: &'static str,
) -> Option<ProxyConfigData> {
loop {
match fetch_proxy_config_with_raw(url).await {
Ok((cfg, raw)) => {
if !cfg.map.is_empty() {
if let Some(path) = cache_path
&& let Err(e) = save_proxy_config_cache(path, &raw).await
{
warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache");
}
return Some(cfg);
}
warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache");
if let Some(path) = cache_path {
match load_proxy_config_cache(path).await {
Ok(cached) if !cached.map.is_empty() => {
info!(
snapshot = label,
path,
proxy_for_lines = cached.proxy_for_lines,
"Loaded startup proxy-config from disk cache"
);
return Some(cached);
}
Ok(_) => {
warn!(
snapshot = label,
path,
"Startup proxy-config cache is empty; ignoring cache file"
);
}
Err(cache_err) => {
debug!(
snapshot = label,
path,
error = %cache_err,
"Startup proxy-config cache unavailable"
);
}
}
}
if me2dc_fallback {
error!(
snapshot = label,
"Startup proxy-config unavailable and no saved config found; falling back to direct mode"
);
return None;
}
warn!(
snapshot = label,
retry_in_secs = 2,
"Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false"
);
tokio::time::sleep(Duration::from_secs(2)).await;
}
Err(fetch_err) => {
if let Some(path) = cache_path {
match load_proxy_config_cache(path).await {
Ok(cached) if !cached.map.is_empty() => {
info!(
snapshot = label,
path,
proxy_for_lines = cached.proxy_for_lines,
"Loaded startup proxy-config from disk cache"
);
return Some(cached);
}
Ok(_) => {
warn!(
snapshot = label,
path,
"Startup proxy-config cache is empty; ignoring cache file"
);
}
Err(cache_err) => {
debug!(
snapshot = label,
path,
error = %cache_err,
"Startup proxy-config cache unavailable"
);
}
}
}
if me2dc_fallback {
error!(
snapshot = label,
error = %fetch_err,
"Startup proxy-config unavailable and no cached data; falling back to direct mode"
);
return None;
}
warn!(
snapshot = label,
error = %fetch_err,
retry_in_secs = 2,
"Startup proxy-config unavailable; retrying because me2dc_fallback=false"
);
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}
#[tokio::main] #[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> { async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let process_started_at = Instant::now();
let (config_path, cli_silent, cli_log_level) = parse_cli(); let (config_path, cli_silent, cli_log_level) = parse_cli();
let mut config = match ProxyConfig::load(&config_path) { let mut config = match ProxyConfig::load(&config_path) {
@@ -445,6 +640,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me2dc_fallback = config.general.me2dc_fallback; let me2dc_fallback = config.general.me2dc_fallback;
let me_init_retry_attempts = config.general.me_init_retry_attempts; let me_init_retry_attempts = config.general.me_init_retry_attempts;
let me_init_warn_after_attempts: u32 = 3;
if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
if me2dc_fallback { if me2dc_fallback {
warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
@@ -484,189 +680,199 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// ============================================================= // =============================================================
let proxy_secret_path = config.general.proxy_secret_path.as_deref(); let proxy_secret_path = config.general.proxy_secret_path.as_deref();
let pool_size = config.general.middle_proxy_pool_size.max(1); let pool_size = config.general.middle_proxy_pool_size.max(1);
let mut init_attempt: u32 = 0; let proxy_secret = loop {
loop { match crate::transport::middle_proxy::fetch_proxy_secret(
init_attempt = init_attempt.saturating_add(1);
let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret(
proxy_secret_path, proxy_secret_path,
config.general.proxy_secret_len_max, config.general.proxy_secret_len_max,
) )
.await .await
{ {
Ok(proxy_secret) => proxy_secret, Ok(proxy_secret) => break Some(proxy_secret),
Err(e) => { Err(e) => {
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; if me2dc_fallback {
if retries_limited && init_attempt >= me_init_retry_attempts {
error!( error!(
error = %e, error = %e,
attempt = init_attempt, "ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode"
retry_limit = me_init_retry_attempts,
"ME startup retries exhausted while loading proxy-secret; falling back to direct mode"
); );
break None; break None;
} }
warn!( warn!(
error = %e, error = %e,
attempt = init_attempt,
retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
},
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2, retry_in_secs = 2,
"Failed to fetch proxy-secret; retrying ME startup" "ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false"
); );
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};
info!(
secret_len = proxy_secret.len(),
key_sig = format_args!(
"0x{:08x}",
if proxy_secret.len() >= 4 {
u32::from_le_bytes([
proxy_secret[0],
proxy_secret[1],
proxy_secret[2],
proxy_secret[3],
])
} else {
0
}
),
"Proxy-secret loaded"
);
// Load ME config (v4/v6) + default DC
let mut cfg_v4 = fetch_proxy_config(
"https://core.telegram.org/getProxyConfig",
)
.await
.unwrap_or_default();
let mut cfg_v6 = fetch_proxy_config(
"https://core.telegram.org/getProxyConfigV6",
)
.await
.unwrap_or_default();
if cfg_v4.map.is_empty() {
cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone();
}
if cfg_v6.map.is_empty() {
cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone();
}
let pool = MePool::new(
proxy_tag.clone(),
proxy_secret,
config.general.middle_proxy_nat_ip,
me_nat_probe,
None,
config.network.stun_servers.clone(),
config.general.stun_nat_probe_concurrency,
probe.detected_ipv6,
config.timeouts.me_one_retry,
config.timeouts.me_one_timeout_ms,
cfg_v4.map.clone(),
cfg_v6.map.clone(),
cfg_v4.default_dc.or(cfg_v6.default_dc),
decision.clone(),
Some(upstream_manager.clone()),
rng.clone(),
stats.clone(),
config.general.me_keepalive_enabled,
config.general.me_keepalive_interval_secs,
config.general.me_keepalive_jitter_secs,
config.general.me_keepalive_payload_random,
config.general.rpc_proxy_req_every,
config.general.me_warmup_stagger_enabled,
config.general.me_warmup_step_delay_ms,
config.general.me_warmup_step_jitter_ms,
config.general.me_reconnect_max_concurrent_per_dc,
config.general.me_reconnect_backoff_base_ms,
config.general.me_reconnect_backoff_cap_ms,
config.general.me_reconnect_fast_retry_count,
config.general.me_single_endpoint_shadow_writers,
config.general.me_single_endpoint_outage_mode_enabled,
config.general.me_single_endpoint_outage_disable_quarantine,
config.general.me_single_endpoint_outage_backoff_min_ms,
config.general.me_single_endpoint_outage_backoff_max_ms,
config.general.me_single_endpoint_shadow_rotate_every_secs,
config.general.me_floor_mode,
config.general.me_adaptive_floor_idle_secs,
config.general.me_adaptive_floor_min_writers_single_endpoint,
config.general.me_adaptive_floor_recover_grace_secs,
config.general.hardswap,
config.general.me_pool_drain_ttl_secs,
config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio,
config.general.me_hardswap_warmup_delay_min_ms,
config.general.me_hardswap_warmup_delay_max_ms,
config.general.me_hardswap_warmup_extra_passes,
config.general.me_hardswap_warmup_pass_backoff_base_ms,
config.general.me_bind_stale_mode,
config.general.me_bind_stale_ttl_secs,
config.general.me_secret_atomic_snapshot,
config.general.me_deterministic_writer_sort,
config.general.me_socks_kdf_policy,
config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms,
config.general.me_route_backpressure_high_watermark_pct,
);
match pool.init(pool_size, &rng).await {
Ok(()) => {
info!(
attempt = init_attempt,
"Middle-End pool initialized successfully"
);
// Phase 4: Start health monitor
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
break Some(pool);
}
Err(e) => {
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
if retries_limited && init_attempt >= me_init_retry_attempts {
error!(
error = %e,
attempt = init_attempt,
retry_limit = me_init_retry_attempts,
"ME pool init retries exhausted; falling back to direct mode"
);
break None;
}
warn!(
error = %e,
attempt = init_attempt,
retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
},
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization"
);
pool.reset_stun_state();
tokio::time::sleep(Duration::from_secs(2)).await;
} }
} }
};
match proxy_secret {
Some(proxy_secret) => {
info!(
secret_len = proxy_secret.len(),
key_sig = format_args!(
"0x{:08x}",
if proxy_secret.len() >= 4 {
u32::from_le_bytes([
proxy_secret[0],
proxy_secret[1],
proxy_secret[2],
proxy_secret[3],
])
} else {
0
}
),
"Proxy-secret loaded"
);
let cfg_v4 = load_startup_proxy_config_snapshot(
"https://core.telegram.org/getProxyConfig",
config.general.proxy_config_v4_cache_path.as_deref(),
me2dc_fallback,
"getProxyConfig",
)
.await;
let cfg_v6 = load_startup_proxy_config_snapshot(
"https://core.telegram.org/getProxyConfigV6",
config.general.proxy_config_v6_cache_path.as_deref(),
me2dc_fallback,
"getProxyConfigV6",
)
.await;
if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) {
let pool = MePool::new(
proxy_tag.clone(),
proxy_secret,
config.general.middle_proxy_nat_ip,
me_nat_probe,
None,
config.network.stun_servers.clone(),
config.general.stun_nat_probe_concurrency,
probe.detected_ipv6,
config.timeouts.me_one_retry,
config.timeouts.me_one_timeout_ms,
cfg_v4.map.clone(),
cfg_v6.map.clone(),
cfg_v4.default_dc.or(cfg_v6.default_dc),
decision.clone(),
Some(upstream_manager.clone()),
rng.clone(),
stats.clone(),
config.general.me_keepalive_enabled,
config.general.me_keepalive_interval_secs,
config.general.me_keepalive_jitter_secs,
config.general.me_keepalive_payload_random,
config.general.rpc_proxy_req_every,
config.general.me_warmup_stagger_enabled,
config.general.me_warmup_step_delay_ms,
config.general.me_warmup_step_jitter_ms,
config.general.me_reconnect_max_concurrent_per_dc,
config.general.me_reconnect_backoff_base_ms,
config.general.me_reconnect_backoff_cap_ms,
config.general.me_reconnect_fast_retry_count,
config.general.me_single_endpoint_shadow_writers,
config.general.me_single_endpoint_outage_mode_enabled,
config.general.me_single_endpoint_outage_disable_quarantine,
config.general.me_single_endpoint_outage_backoff_min_ms,
config.general.me_single_endpoint_outage_backoff_max_ms,
config.general.me_single_endpoint_shadow_rotate_every_secs,
config.general.me_floor_mode,
config.general.me_adaptive_floor_idle_secs,
config.general.me_adaptive_floor_min_writers_single_endpoint,
config.general.me_adaptive_floor_recover_grace_secs,
config.general.hardswap,
config.general.me_pool_drain_ttl_secs,
config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio,
config.general.me_hardswap_warmup_delay_min_ms,
config.general.me_hardswap_warmup_delay_max_ms,
config.general.me_hardswap_warmup_extra_passes,
config.general.me_hardswap_warmup_pass_backoff_base_ms,
config.general.me_bind_stale_mode,
config.general.me_bind_stale_ttl_secs,
config.general.me_secret_atomic_snapshot,
config.general.me_deterministic_writer_sort,
config.general.me_socks_kdf_policy,
config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms,
config.general.me_route_backpressure_high_watermark_pct,
config.general.me_route_no_writer_mode,
config.general.me_route_no_writer_wait_ms,
config.general.me_route_inline_recovery_attempts,
config.general.me_route_inline_recovery_wait_ms,
);
let mut init_attempt: u32 = 0;
loop {
init_attempt = init_attempt.saturating_add(1);
match pool.init(pool_size, &rng).await {
Ok(()) => {
info!(
attempt = init_attempt,
"Middle-End pool initialized successfully"
);
// Phase 4: Start health monitor
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
break Some(pool);
}
Err(e) => {
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
if retries_limited && init_attempt >= me_init_retry_attempts {
error!(
error = %e,
attempt = init_attempt,
retry_limit = me_init_retry_attempts,
"ME pool init retries exhausted; falling back to direct mode"
);
break None;
}
let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
};
if init_attempt >= me_init_warn_after_attempts {
warn!(
error = %e,
attempt = init_attempt,
retry_limit = retry_limit,
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization"
);
} else {
info!(
error = %e,
attempt = init_attempt,
retry_limit = retry_limit,
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool startup warmup: retrying initialization"
);
}
pool.reset_stun_state();
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
} else {
None
}
}
None => None,
} }
} else { } else {
None None
@@ -847,6 +1053,19 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
} }
} }
let initialized_secs = process_started_at.elapsed().as_secs();
let second_suffix = if initialized_secs == 1 { "" } else { "s" };
info!("===================== Telegram Startup =====================");
info!(
" DC/ME Initialized in {} second{}",
initialized_secs, second_suffix
);
info!("============================================================");
if let Some(ref pool) = me_pool {
pool.set_runtime_ready(true);
}
// Background tasks // Background tasks
let um_clone = upstream_manager.clone(); let um_clone = upstream_manager.clone();
let decision_clone = decision.clone(); let decision_clone = decision.clone();
@@ -1117,6 +1336,60 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
print_proxy_links(&host, port, &config); print_proxy_links(&host, port, &config);
} }
let (admission_tx, admission_rx) = watch::channel(true);
if config.general.use_middle_proxy {
if let Some(pool) = me_pool.as_ref() {
let initial_open = pool.admission_ready_conditional_cast().await;
admission_tx.send_replace(initial_open);
if initial_open {
info!("Conditional-admission gate: open (ME pool ready)");
} else {
warn!("Conditional-admission gate: closed (ME pool is not ready)");
}
let pool_for_gate = pool.clone();
let admission_tx_gate = admission_tx.clone();
tokio::spawn(async move {
let mut gate_open = initial_open;
let mut open_streak = if initial_open { 1u32 } else { 0u32 };
let mut close_streak = if initial_open { 0u32 } else { 1u32 };
loop {
let ready = pool_for_gate.admission_ready_conditional_cast().await;
if ready {
open_streak = open_streak.saturating_add(1);
close_streak = 0;
if !gate_open && open_streak >= 2 {
gate_open = true;
admission_tx_gate.send_replace(true);
info!(
open_streak,
"Conditional-admission gate opened (ME pool recovered)"
);
}
} else {
close_streak = close_streak.saturating_add(1);
open_streak = 0;
if gate_open && close_streak >= 2 {
gate_open = false;
admission_tx_gate.send_replace(false);
warn!(
close_streak,
"Conditional-admission gate closed (ME pool has uncovered DC groups)"
);
}
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
});
} else {
admission_tx.send_replace(false);
warn!("Conditional-admission gate: closed (ME pool is unavailable)");
}
} else {
admission_tx.send_replace(true);
}
let _admission_tx_hold = admission_tx;
// Unix socket setup (before listeners check so unix-only config works) // Unix socket setup (before listeners check so unix-only config works)
let mut has_unix_listener = false; let mut has_unix_listener = false;
#[cfg(unix)] #[cfg(unix)]
@@ -1150,6 +1423,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
has_unix_listener = true; has_unix_listener = true;
let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone(); let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let mut admission_rx_unix = admission_rx.clone();
let stats = stats.clone(); let stats = stats.clone();
let upstream_manager = upstream_manager.clone(); let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone(); let replay_checker = replay_checker.clone();
@@ -1165,6 +1439,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)); let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1));
loop { loop {
if !wait_until_admission_open(&mut admission_rx_unix).await {
warn!("Conditional-admission gate channel closed for unix listener");
break;
}
match unix_listener.accept().await { match unix_listener.accept().await {
Ok((stream, _)) => { Ok((stream, _)) => {
let permit = match max_connections_unix.clone().acquire_owned().await { let permit = match max_connections_unix.clone().acquire_owned().await {
@@ -1299,6 +1577,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
for (listener, listener_proxy_protocol) in listeners { for (listener, listener_proxy_protocol) in listeners {
let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone(); let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let mut admission_rx_tcp = admission_rx.clone();
let stats = stats.clone(); let stats = stats.clone();
let upstream_manager = upstream_manager.clone(); let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone(); let replay_checker = replay_checker.clone();
@@ -1312,6 +1591,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if !wait_until_admission_open(&mut admission_rx_tcp).await {
warn!("Conditional-admission gate channel closed for tcp listener");
break;
}
match listener.accept().await { match listener.accept().await {
Ok((stream, peer_addr)) => { Ok((stream, peer_addr)) => {
let permit = match max_connections_tcp.clone().acquire_owned().await { let permit = match max_connections_tcp.clone().acquire_owned().await {
@@ -1400,7 +1683,36 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
} }
match signal::ctrl_c().await { match signal::ctrl_c().await {
Ok(()) => info!("Shutting down..."), Ok(()) => {
let shutdown_started_at = Instant::now();
info!("Shutting down...");
let uptime_secs = process_started_at.elapsed().as_secs();
info!("Uptime: {}", format_uptime(uptime_secs));
if let Some(pool) = &me_pool {
match tokio::time::timeout(
Duration::from_secs(2),
pool.shutdown_send_close_conn_all(),
)
.await
{
Ok(total) => {
info!(
close_conn_sent = total,
"ME shutdown: RPC_CLOSE_CONN broadcast completed"
);
}
Err(_) => {
warn!("ME shutdown: RPC_CLOSE_CONN broadcast timed out");
}
}
}
let shutdown_secs = shutdown_started_at.elapsed().as_secs();
info!(
"Shutdown completed successfully in {} {}.",
shutdown_secs,
unit_label(shutdown_secs, "second", "seconds")
);
}
Err(e) => error!("Signal error: {}", e), Err(e) => error!("Signal error: {}", e),
} }

View File

@@ -1199,6 +1199,48 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
0 0
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_no_writer_failfast_total ME route failfast errors due to missing writer in bounded wait window"
);
let _ = writeln!(out, "# TYPE telemt_me_no_writer_failfast_total counter");
let _ = writeln!(
out,
"telemt_me_no_writer_failfast_total {}",
if me_allows_normal {
stats.get_me_no_writer_failfast_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path"
);
let _ = writeln!(out, "# TYPE telemt_me_async_recovery_trigger_total counter");
let _ = writeln!(
out,
"telemt_me_async_recovery_trigger_total {}",
if me_allows_normal {
stats.get_me_async_recovery_trigger_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_inline_recovery_total Legacy inline ME recovery attempts from route path"
);
let _ = writeln!(out, "# TYPE telemt_me_inline_recovery_total counter");
let _ = writeln!(
out,
"telemt_me_inline_recovery_total {}",
if me_allows_normal {
stats.get_me_inline_recovery_total()
} else {
0
}
);
let unresolved_writer_losses = if me_allows_normal { let unresolved_writer_losses = if me_allows_normal {
stats stats
@@ -1237,6 +1279,29 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter"); let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter");
let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent"); let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent");
let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter"); let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter");
let _ = writeln!(
out,
"# HELP telemt_ip_reservation_rollback_total IP reservation rollbacks caused by later limit checks"
);
let _ = writeln!(out, "# TYPE telemt_ip_reservation_rollback_total counter");
let _ = writeln!(
out,
"telemt_ip_reservation_rollback_total{{reason=\"tcp_limit\"}} {}",
if core_enabled {
stats.get_ip_reservation_rollback_tcp_limit_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_ip_reservation_rollback_total{{reason=\"quota_limit\"}} {}",
if core_enabled {
stats.get_ip_reservation_rollback_quota_limit_total()
} else {
0
}
);
let _ = writeln!( let _ = writeln!(
out, out,
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag" "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
@@ -1267,11 +1332,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
.collect(); .collect();
let mut unique_users = BTreeSet::new(); let mut unique_users = BTreeSet::new();
unique_users.extend(config.access.users.keys().cloned());
unique_users.extend(config.access.user_max_unique_ips.keys().cloned()); unique_users.extend(config.access.user_max_unique_ips.keys().cloned());
unique_users.extend(ip_counts.keys().cloned()); unique_users.extend(ip_counts.keys().cloned());
let unique_users_vec: Vec<String> = unique_users.iter().cloned().collect();
let recent_counts = ip_tracker
.get_recent_counts_for_users(&unique_users_vec)
.await;
let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge");
let _ = writeln!(
out,
"# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window"
);
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
@@ -1286,6 +1361,12 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
0.0 0.0
}; };
let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current); let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current);
let _ = writeln!(
out,
"telemt_user_unique_ips_recent_window{{user=\"{}\"}} {}",
user,
recent_counts.get(&user).copied().unwrap_or(0)
);
let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit); let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit);
let _ = writeln!( let _ = writeln!(
out, out,
@@ -1378,6 +1459,7 @@ mod tests {
assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1")); assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2")); assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1")); assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4")); assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000")); assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
} }
@@ -1391,7 +1473,8 @@ mod tests {
assert!(output.contains("telemt_connections_total 0")); assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0")); assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0")); assert!(output.contains("telemt_handshake_timeouts_total 0"));
assert!(!output.contains("user=")); assert!(output.contains("telemt_user_unique_ips_current{user="));
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
} }
#[tokio::test] #[tokio::test]
@@ -1412,6 +1495,7 @@ mod tests {
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
)); ));
assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
} }

View File

@@ -672,42 +672,16 @@ impl RunningClientHandler {
R: AsyncRead + Unpin + Send + 'static, R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static,
{ {
let user = &success.user; let user = success.user.clone();
if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await { if let Err(e) = Self::check_user_limits_static(&user, &config, &stats, peer_addr, &ip_tracker).await {
warn!(user = %user, error = %e, "User limit exceeded"); warn!(user = %user, error = %e, "User limit exceeded");
return Err(e); return Err(e);
} }
// IP Cleanup Guard: автоматически удаляет IP при выходе из scope let relay_result = if config.general.use_middle_proxy {
struct IpCleanupGuard {
tracker: Arc<UserIpTracker>,
user: String,
ip: std::net::IpAddr,
}
impl Drop for IpCleanupGuard {
fn drop(&mut self) {
let tracker = self.tracker.clone();
let user = self.user.clone();
let ip = self.ip;
tokio::spawn(async move {
tracker.remove_ip(&user, ip).await;
debug!(user = %user, ip = %ip, "IP cleaned up on disconnect");
});
}
}
let _cleanup = IpCleanupGuard {
tracker: ip_tracker,
user: user.clone(),
ip: peer_addr.ip(),
};
// Decide: middle proxy or direct
if config.general.use_middle_proxy {
if let Some(ref pool) = me_pool { if let Some(ref pool) = me_pool {
return handle_via_middle_proxy( handle_via_middle_proxy(
client_reader, client_reader,
client_writer, client_writer,
success, success,
@@ -718,23 +692,38 @@ impl RunningClientHandler {
local_addr, local_addr,
rng, rng,
) )
.await; .await
} else {
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
handle_via_direct(
client_reader,
client_writer,
success,
upstream_manager,
stats,
config,
buffer_pool,
rng,
)
.await
} }
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct"); } else {
} // Direct mode (original behavior)
handle_via_direct(
client_reader,
client_writer,
success,
upstream_manager,
stats,
config,
buffer_pool,
rng,
)
.await
};
// Direct mode (original behavior) ip_tracker.remove_ip(&user, peer_addr.ip()).await;
handle_via_direct( relay_result
client_reader,
client_writer,
success,
upstream_manager,
stats,
config,
buffer_pool,
rng,
)
.await
} }
async fn check_user_limits_static( async fn check_user_limits_static(
@@ -752,22 +741,32 @@ impl RunningClientHandler {
}); });
} }
let mut ip_reserved = false;
// IP limit check // IP limit check
if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await { match ip_tracker.check_and_add(user, peer_addr.ip()).await {
warn!( Ok(()) => {
user = %user, ip_reserved = true;
ip = %peer_addr.ip(), }
reason = %reason, Err(reason) => {
"IP limit exceeded" warn!(
); user = %user,
return Err(ProxyError::ConnectionLimitExceeded { ip = %peer_addr.ip(),
user: user.to_string(), reason = %reason,
}); "IP limit exceeded"
);
return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(),
});
}
} }
if let Some(limit) = config.access.user_max_tcp_conns.get(user) if let Some(limit) = config.access.user_max_tcp_conns.get(user)
&& stats.get_user_curr_connects(user) >= *limit as u64 && stats.get_user_curr_connects(user) >= *limit as u64
{ {
if ip_reserved {
ip_tracker.remove_ip(user, peer_addr.ip()).await;
stats.increment_ip_reservation_rollback_tcp_limit_total();
}
return Err(ProxyError::ConnectionLimitExceeded { return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(), user: user.to_string(),
}); });
@@ -776,6 +775,10 @@ impl RunningClientHandler {
if let Some(quota) = config.access.user_data_quota.get(user) if let Some(quota) = config.access.user_data_quota.get(user)
&& stats.get_user_total_octets(user) >= *quota && stats.get_user_total_octets(user) >= *quota
{ {
if ip_reserved {
ip_tracker.remove_ip(user, peer_addr.ip()).await;
stats.increment_ip_reservation_rollback_quota_limit_total();
}
return Err(ProxyError::DataQuotaExceeded { return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(), user: user.to_string(),
}); });

View File

@@ -118,10 +118,16 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
// Unknown DC requested by client without override: log and fall back. // Unknown DC requested by client without override: log and fall back.
if !config.dc_overrides.contains_key(&dc_key) { if !config.dc_overrides.contains_key(&dc_key) {
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster"); warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
if let Some(path) = &config.general.unknown_dc_log_path if config.general.unknown_dc_file_log_enabled
&& let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) && let Some(path) = &config.general.unknown_dc_log_path
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{ {
let _ = writeln!(file, "dc_idx={dc_idx}"); let path = path.clone();
handle.spawn_blocking(move || {
if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
let _ = writeln!(file, "dc_idx={dc_idx}");
}
});
} }
} }

View File

@@ -100,6 +100,11 @@ pub struct Stats {
me_refill_failed_total: AtomicU64, me_refill_failed_total: AtomicU64,
me_writer_restored_same_endpoint_total: AtomicU64, me_writer_restored_same_endpoint_total: AtomicU64,
me_writer_restored_fallback_total: AtomicU64, me_writer_restored_fallback_total: AtomicU64,
me_no_writer_failfast_total: AtomicU64,
me_async_recovery_trigger_total: AtomicU64,
me_inline_recovery_total: AtomicU64,
ip_reservation_rollback_tcp_limit_total: AtomicU64,
ip_reservation_rollback_quota_limit_total: AtomicU64,
telemetry_core_enabled: AtomicBool, telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool, telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8, telemetry_me_level: AtomicU8,
@@ -522,6 +527,34 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_no_writer_failfast_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_no_writer_failfast_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_async_recovery_trigger_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_async_recovery_trigger_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_inline_recovery_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_inline_recovery_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_ip_reservation_rollback_tcp_limit_total(&self) {
if self.telemetry_core_enabled() {
self.ip_reservation_rollback_tcp_limit_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_ip_reservation_rollback_quota_limit_total(&self) {
if self.telemetry_core_enabled() {
self.ip_reservation_rollback_quota_limit_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_endpoint_quarantine_total(&self) { pub fn increment_me_endpoint_quarantine_total(&self) {
if self.telemetry_me_allows_normal() { if self.telemetry_me_allows_normal() {
self.me_endpoint_quarantine_total self.me_endpoint_quarantine_total
@@ -791,6 +824,23 @@ impl Stats {
pub fn get_me_writer_restored_fallback_total(&self) -> u64 { pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
self.me_writer_restored_fallback_total.load(Ordering::Relaxed) self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
} }
pub fn get_me_no_writer_failfast_total(&self) -> u64 {
self.me_no_writer_failfast_total.load(Ordering::Relaxed)
}
pub fn get_me_async_recovery_trigger_total(&self) -> u64 {
self.me_async_recovery_trigger_total.load(Ordering::Relaxed)
}
pub fn get_me_inline_recovery_total(&self) -> u64 {
self.me_inline_recovery_total.load(Ordering::Relaxed)
}
pub fn get_ip_reservation_rollback_tcp_limit_total(&self) -> u64 {
self.ip_reservation_rollback_tcp_limit_total
.load(Ordering::Relaxed)
}
pub fn get_ip_reservation_rollback_quota_limit_total(&self) -> u64 {
self.ip_reservation_rollback_quota_limit_total
.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) { pub fn increment_user_connects(&self, user: &str) {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher}; use std::hash::{DefaultHasher, Hash, Hasher};
use std::net::IpAddr; use std::net::IpAddr;
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -42,6 +43,87 @@ pub struct ProxyConfigData {
pub proxy_for_lines: u32, pub proxy_for_lines: u32,
} }
pub fn parse_proxy_config_text(text: &str, http_status: u16) -> ProxyConfigData {
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
let mut proxy_for_lines: u32 = 0;
for line in text.lines() {
if let Some((dc, ip, port)) = parse_proxy_line(line) {
map.entry(dc).or_default().push((ip, port));
proxy_for_lines = proxy_for_lines.saturating_add(1);
}
}
let default_dc = text.lines().find_map(|l| {
let t = l.trim();
if let Some(rest) = t.strip_prefix("default") {
return rest.trim().trim_end_matches(';').parse::<i32>().ok();
}
None
});
ProxyConfigData {
map,
default_dc,
http_status,
proxy_for_lines,
}
}
pub async fn load_proxy_config_cache(path: &str) -> Result<ProxyConfigData> {
let text = tokio::fs::read_to_string(path).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!("read proxy-config cache '{path}' failed: {e}"))
})?;
Ok(parse_proxy_config_text(&text, 200))
}
pub async fn save_proxy_config_cache(path: &str, raw_text: &str) -> Result<()> {
if let Some(parent) = Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!(
"create proxy-config cache dir '{}' failed: {e}",
parent.display()
))
})?;
}
tokio::fs::write(path, raw_text).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!("write proxy-config cache '{path}' failed: {e}"))
})?;
Ok(())
}
pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, String)> {
let resp = reqwest::get(url)
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
;
let http_status = resp.status().as_u16();
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
{
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
}
}
let text = resp
.text()
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let parsed = parse_proxy_config_text(&text, http_status);
Ok((parsed, text))
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct StableSnapshot { struct StableSnapshot {
candidate_hash: Option<u64>, candidate_hash: Option<u64>,
@@ -170,61 +252,9 @@ fn parse_proxy_line(line: &str) -> Option<(i32, IpAddr, u16)> {
} }
pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> { pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
let resp = reqwest::get(url) fetch_proxy_config_with_raw(url)
.await .await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? .map(|(parsed, _raw)| parsed)
;
let http_status = resp.status().as_u16();
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
{
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
}
}
let text = resp
.text()
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
let mut proxy_for_lines: u32 = 0;
for line in text.lines() {
if let Some((dc, ip, port)) = parse_proxy_line(line) {
map.entry(dc).or_default().push((ip, port));
proxy_for_lines = proxy_for_lines.saturating_add(1);
}
}
let default_dc = text
.lines()
.find_map(|l| {
let t = l.trim();
if let Some(rest) = t.strip_prefix("default") {
return rest
.trim()
.trim_end_matches(';')
.parse::<i32>()
.ok();
}
None
});
Ok(ProxyConfigData {
map,
default_dc,
http_status,
proxy_for_lines,
})
} }
fn snapshot_passes_guards( fn snapshot_passes_guards(

View File

@@ -387,9 +387,11 @@ impl MePool {
socks_bound_addr.map(|value| value.ip()), socks_bound_addr.map(|value| value.ip()),
client_port_source, client_port_source,
); );
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.lock().await; let previous_kdf_fingerprint = {
if let Some((prev_fingerprint, prev_client_port)) = let kdf_fingerprint_guard = self.kdf_material_fingerprint.read().await;
kdf_fingerprint_guard.get(&peer_addr_nat).copied() kdf_fingerprint_guard.get(&peer_addr_nat).copied()
};
if let Some((prev_fingerprint, prev_client_port)) = previous_kdf_fingerprint
{ {
if prev_fingerprint != kdf_fingerprint { if prev_fingerprint != kdf_fingerprint {
self.stats.increment_me_kdf_drift_total(); self.stats.increment_me_kdf_drift_total();
@@ -416,6 +418,9 @@ impl MePool {
); );
} }
} }
// Keep fingerprint updates eventually consistent for diagnostics while avoiding
// serializing all concurrent handshakes on a single async mutex.
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.write().await;
kdf_fingerprint_guard.insert(peer_addr_nat, (kdf_fingerprint, client_port_for_kdf)); kdf_fingerprint_guard.insert(peer_addr_nat, (kdf_fingerprint, client_port_for_kdf));
drop(kdf_fingerprint_guard); drop(kdf_fingerprint_guard);

View File

@@ -295,15 +295,27 @@ async fn check_family(
let wait = Duration::from_millis(next_ms) let wait = Duration::from_millis(next_ms)
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait); next_attempt.insert(key, now + wait);
warn!( if pool.is_runtime_ready() {
dc = %dc, warn!(
?family, dc = %dc,
alive = now_alive, ?family,
required, alive = now_alive,
endpoint_count = endpoints.len(), required,
backoff_ms = next_ms, endpoint_count = endpoints.len(),
"DC writer floor is below required level, scheduled reconnect" backoff_ms = next_ms,
); "DC writer floor is below required level, scheduled reconnect"
);
} else {
info!(
dc = %dc,
?family,
alive = now_alive,
required,
endpoint_count = endpoints.len(),
backoff_ms = next_ms,
"DC writer floor is below required level during startup, scheduled reconnect"
);
}
} }
if let Some(v) = inflight.get_mut(&key) { if let Some(v) = inflight.get_mut(&key) {
*v = v.saturating_sub(1); *v = v.saturating_sub(1);

View File

@@ -30,7 +30,11 @@ pub use pool::MePool;
pub use pool_nat::{stun_probe, detect_public_ip}; pub use pool_nat::{stun_probe, detect_public_ip};
pub use registry::ConnRegistry; pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret; pub use secret::fetch_proxy_secret;
pub use config_updater::{fetch_proxy_config, me_config_updater}; #[allow(unused_imports)]
pub use config_updater::{
ProxyConfigData, fetch_proxy_config, fetch_proxy_config_with_raw, load_proxy_config_cache,
me_config_updater, save_proxy_config_cache,
};
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task}; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub use wire::proto_flags_for_tag; pub use wire::proto_flags_for_tag;

View File

@@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio::sync::{Mutex, Notify, RwLock, mpsc};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy}; use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
use crate::network::probe::NetworkDecision; use crate::network::probe::NetworkDecision;
@@ -132,7 +132,7 @@ pub struct MePool {
pub(super) pending_hardswap_map_hash: AtomicU64, pub(super) pending_hardswap_map_hash: AtomicU64,
pub(super) hardswap: AtomicBool, pub(super) hardswap: AtomicBool,
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
@@ -145,6 +145,11 @@ pub struct MePool {
pub(super) secret_atomic_snapshot: AtomicBool, pub(super) secret_atomic_snapshot: AtomicBool,
pub(super) me_deterministic_writer_sort: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool,
pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8,
pub(super) me_route_no_writer_mode: AtomicU8,
pub(super) me_route_no_writer_wait: Duration,
pub(super) me_route_inline_recovery_attempts: u32,
pub(super) me_route_inline_recovery_wait: Duration,
pub(super) runtime_ready: AtomicBool,
pool_size: usize, pool_size: usize,
} }
@@ -227,6 +232,10 @@ impl MePool {
me_route_backpressure_base_timeout_ms: u64, me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64, me_route_backpressure_high_timeout_ms: u64,
me_route_backpressure_high_watermark_pct: u8, me_route_backpressure_high_watermark_pct: u8,
me_route_no_writer_mode: MeRouteNoWriterMode,
me_route_no_writer_wait_ms: u64,
me_route_inline_recovery_attempts: u32,
me_route_inline_recovery_wait_ms: u64,
) -> Arc<Self> { ) -> Arc<Self> {
let registry = Arc::new(ConnRegistry::new()); let registry = Arc::new(ConnRegistry::new());
registry.update_route_backpressure_policy( registry.update_route_backpressure_policy(
@@ -326,7 +335,7 @@ impl MePool {
pending_hardswap_map_hash: AtomicU64::new(0), pending_hardswap_map_hash: AtomicU64::new(0),
hardswap: AtomicBool::new(hardswap), hardswap: AtomicBool::new(hardswap),
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
@@ -343,6 +352,11 @@ impl MePool {
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
me_route_inline_recovery_attempts,
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
runtime_ready: AtomicBool::new(false),
}) })
} }
@@ -350,6 +364,14 @@ impl MePool {
self.active_generation.load(Ordering::Relaxed) self.active_generation.load(Ordering::Relaxed)
} }
pub fn set_runtime_ready(&self, ready: bool) {
self.runtime_ready.store(ready, Ordering::Relaxed);
}
pub fn is_runtime_ready(&self) -> bool {
self.runtime_ready.load(Ordering::Relaxed)
}
pub fn update_runtime_reinit_policy( pub fn update_runtime_reinit_policy(
&self, &self,
hardswap: bool, hardswap: bool,

View File

@@ -14,10 +14,12 @@ use super::pool::MePool;
impl MePool { impl MePool {
pub async fn init(self: &Arc<Self>, pool_size: usize, rng: &Arc<SecureRandom>) -> Result<()> { pub async fn init(self: &Arc<Self>, pool_size: usize, rng: &Arc<SecureRandom>) -> Result<()> {
let family_order = self.family_order(); let family_order = self.family_order();
let connect_concurrency = self.me_reconnect_max_concurrent_per_dc.max(1) as usize;
let ks = self.key_selector().await; let ks = self.key_selector().await;
info!( info!(
me_servers = self.proxy_map_v4.read().await.len(), me_servers = self.proxy_map_v4.read().await.len(),
pool_size, pool_size,
connect_concurrency,
key_selector = format_args!("0x{ks:08x}"), key_selector = format_args!("0x{ks:08x}"),
secret_len = self.proxy_secret.read().await.secret.len(), secret_len = self.proxy_secret.read().await.secret.len(),
"Initializing ME pool" "Initializing ME pool"
@@ -41,23 +43,39 @@ impl MePool {
}) })
.collect(); .collect();
dc_addrs.sort_unstable_by_key(|(dc, _)| *dc); dc_addrs.sort_unstable_by_key(|(dc, _)| *dc);
dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len()));
// Ensure at least one live writer per DC group; run missing DCs in parallel. // Stage 1: build base coverage for conditional-cast.
// Single-endpoint DCs are prefilled first; multi-endpoint DCs require one live writer.
let mut join = tokio::task::JoinSet::new(); let mut join = tokio::task::JoinSet::new();
for (dc, addrs) in dc_addrs.iter().cloned() { for (dc, addrs) in dc_addrs.iter().cloned() {
if addrs.is_empty() { if addrs.is_empty() {
continue; continue;
} }
let target_writers = if addrs.len() == 1 {
self.required_writers_for_dc_with_floor_mode(addrs.len(), false)
} else {
1usize
};
let endpoints: HashSet<SocketAddr> = addrs let endpoints: HashSet<SocketAddr> = addrs
.iter() .iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port)) .map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect(); .collect();
if self.active_writer_count_for_endpoints(&endpoints).await > 0 { if self.active_writer_count_for_endpoints(&endpoints).await >= target_writers {
continue; continue;
} }
let pool = Arc::clone(self); let pool = Arc::clone(self);
let rng_clone = Arc::clone(rng); let rng_clone = Arc::clone(rng);
join.spawn(async move { pool.connect_primary_for_dc(dc, addrs, rng_clone).await }); join.spawn(async move {
pool.connect_primary_for_dc(
dc,
addrs,
target_writers,
rng_clone,
connect_concurrency,
)
.await
});
} }
while join.join_next().await.is_some() {} while join.join_next().await.is_some() {}
@@ -77,47 +95,35 @@ impl MePool {
))); )));
} }
// Warm reserve writers asynchronously so startup does not block after first working pool is ready. // Stage 2: continue saturating multi-endpoint DC groups in background.
let pool = Arc::clone(self); let pool = Arc::clone(self);
let rng_clone = Arc::clone(rng); let rng_clone = Arc::clone(rng);
let dc_addrs_bg = dc_addrs.clone(); let dc_addrs_bg = dc_addrs.clone();
tokio::spawn(async move { tokio::spawn(async move {
if pool.me_warmup_stagger_enabled { let mut join_bg = tokio::task::JoinSet::new();
for (dc, addrs) in &dc_addrs_bg { for (dc, addrs) in dc_addrs_bg {
for (ip, port) in addrs { if addrs.len() <= 1 {
if pool.connection_count() >= pool_size { continue;
break;
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng()
.random_range(0..=pool.me_warmup_step_jitter.as_millis() as u64);
let delay_ms = pool.me_warmup_step_delay.as_millis() as u64 + jitter;
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
if let Err(e) = pool.connect_one(addr, rng_clone.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
}
}
}
} else {
for (dc, addrs) in &dc_addrs_bg {
for (ip, port) in addrs {
if pool.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
if let Err(e) = pool.connect_one(addr, rng_clone.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed");
}
}
if pool.connection_count() >= pool_size {
break;
}
} }
let target_writers = pool.required_writers_for_dc_with_floor_mode(addrs.len(), false);
let pool_clone = Arc::clone(&pool);
let rng_clone_local = Arc::clone(&rng_clone);
join_bg.spawn(async move {
pool_clone
.connect_primary_for_dc(
dc,
addrs,
target_writers,
rng_clone_local,
connect_concurrency,
)
.await
});
} }
while join_bg.join_next().await.is_some() {}
debug!( debug!(
target_pool_size = pool_size,
current_pool_size = pool.connection_count(), current_pool_size = pool.connection_count(),
"Background ME reserve warmup finished" "Background ME saturation warmup finished"
); );
}); });
@@ -140,62 +146,85 @@ impl MePool {
self: Arc<Self>, self: Arc<Self>,
dc: i32, dc: i32,
mut addrs: Vec<(IpAddr, u16)>, mut addrs: Vec<(IpAddr, u16)>,
target_writers: usize,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
connect_concurrency: usize,
) -> bool { ) -> bool {
if addrs.is_empty() { if addrs.is_empty() {
return false; return false;
} }
let target_writers = target_writers.max(1);
addrs.shuffle(&mut rand::rng()); addrs.shuffle(&mut rand::rng());
if addrs.len() > 1 { let endpoints: Vec<SocketAddr> = addrs
let concurrency = 2usize; .iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
let endpoint_set: HashSet<SocketAddr> = endpoints.iter().copied().collect();
loop {
let alive = self.active_writer_count_for_endpoints(&endpoint_set).await;
if alive >= target_writers {
info!(
dc = %dc,
alive,
target_writers,
"ME connected"
);
return true;
}
let missing = target_writers.saturating_sub(alive).max(1);
let concurrency = connect_concurrency.max(1).min(missing);
let mut join = tokio::task::JoinSet::new(); let mut join = tokio::task::JoinSet::new();
let mut next_idx = 0usize; for _ in 0..concurrency {
let pool = Arc::clone(&self);
let rng_clone = Arc::clone(&rng);
let endpoints_clone = endpoints.clone();
join.spawn(async move {
pool.connect_endpoints_round_robin(&endpoints_clone, rng_clone.as_ref())
.await
});
}
while next_idx < addrs.len() || !join.is_empty() { let mut progress = false;
while next_idx < addrs.len() && join.len() < concurrency { while let Some(res) = join.join_next().await {
let (ip, port) = addrs[next_idx];
next_idx += 1;
let addr = SocketAddr::new(ip, port);
let pool = Arc::clone(&self);
let rng_clone = Arc::clone(&rng);
join.spawn(async move {
(addr, pool.connect_one(addr, rng_clone.as_ref()).await)
});
}
let Some(res) = join.join_next().await else {
break;
};
match res { match res {
Ok((addr, Ok(()))) => { Ok(true) => {
info!(%addr, dc = %dc, "ME connected"); progress = true;
join.abort_all();
while join.join_next().await.is_some() {}
return true;
}
Ok((addr, Err(e))) => {
warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next");
} }
Ok(false) => {}
Err(e) => { Err(e) => {
warn!(dc = %dc, error = %e, "ME connect task failed"); warn!(dc = %dc, error = %e, "ME connect task failed");
} }
} }
} }
warn!(dc = %dc, "All ME servers for DC failed at init");
return false;
}
for (ip, port) in addrs { let alive_after = self.active_writer_count_for_endpoints(&endpoint_set).await;
let addr = SocketAddr::new(ip, port); if alive_after >= target_writers {
match self.connect_one(addr, rng.as_ref()).await { info!(
Ok(()) => { dc = %dc,
info!(%addr, dc = %dc, "ME connected"); alive = alive_after,
return true; target_writers,
} "ME connected"
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"), );
return true;
}
if !progress {
warn!(
dc = %dc,
alive = alive_after,
target_writers,
"All ME servers for DC failed at init"
);
return false;
}
if self.me_warmup_stagger_enabled {
let jitter = rand::rng()
.random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter;
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
} }
} }
warn!(dc = %dc, "All ME servers for DC failed at init");
false
} }
} }

View File

@@ -100,6 +100,134 @@ pub(crate) struct MeApiRuntimeSnapshot {
} }
impl MePool { impl MePool {
pub(crate) async fn admission_ready_conditional_cast(&self) -> bool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map {
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map {
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if endpoints_by_dc.is_empty() {
return false;
}
let writers = self.writers.read().await.clone();
let mut live_writers_by_endpoint = HashMap::<SocketAddr, usize>::new();
for writer in writers {
if writer.draining.load(Ordering::Relaxed) {
continue;
}
*live_writers_by_endpoint.entry(writer.addr).or_insert(0) += 1;
}
for endpoints in endpoints_by_dc.values() {
let alive: usize = endpoints
.iter()
.map(|endpoint| live_writers_by_endpoint.get(endpoint).copied().unwrap_or(0))
.sum();
if alive == 0 {
return false;
}
}
true
}
#[allow(dead_code)]
pub(crate) async fn admission_ready_full_floor(&self) -> bool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map {
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map {
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if endpoints_by_dc.is_empty() {
return false;
}
let writers = self.writers.read().await.clone();
let mut live_writers_by_endpoint = HashMap::<SocketAddr, usize>::new();
for writer in writers {
if writer.draining.load(Ordering::Relaxed) {
continue;
}
*live_writers_by_endpoint.entry(writer.addr).or_insert(0) += 1;
}
for endpoints in endpoints_by_dc.values() {
let endpoint_count = endpoints.len();
if endpoint_count == 0 {
return false;
}
let required = self.required_writers_for_dc_with_floor_mode(endpoint_count, false);
let alive: usize = endpoints
.iter()
.map(|endpoint| live_writers_by_endpoint.get(endpoint).copied().unwrap_or(0))
.sum();
if alive < required {
return false;
}
}
true
}
pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot {
let now_epoch_secs = Self::now_epoch_secs(); let now_epoch_secs = Self::now_epoch_secs();

View File

@@ -278,6 +278,11 @@ impl ConnRegistry {
Some(ConnWriter { writer_id, tx: writer }) Some(ConnWriter { writer_id, tx: writer })
} }
pub async fn active_conn_ids(&self) -> Vec<u64> {
let inner = self.inner.read().await;
inner.writer_for_conn.keys().copied().collect()
}
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id); inner.writers.remove(&writer_id);

View File

@@ -1,16 +1,17 @@
use std::cmp::Reverse; use std::cmp::Reverse;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Duration; use std::time::{Duration, Instant};
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, warn}; use tracing::{debug, warn};
use crate::config::MeRouteNoWriterMode;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::network::IpFamily; use crate::network::IpFamily;
use crate::protocol::constants::RPC_CLOSE_EXT_U32; use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
use super::MePool; use super::MePool;
use super::codec::WriterCommand; use super::codec::WriterCommand;
@@ -21,6 +22,7 @@ use super::registry::ConnMeta;
const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45; const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55; const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4;
impl MePool { impl MePool {
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
@@ -49,7 +51,14 @@ impl MePool {
our_addr, our_addr,
proto_flags, proto_flags,
}; };
let mut emergency_attempts = 0; let no_writer_mode =
MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed));
let mut no_writer_deadline: Option<Instant> = None;
let mut emergency_attempts = 0u32;
let mut async_recovery_triggered = false;
let mut hybrid_recovery_round = 0u32;
let mut hybrid_last_recovery_at: Option<Instant> = None;
let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
loop { loop {
if let Some(current) = self.registry.get_writer(conn_id).await { if let Some(current) = self.registry.get_writer(conn_id).await {
@@ -74,34 +83,78 @@ impl MePool {
let mut writers_snapshot = { let mut writers_snapshot = {
let ws = self.writers.read().await; let ws = self.writers.read().await;
if ws.is_empty() { if ws.is_empty() {
// Create waiter before recovery attempts so notify_one permits are not missed.
let waiter = self.writer_available.notified();
drop(ws); drop(ws);
for family in self.family_order() { match no_writer_mode {
let map = match family { MeRouteNoWriterMode::AsyncRecoveryFailfast => {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(), let deadline = *no_writer_deadline.get_or_insert_with(|| {
IpFamily::V6 => self.proxy_map_v6.read().await.clone(), Instant::now() + self.me_route_no_writer_wait
}; });
for (_dc, addrs) in map.iter() { if !async_recovery_triggered {
for (ip, port) in addrs { let triggered =
let addr = SocketAddr::new(*ip, *port); self.trigger_async_recovery_for_target_dc(target_dc).await;
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { if !triggered {
self.writer_available.notify_one(); self.trigger_async_recovery_global().await;
}
async_recovery_triggered = true;
}
if self.wait_for_writer_until(deadline).await {
continue;
}
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy(
"No ME writer available in failfast window".into(),
));
}
MeRouteNoWriterMode::InlineRecoveryLegacy => {
self.stats.increment_me_inline_recovery_total();
for _ in 0..self.me_route_inline_recovery_attempts.max(1) {
for family in self.family_order() {
let map = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
};
for (_dc, addrs) in &map {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
let _ = self.connect_one(addr, self.rng.as_ref()).await;
}
}
}
if !self.writers.read().await.is_empty() {
break; break;
} }
} }
} if !self.writers.read().await.is_empty() {
} continue;
if !self.writers.read().await.is_empty() { }
continue; let waiter = self.writer_available.notified();
} if tokio::time::timeout(self.me_route_inline_recovery_wait, waiter)
if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() { .await
if !self.writers.read().await.is_empty() { .is_err()
{
if !self.writers.read().await.is_empty() {
continue;
}
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy(
"All ME connections dead (legacy wait timeout)".into(),
));
}
continue;
}
MeRouteNoWriterMode::HybridAsyncPersistent => {
self.maybe_trigger_hybrid_recovery(
target_dc,
&mut hybrid_recovery_round,
&mut hybrid_last_recovery_at,
hybrid_wait_step,
)
.await;
let deadline = Instant::now() + hybrid_wait_step;
let _ = self.wait_for_writer_until(deadline).await;
continue; continue;
} }
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
} }
continue;
} }
ws.clone() ws.clone()
}; };
@@ -115,45 +168,81 @@ impl MePool {
.await; .await;
} }
if candidate_indices.is_empty() { if candidate_indices.is_empty() {
// Emergency connect-on-demand match no_writer_mode {
if emergency_attempts >= 3 { MeRouteNoWriterMode::AsyncRecoveryFailfast => {
return Err(ProxyError::Proxy("No ME writers available for target DC".into())); let deadline = *no_writer_deadline.get_or_insert_with(|| {
} Instant::now() + self.me_route_no_writer_wait
emergency_attempts += 1; });
for family in self.family_order() { if !async_recovery_triggered {
let map_guard = match family { let triggered = self.trigger_async_recovery_for_target_dc(target_dc).await;
IpFamily::V4 => self.proxy_map_v4.read().await, if !triggered {
IpFamily::V6 => self.proxy_map_v6.read().await, self.trigger_async_recovery_global().await;
}; }
if let Some(addrs) = map_guard.get(&(target_dc as i32)) { async_recovery_triggered = true;
let mut shuffled = addrs.clone(); }
shuffled.shuffle(&mut rand::rng()); if self.wait_for_candidate_until(target_dc, deadline).await {
drop(map_guard); continue;
for (ip, port) in shuffled { }
let addr = SocketAddr::new(ip, port); self.stats.increment_me_no_writer_failfast_total();
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { return Err(ProxyError::Proxy(
break; "No ME writers available for target DC in failfast window".into(),
));
}
MeRouteNoWriterMode::InlineRecoveryLegacy => {
self.stats.increment_me_inline_recovery_total();
if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) {
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
emergency_attempts += 1;
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
let mut shuffled = addrs.clone();
shuffled.shuffle(&mut rand::rng());
drop(map_guard);
for (ip, port) in shuffled {
let addr = SocketAddr::new(ip, port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
break;
}
}
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await;
let ws2 = self.writers.read().await;
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
.await;
if candidate_indices.is_empty() {
candidate_indices = self
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
.await;
}
if !candidate_indices.is_empty() {
break;
}
} }
} }
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await;
let ws2 = self.writers.read().await;
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
.await;
if candidate_indices.is_empty() { if candidate_indices.is_empty() {
candidate_indices = self return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
.await;
}
if !candidate_indices.is_empty() {
break;
} }
} }
} MeRouteNoWriterMode::HybridAsyncPersistent => {
if candidate_indices.is_empty() { self.maybe_trigger_hybrid_recovery(
return Err(ProxyError::Proxy("No ME writers available for target DC".into())); target_dc,
&mut hybrid_recovery_round,
&mut hybrid_last_recovery_at,
hybrid_wait_step,
)
.await;
let deadline = Instant::now() + hybrid_wait_step;
let _ = self.wait_for_candidate_until(target_dc, deadline).await;
continue;
}
} }
} }
let writer_idle_since = self.registry.writer_idle_since_snapshot().await; let writer_idle_since = self.registry.writer_idle_since_snapshot().await;
@@ -275,6 +364,151 @@ impl MePool {
} }
} }
/// Wait until at least one ME writer exists or `deadline` passes.
///
/// Returns true when a writer is (or becomes) available by the deadline.
/// The `notified()` waiter is registered BEFORE the emptiness check so a
/// `notify_one` permit fired between the check and the await cannot be lost.
async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
    // Register for the next notification first (permit-loss avoidance).
    let waiter = self.writer_available.notified();
    if !self.writers.read().await.is_empty() {
        return true;
    }
    let now = Instant::now();
    if now >= deadline {
        // Deadline already passed: report the current state one last time.
        return !self.writers.read().await.is_empty();
    }
    let timeout = deadline.saturating_duration_since(now);
    if tokio::time::timeout(timeout, waiter).await.is_ok() {
        // Notified within the window: a writer was added.
        return true;
    }
    // Timed out: a writer may still have appeared without us consuming
    // a notification, so re-check before reporting failure.
    !self.writers.read().await.is_empty()
}
/// Wait until a writer that can serve `target_dc` exists or `deadline` passes.
///
/// Polls `has_candidate_for_target_dc` in a loop, sleeping at most 25 ms per
/// round or until `writer_available` fires, whichever comes first. The short
/// poll interval bounds the window in which a notification emitted between
/// the candidate check and `notified()` could be missed.
/// Returns true iff a candidate is available on exit.
async fn wait_for_candidate_until(&self, target_dc: i16, deadline: Instant) -> bool {
    loop {
        if self.has_candidate_for_target_dc(target_dc).await {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            // One final check so a candidate arriving exactly at the
            // deadline is still honored.
            return self.has_candidate_for_target_dc(target_dc).await;
        }
        let remaining = deadline.saturating_duration_since(now);
        let sleep_for = remaining.min(Duration::from_millis(25));
        let waiter = self.writer_available.notified();
        // Race the notification against the poll tick; either way we loop
        // back and re-evaluate the candidate set.
        tokio::select! {
            _ = waiter => {}
            _ = tokio::time::sleep(sleep_for) => {}
        }
    }
}
/// Returns true when at least one live writer can currently service
/// `target_dc`.
///
/// Mirrors the routing path's selection order: a strict DC match is tried
/// first, then the relaxed match, before concluding no candidate exists.
async fn has_candidate_for_target_dc(&self, target_dc: i16) -> bool {
    let snapshot = {
        let guard = self.writers.read().await;
        if guard.is_empty() {
            // No writers at all: nothing can match any DC.
            return false;
        }
        guard.clone()
    };
    // Strict match first; short-circuit on success.
    let strict = self
        .candidate_indices_for_dc(&snapshot, target_dc, false)
        .await;
    if !strict.is_empty() {
        return true;
    }
    // Fall back to the relaxed match.
    let relaxed = self
        .candidate_indices_for_dc(&snapshot, target_dc, true)
        .await;
    !relaxed.is_empty()
}
/// Fire immediate-refill requests for up to 8 endpoints that serve
/// `target_dc`.
///
/// Returns false (without touching stats) when no endpoint is known for
/// that DC, so the caller can fall back to a global recovery trigger.
async fn trigger_async_recovery_for_target_dc(self: &Arc<Self>, target_dc: i16) -> bool {
    let candidates = self.endpoint_candidates_for_target_dc(target_dc).await;
    if candidates.is_empty() {
        return false;
    }
    self.stats.increment_me_async_recovery_trigger_total();
    // Cap the burst at 8 refill triggers to avoid a thundering herd.
    candidates
        .into_iter()
        .take(8)
        .for_each(|endpoint| self.trigger_immediate_refill(endpoint));
    true
}
/// Fire immediate-refill requests for up to 8 distinct endpoints across
/// all DCs and address families.
///
/// Used when DC-targeted recovery found nothing (or as a periodic burst in
/// hybrid mode). Endpoints are deduped; the scan stops after 8 triggers.
async fn trigger_async_recovery_global(self: &Arc<Self>) {
    self.stats.increment_me_async_recovery_trigger_total();
    let mut triggered = HashSet::<SocketAddr>::new();
    for family in self.family_order() {
        // Clone the map so the RwLock guard is not held while triggering.
        let snapshot = match family {
            IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
            IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
        };
        for endpoints in snapshot.values() {
            for &(ip, port) in endpoints {
                let addr = SocketAddr::new(ip, port);
                // Only fire once per distinct endpoint.
                if triggered.insert(addr) {
                    self.trigger_immediate_refill(addr);
                }
                if triggered.len() >= 8 {
                    return;
                }
            }
        }
    }
}
/// Collect candidate ME endpoints for `target_dc`, in preference order.
///
/// The DC is looked up under several key aliases (as-signed, absolute,
/// negated-absolute, plus the configured default DC) because the proxy
/// maps may be keyed with either sign convention. Addresses are deduped
/// while preserving discovery order. Unless multipath is in effect, the
/// scan stops at the first address family that yields endpoints.
async fn endpoint_candidates_for_target_dc(&self, target_dc: i16) -> Vec<SocketAddr> {
    let key = target_dc as i32;
    // Build the lookup-key list once, outside the family loop: the
    // default-DC atomic load is loop-invariant, and deduping the keys
    // avoids scanning the same map entry repeatedly (for a positive DC,
    // `key == key.abs()`). `key` originates from an i16, so `abs()`
    // cannot overflow here.
    let mut lookup_keys = vec![key, key.abs(), -key.abs()];
    let default_dc = self.default_dc.load(Ordering::Relaxed);
    if default_dc != 0 {
        lookup_keys.push(default_dc);
    }
    let mut seen_keys = HashSet::<i32>::new();
    lookup_keys.retain(|k| seen_keys.insert(*k));
    let mut preferred = Vec::<SocketAddr>::new();
    let mut seen = HashSet::<SocketAddr>::new();
    for family in self.family_order() {
        // Clone so the RwLock guard is not held across the scan.
        let map = match family {
            IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
            IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
        };
        for lookup in &lookup_keys {
            if let Some(addrs) = map.get(lookup) {
                for (ip, port) in addrs {
                    let addr = SocketAddr::new(*ip, *port);
                    if seen.insert(addr) {
                        preferred.push(addr);
                    }
                }
            }
        }
        // Without multipath, stop at the first family that produced
        // endpoints; with multipath, keep collecting across families.
        if !preferred.is_empty() && !self.decision.effective_multipath {
            break;
        }
    }
    preferred
}
/// Kick off hybrid-mode recovery, rate-limited to once per `hybrid_wait_step`.
///
/// Triggers DC-targeted recovery first; additionally (or instead, when the
/// DC-targeted trigger found no endpoints) runs a global burst every
/// `HYBRID_GLOBAL_BURST_PERIOD_ROUNDS`-th round. Updates the round counter
/// and last-trigger timestamp on every non-throttled call.
async fn maybe_trigger_hybrid_recovery(
    self: &Arc<Self>,
    target_dc: i16,
    hybrid_recovery_round: &mut u32,
    hybrid_last_recovery_at: &mut Option<Instant>,
    hybrid_wait_step: Duration,
) {
    // Throttle: skip entirely if the previous trigger fired within the
    // wait step.
    if let Some(last) = *hybrid_last_recovery_at {
        if last.elapsed() < hybrid_wait_step {
            return;
        }
    }
    let round = *hybrid_recovery_round;
    let dc_triggered = self.trigger_async_recovery_for_target_dc(target_dc).await;
    let periodic_burst = round % HYBRID_GLOBAL_BURST_PERIOD_ROUNDS == 0;
    if !dc_triggered || periodic_burst {
        self.trigger_async_recovery_global().await;
    }
    *hybrid_recovery_round = round.saturating_add(1);
    *hybrid_last_recovery_at = Some(Instant::now());
}
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> { pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
if let Some(w) = self.registry.get_writer(conn_id).await { if let Some(w) = self.registry.get_writer(conn_id).await {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
@@ -292,6 +526,37 @@ impl MePool {
Ok(()) Ok(())
} }
/// Send RPC_CLOSE_CONN for `conn_id` to its ME writer and unregister it.
///
/// Best-effort: if the writer channel is full we retry once with a short
/// bounded wait; if the channel is closed or the retry fails, the close
/// frame is dropped (logged at debug) but the local registry entry is
/// removed regardless, so the conn cannot leak locally. Always returns Ok.
pub async fn send_close_conn(self: &Arc<Self>, conn_id: u64) -> Result<()> {
    if let Some(w) = self.registry.get_writer(conn_id).await {
        // Frame layout: 4-byte opcode + 8-byte conn_id, little-endian.
        let mut p = Vec::with_capacity(12);
        p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
        p.extend_from_slice(&conn_id.to_le_bytes());
        match w.tx.try_send(WriterCommand::DataAndFlush(p)) {
            Ok(()) => {}
            Err(TrySendError::Full(cmd)) => {
                // Writer queue is backed up; allow a short grace period.
                // Previously both failure outcomes here were silently
                // discarded — log them so dropped close frames are visible.
                match tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await {
                    Ok(Ok(())) => {}
                    Ok(Err(_)) => {
                        debug!(conn_id, "ME close_conn dropped: writer channel closed during retry");
                    }
                    Err(_) => {
                        debug!(conn_id, "ME close_conn dropped: writer channel still full after 50ms");
                    }
                }
            }
            Err(TrySendError::Closed(_)) => {
                debug!(conn_id, "ME close_conn skipped: writer channel closed");
            }
        }
    } else {
        debug!(conn_id, "ME close_conn skipped (writer missing)");
    }
    // Unregister regardless of delivery outcome.
    self.registry.unregister(conn_id).await;
    Ok(())
}
/// During shutdown, send RPC_CLOSE_CONN for every active connection.
///
/// Returns the number of connections a close was attempted for; individual
/// send failures are ignored (each close is best-effort).
pub async fn shutdown_send_close_conn_all(self: &Arc<Self>) -> usize {
    let active = self.registry.active_conn_ids().await;
    let attempted = active.len();
    for id in active {
        let _ = self.send_close_conn(id).await;
    }
    attempted
}
pub fn connection_count(&self) -> usize { pub fn connection_count(&self) -> usize {
self.conn_count.load(Ordering::Relaxed) self.conn_count.load(Ordering::Relaxed)
} }