mirror of
https://github.com/telemt/telemt.git
synced 2026-04-15 17:44:11 +03:00
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0494f8ac8b | ||
|
|
48ce59900e | ||
|
|
84e95fd229 | ||
|
|
a80be78345 | ||
|
|
64130dd02e | ||
|
|
d62a6e0417 | ||
|
|
3260746785 | ||
|
|
8066ea2163 | ||
|
|
813f1df63e | ||
|
|
09bdafa718 | ||
|
|
fb0f75df43 | ||
|
|
39255df549 | ||
|
|
456495fd62 | ||
|
|
83cadc0bf3 | ||
|
|
0b1a8cd3f8 | ||
|
|
565b4ee923 |
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.2.2"
|
version = "3.3.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -118,8 +118,8 @@ We welcome ideas, architectural feedback, and pull requests.
|
|||||||
|
|
||||||
## Quick Start Guide
|
## Quick Start Guide
|
||||||
|
|
||||||
### [Quick Start Guid RU](docs/QUICK_START_GUIDE.ru.md)
|
### [Quick Start Guide RU](docs/QUICK_START_GUIDE.ru.md)
|
||||||
### [Quick Start Guid EN](docs/QUICK_START_GUIDE.en.md)
|
### [Quick Start Guide EN](docs/QUICK_START_GUIDE.en.md)
|
||||||
|
|
||||||
|
|
||||||
### Advanced
|
### Advanced
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use hyper::StatusCode;
|
use hyper::StatusCode;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
@@ -369,6 +371,9 @@ pub(super) struct UserInfo {
|
|||||||
pub(super) max_unique_ips: Option<usize>,
|
pub(super) max_unique_ips: Option<usize>,
|
||||||
pub(super) current_connections: u64,
|
pub(super) current_connections: u64,
|
||||||
pub(super) active_unique_ips: usize,
|
pub(super) active_unique_ips: usize,
|
||||||
|
pub(super) active_unique_ips_list: Vec<IpAddr>,
|
||||||
|
pub(super) recent_unique_ips: usize,
|
||||||
|
pub(super) recent_unique_ips_list: Vec<IpAddr>,
|
||||||
pub(super) total_octets: u64,
|
pub(super) total_octets: u64,
|
||||||
pub(super) links: UserLinks,
|
pub(super) links: UserLinks,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
use std::collections::HashMap;
|
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
|
||||||
use hyper::StatusCode;
|
use hyper::StatusCode;
|
||||||
@@ -112,6 +111,9 @@ pub(super) async fn create_user(
|
|||||||
max_unique_ips: updated_limit,
|
max_unique_ips: updated_limit,
|
||||||
current_connections: 0,
|
current_connections: 0,
|
||||||
active_unique_ips: 0,
|
active_unique_ips: 0,
|
||||||
|
active_unique_ips_list: Vec::new(),
|
||||||
|
recent_unique_ips: 0,
|
||||||
|
recent_unique_ips_list: Vec::new(),
|
||||||
total_octets: 0,
|
total_octets: 0,
|
||||||
links: build_user_links(
|
links: build_user_links(
|
||||||
&cfg,
|
&cfg,
|
||||||
@@ -300,18 +302,21 @@ pub(super) async fn users_from_config(
|
|||||||
startup_detected_ip_v4: Option<IpAddr>,
|
startup_detected_ip_v4: Option<IpAddr>,
|
||||||
startup_detected_ip_v6: Option<IpAddr>,
|
startup_detected_ip_v6: Option<IpAddr>,
|
||||||
) -> Vec<UserInfo> {
|
) -> Vec<UserInfo> {
|
||||||
let ip_counts = ip_tracker
|
|
||||||
.get_stats()
|
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.map(|(user, count, _)| (user, count))
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
|
|
||||||
let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>();
|
let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>();
|
||||||
names.sort();
|
names.sort();
|
||||||
|
let active_ip_lists = ip_tracker.get_active_ips_for_users(&names).await;
|
||||||
|
let recent_ip_lists = ip_tracker.get_recent_ips_for_users(&names).await;
|
||||||
|
|
||||||
let mut users = Vec::with_capacity(names.len());
|
let mut users = Vec::with_capacity(names.len());
|
||||||
for username in names {
|
for username in names {
|
||||||
|
let active_ip_list = active_ip_lists
|
||||||
|
.get(&username)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_else(Vec::new);
|
||||||
|
let recent_ip_list = recent_ip_lists
|
||||||
|
.get(&username)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_else(Vec::new);
|
||||||
let links = cfg
|
let links = cfg
|
||||||
.access
|
.access
|
||||||
.users
|
.users
|
||||||
@@ -340,7 +345,10 @@ pub(super) async fn users_from_config(
|
|||||||
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
|
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
|
||||||
max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(),
|
max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(),
|
||||||
current_connections: stats.get_user_curr_connects(&username),
|
current_connections: stats.get_user_curr_connects(&username),
|
||||||
active_unique_ips: ip_counts.get(&username).copied().unwrap_or(0),
|
active_unique_ips: active_ip_list.len(),
|
||||||
|
active_unique_ips_list: active_ip_list,
|
||||||
|
recent_unique_ips: recent_ip_list.len(),
|
||||||
|
recent_unique_ips_list: recent_ip_list,
|
||||||
total_octets: stats.get_user_total_octets(&username),
|
total_octets: stats.get_user_total_octets(&username),
|
||||||
links,
|
links,
|
||||||
username,
|
username,
|
||||||
|
|||||||
@@ -129,6 +129,10 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
|
|||||||
Some("unknown-dc.txt".to_string())
|
Some("unknown-dc.txt".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_unknown_dc_file_log_enabled() -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_pool_size() -> usize {
|
pub(crate) fn default_pool_size() -> usize {
|
||||||
8
|
8
|
||||||
}
|
}
|
||||||
@@ -137,6 +141,14 @@ pub(crate) fn default_proxy_secret_path() -> Option<String> {
|
|||||||
Some("proxy-secret".to_string())
|
Some("proxy-secret".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_proxy_config_v4_cache_path() -> Option<String> {
|
||||||
|
Some("cache/proxy-config-v4.txt".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_proxy_config_v6_cache_path() -> Option<String> {
|
||||||
|
Some("cache/proxy-config-v6.txt".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> {
|
pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@@ -273,6 +285,18 @@ pub(crate) fn default_me_route_backpressure_high_watermark_pct() -> u8 {
|
|||||||
80
|
80
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_route_no_writer_wait_ms() -> u64 {
|
||||||
|
250
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_route_inline_recovery_attempts() -> u32 {
|
||||||
|
3
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_route_inline_recovery_wait_ms() -> u64 {
|
||||||
|
3000
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_beobachten_minutes() -> u64 {
|
pub(crate) fn default_beobachten_minutes() -> u64 {
|
||||||
10
|
10
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -381,6 +381,22 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
|||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: general.middle_proxy_pool_size changed; restart required");
|
warn!("config reload: general.middle_proxy_pool_size changed; restart required");
|
||||||
}
|
}
|
||||||
|
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|
||||||
|
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|
||||||
|
|| old.general.me_route_inline_recovery_attempts
|
||||||
|
!= new.general.me_route_inline_recovery_attempts
|
||||||
|
|| old.general.me_route_inline_recovery_wait_ms
|
||||||
|
!= new.general.me_route_inline_recovery_wait_ms
|
||||||
|
{
|
||||||
|
warned = true;
|
||||||
|
warn!("config reload: general.me_route_no_writer_* changed; restart required");
|
||||||
|
}
|
||||||
|
if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
|
||||||
|
|| old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
|
||||||
|
{
|
||||||
|
warned = true;
|
||||||
|
warn!("config reload: general.unknown_dc_* changed; restart required");
|
||||||
|
}
|
||||||
if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts {
|
if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts {
|
||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: general.me_init_retry_attempts changed; restart required");
|
warn!("config reload: general.me_init_retry_attempts changed; restart required");
|
||||||
@@ -389,6 +405,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
|||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: general.me2dc_fallback changed; restart required");
|
warn!("config reload: general.me2dc_fallback changed; restart required");
|
||||||
}
|
}
|
||||||
|
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|
||||||
|
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path
|
||||||
|
{
|
||||||
|
warned = true;
|
||||||
|
warn!("config reload: general.proxy_config_*_cache_path changed; restart required");
|
||||||
|
}
|
||||||
if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled
|
if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled
|
||||||
|| old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs
|
|| old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs
|
||||||
|| old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs
|
|| old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs
|
||||||
|
|||||||
@@ -203,6 +203,22 @@ impl ProxyConfig {
|
|||||||
|
|
||||||
sanitize_ad_tag(&mut config.general.ad_tag);
|
sanitize_ad_tag(&mut config.general.ad_tag);
|
||||||
|
|
||||||
|
if let Some(path) = &config.general.proxy_config_v4_cache_path
|
||||||
|
&& path.trim().is_empty()
|
||||||
|
{
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.proxy_config_v4_cache_path cannot be empty when provided".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(path) = &config.general.proxy_config_v6_cache_path
|
||||||
|
&& path.trim().is_empty()
|
||||||
|
{
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.proxy_config_v6_cache_path cannot be empty when provided".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(update_every) = config.general.update_every {
|
if let Some(update_every) = config.general.update_every {
|
||||||
if update_every == 0 {
|
if update_every == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
@@ -410,6 +426,24 @@ impl ProxyConfig {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !(10..=5000).contains(&config.general.me_route_no_writer_wait_ms) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_no_writer_wait_ms must be within [10, 5000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_route_inline_recovery_attempts == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_inline_recovery_attempts must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(10..=30000).contains(&config.general.me_route_inline_recovery_wait_ms) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_inline_recovery_wait_ms must be within [10, 30000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.server.api.request_body_limit_bytes == 0 {
|
if config.server.api.request_body_limit_bytes == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"server.api.request_body_limit_bytes must be > 0".to_string(),
|
"server.api.request_body_limit_bytes must be > 0".to_string(),
|
||||||
@@ -673,6 +707,14 @@ mod tests {
|
|||||||
cfg.general.me2dc_fallback,
|
cfg.general.me2dc_fallback,
|
||||||
default_me2dc_fallback()
|
default_me2dc_fallback()
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.proxy_config_v4_cache_path,
|
||||||
|
default_proxy_config_v4_cache_path()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.proxy_config_v6_cache_path,
|
||||||
|
default_proxy_config_v6_cache_path()
|
||||||
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
cfg.general.me_single_endpoint_shadow_writers,
|
cfg.general.me_single_endpoint_shadow_writers,
|
||||||
default_me_single_endpoint_shadow_writers()
|
default_me_single_endpoint_shadow_writers()
|
||||||
@@ -783,6 +825,14 @@ mod tests {
|
|||||||
default_me_init_retry_attempts()
|
default_me_init_retry_attempts()
|
||||||
);
|
);
|
||||||
assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
|
assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
|
||||||
|
assert_eq!(
|
||||||
|
general.proxy_config_v4_cache_path,
|
||||||
|
default_proxy_config_v4_cache_path()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.proxy_config_v6_cache_path,
|
||||||
|
default_proxy_config_v6_cache_path()
|
||||||
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
general.me_single_endpoint_shadow_writers,
|
general.me_single_endpoint_shadow_writers,
|
||||||
default_me_single_endpoint_shadow_writers()
|
default_me_single_endpoint_shadow_writers()
|
||||||
@@ -1206,6 +1256,85 @@ mod tests {
|
|||||||
let _ = std::fs::remove_file(path_valid);
|
let _ = std::fs::remove_file(path_valid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_route_no_writer_wait_ms = 5
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_route_no_writer_wait_ms_out_of_range_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.me_route_no_writer_wait_ms must be within [10, 5000]"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_route_no_writer_mode_is_parsed() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_route_no_writer_mode = "inline_recovery_legacy"
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_route_no_writer_mode_parse_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let cfg = ProxyConfig::load(&path).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_route_no_writer_mode,
|
||||||
|
crate::config::MeRouteNoWriterMode::InlineRecoveryLegacy
|
||||||
|
);
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn proxy_config_cache_paths_empty_are_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
proxy_config_v4_cache_path = " "
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_proxy_config_v4_cache_path_empty_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.proxy_config_v4_cache_path cannot be empty"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
|
||||||
|
let toml_v6 = r#"
|
||||||
|
[general]
|
||||||
|
proxy_config_v6_cache_path = ""
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let path_v6 = dir.join("telemt_proxy_config_v6_cache_path_empty_test.toml");
|
||||||
|
std::fs::write(&path_v6, toml_v6).unwrap();
|
||||||
|
let err_v6 = ProxyConfig::load(&path_v6).unwrap_err().to_string();
|
||||||
|
assert!(err_v6.contains("general.proxy_config_v6_cache_path cannot be empty"));
|
||||||
|
let _ = std::fs::remove_file(path_v6);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn me_hardswap_warmup_defaults_are_set() {
|
fn me_hardswap_warmup_defaults_are_set() {
|
||||||
let toml = r#"
|
let toml = r#"
|
||||||
|
|||||||
@@ -183,6 +183,31 @@ impl MeFloorMode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Middle-End route behavior when no writer is immediately available.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum MeRouteNoWriterMode {
|
||||||
|
#[default]
|
||||||
|
AsyncRecoveryFailfast,
|
||||||
|
InlineRecoveryLegacy,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MeRouteNoWriterMode {
|
||||||
|
pub fn as_u8(self) -> u8 {
|
||||||
|
match self {
|
||||||
|
MeRouteNoWriterMode::AsyncRecoveryFailfast => 0,
|
||||||
|
MeRouteNoWriterMode::InlineRecoveryLegacy => 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_u8(raw: u8) -> Self {
|
||||||
|
match raw {
|
||||||
|
1 => MeRouteNoWriterMode::InlineRecoveryLegacy,
|
||||||
|
_ => MeRouteNoWriterMode::AsyncRecoveryFailfast,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Per-user unique source IP limit mode.
|
/// Per-user unique source IP limit mode.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
@@ -318,6 +343,14 @@ pub struct GeneralConfig {
|
|||||||
#[serde(default = "default_proxy_secret_path")]
|
#[serde(default = "default_proxy_secret_path")]
|
||||||
pub proxy_secret_path: Option<String>,
|
pub proxy_secret_path: Option<String>,
|
||||||
|
|
||||||
|
/// Optional path to cache raw getProxyConfig (IPv4) snapshot for startup fallback.
|
||||||
|
#[serde(default = "default_proxy_config_v4_cache_path")]
|
||||||
|
pub proxy_config_v4_cache_path: Option<String>,
|
||||||
|
|
||||||
|
/// Optional path to cache raw getProxyConfigV6 snapshot for startup fallback.
|
||||||
|
#[serde(default = "default_proxy_config_v6_cache_path")]
|
||||||
|
pub proxy_config_v6_cache_path: Option<String>,
|
||||||
|
|
||||||
/// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags.
|
/// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub ad_tag: Option<String>,
|
pub ad_tag: Option<String>,
|
||||||
@@ -511,6 +544,10 @@ pub struct GeneralConfig {
|
|||||||
#[serde(default = "default_unknown_dc_log_path")]
|
#[serde(default = "default_unknown_dc_log_path")]
|
||||||
pub unknown_dc_log_path: Option<String>,
|
pub unknown_dc_log_path: Option<String>,
|
||||||
|
|
||||||
|
/// Enable unknown-DC file logging.
|
||||||
|
#[serde(default = "default_unknown_dc_file_log_enabled")]
|
||||||
|
pub unknown_dc_file_log_enabled: bool,
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub log_level: LogLevel,
|
pub log_level: LogLevel,
|
||||||
|
|
||||||
@@ -538,6 +575,22 @@ pub struct GeneralConfig {
|
|||||||
#[serde(default = "default_me_route_backpressure_high_watermark_pct")]
|
#[serde(default = "default_me_route_backpressure_high_watermark_pct")]
|
||||||
pub me_route_backpressure_high_watermark_pct: u8,
|
pub me_route_backpressure_high_watermark_pct: u8,
|
||||||
|
|
||||||
|
/// ME route behavior when no writer is immediately available.
|
||||||
|
#[serde(default)]
|
||||||
|
pub me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||||
|
|
||||||
|
/// Maximum wait time in milliseconds for async-recovery failfast mode.
|
||||||
|
#[serde(default = "default_me_route_no_writer_wait_ms")]
|
||||||
|
pub me_route_no_writer_wait_ms: u64,
|
||||||
|
|
||||||
|
/// Number of inline recovery attempts in legacy mode.
|
||||||
|
#[serde(default = "default_me_route_inline_recovery_attempts")]
|
||||||
|
pub me_route_inline_recovery_attempts: u32,
|
||||||
|
|
||||||
|
/// Maximum wait time in milliseconds for inline recovery in legacy mode.
|
||||||
|
#[serde(default = "default_me_route_inline_recovery_wait_ms")]
|
||||||
|
pub me_route_inline_recovery_wait_ms: u64,
|
||||||
|
|
||||||
/// [general.links] — proxy link generation overrides.
|
/// [general.links] — proxy link generation overrides.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub links: LinksConfig,
|
pub links: LinksConfig,
|
||||||
@@ -682,6 +735,8 @@ impl Default for GeneralConfig {
|
|||||||
use_middle_proxy: default_true(),
|
use_middle_proxy: default_true(),
|
||||||
ad_tag: None,
|
ad_tag: None,
|
||||||
proxy_secret_path: default_proxy_secret_path(),
|
proxy_secret_path: default_proxy_secret_path(),
|
||||||
|
proxy_config_v4_cache_path: default_proxy_config_v4_cache_path(),
|
||||||
|
proxy_config_v6_cache_path: default_proxy_config_v6_cache_path(),
|
||||||
middle_proxy_nat_ip: None,
|
middle_proxy_nat_ip: None,
|
||||||
middle_proxy_nat_probe: default_true(),
|
middle_proxy_nat_probe: default_true(),
|
||||||
middle_proxy_nat_stun: default_middle_proxy_nat_stun(),
|
middle_proxy_nat_stun: default_middle_proxy_nat_stun(),
|
||||||
@@ -719,6 +774,7 @@ impl Default for GeneralConfig {
|
|||||||
upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(),
|
upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(),
|
||||||
stun_iface_mismatch_ignore: false,
|
stun_iface_mismatch_ignore: false,
|
||||||
unknown_dc_log_path: default_unknown_dc_log_path(),
|
unknown_dc_log_path: default_unknown_dc_log_path(),
|
||||||
|
unknown_dc_file_log_enabled: default_unknown_dc_file_log_enabled(),
|
||||||
log_level: LogLevel::Normal,
|
log_level: LogLevel::Normal,
|
||||||
disable_colors: false,
|
disable_colors: false,
|
||||||
telemetry: TelemetryConfig::default(),
|
telemetry: TelemetryConfig::default(),
|
||||||
@@ -726,6 +782,10 @@ impl Default for GeneralConfig {
|
|||||||
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
|
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
|
||||||
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
|
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
|
||||||
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
|
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
|
||||||
|
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
|
||||||
|
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
|
||||||
|
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
|
||||||
|
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
|
||||||
links: LinksConfig::default(),
|
links: LinksConfig::default(),
|
||||||
crypto_pending_buffer: default_crypto_pending_buffer(),
|
crypto_pending_buffer: default_crypto_pending_buffer(),
|
||||||
max_client_frame: default_max_client_frame(),
|
max_client_frame: default_max_client_frame(),
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::HashMap;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@@ -13,7 +13,7 @@ use crate::config::UserMaxUniqueIpsMode;
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct UserIpTracker {
|
pub struct UserIpTracker {
|
||||||
active_ips: Arc<RwLock<HashMap<String, HashSet<IpAddr>>>>,
|
active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
|
||||||
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
|
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
|
||||||
max_ips: Arc<RwLock<HashMap<String, usize>>>,
|
max_ips: Arc<RwLock<HashMap<String, usize>>>,
|
||||||
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
|
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
|
||||||
@@ -67,21 +67,14 @@ impl UserIpTracker {
|
|||||||
let max_ips = self.max_ips.read().await;
|
let max_ips = self.max_ips.read().await;
|
||||||
max_ips.get(username).copied()
|
max_ips.get(username).copied()
|
||||||
};
|
};
|
||||||
|
let mode = *self.limit_mode.read().await;
|
||||||
|
let window = *self.limit_window.read().await;
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
let mut active_ips = self.active_ips.write().await;
|
let mut active_ips = self.active_ips.write().await;
|
||||||
let user_active = active_ips
|
let user_active = active_ips
|
||||||
.entry(username.to_string())
|
.entry(username.to_string())
|
||||||
.or_insert_with(HashSet::new);
|
.or_insert_with(HashMap::new);
|
||||||
|
|
||||||
if limit.is_none() {
|
|
||||||
user_active.insert(ip);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let limit = limit.unwrap_or_default();
|
|
||||||
let mode = *self.limit_mode.read().await;
|
|
||||||
let window = *self.limit_window.read().await;
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
let mut recent_ips = self.recent_ips.write().await;
|
let mut recent_ips = self.recent_ips.write().await;
|
||||||
let user_recent = recent_ips
|
let user_recent = recent_ips
|
||||||
@@ -89,32 +82,35 @@ impl UserIpTracker {
|
|||||||
.or_insert_with(HashMap::new);
|
.or_insert_with(HashMap::new);
|
||||||
Self::prune_recent(user_recent, now, window);
|
Self::prune_recent(user_recent, now, window);
|
||||||
|
|
||||||
if user_active.contains(&ip) {
|
if let Some(count) = user_active.get_mut(&ip) {
|
||||||
|
*count = count.saturating_add(1);
|
||||||
user_recent.insert(ip, now);
|
user_recent.insert(ip, now);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let active_limit_reached = user_active.len() >= limit;
|
if let Some(limit) = limit {
|
||||||
let recent_limit_reached = user_recent.len() >= limit;
|
let active_limit_reached = user_active.len() >= limit;
|
||||||
let deny = match mode {
|
let recent_limit_reached = user_recent.len() >= limit;
|
||||||
UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached,
|
let deny = match mode {
|
||||||
UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached,
|
UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached,
|
||||||
UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached,
|
UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached,
|
||||||
};
|
UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached,
|
||||||
|
};
|
||||||
|
|
||||||
if deny {
|
if deny {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
"IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}",
|
"IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}",
|
||||||
username,
|
username,
|
||||||
user_active.len(),
|
user_active.len(),
|
||||||
limit,
|
limit,
|
||||||
user_recent.len(),
|
user_recent.len(),
|
||||||
limit,
|
limit,
|
||||||
mode
|
mode
|
||||||
));
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
user_active.insert(ip);
|
user_active.insert(ip, 1);
|
||||||
user_recent.insert(ip, now);
|
user_recent.insert(ip, now);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -122,23 +118,73 @@ impl UserIpTracker {
|
|||||||
pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
|
pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
|
||||||
let mut active_ips = self.active_ips.write().await;
|
let mut active_ips = self.active_ips.write().await;
|
||||||
if let Some(user_ips) = active_ips.get_mut(username) {
|
if let Some(user_ips) = active_ips.get_mut(username) {
|
||||||
user_ips.remove(&ip);
|
if let Some(count) = user_ips.get_mut(&ip) {
|
||||||
|
if *count > 1 {
|
||||||
|
*count -= 1;
|
||||||
|
} else {
|
||||||
|
user_ips.remove(&ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
if user_ips.is_empty() {
|
if user_ips.is_empty() {
|
||||||
active_ips.remove(username);
|
active_ips.remove(username);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
drop(active_ips);
|
}
|
||||||
|
|
||||||
let mode = *self.limit_mode.read().await;
|
pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
|
||||||
if matches!(mode, UserMaxUniqueIpsMode::ActiveWindow) {
|
let window = *self.limit_window.read().await;
|
||||||
let mut recent_ips = self.recent_ips.write().await;
|
let now = Instant::now();
|
||||||
if let Some(user_recent) = recent_ips.get_mut(username) {
|
let recent_ips = self.recent_ips.read().await;
|
||||||
user_recent.remove(&ip);
|
|
||||||
if user_recent.is_empty() {
|
let mut counts = HashMap::with_capacity(users.len());
|
||||||
recent_ips.remove(username);
|
for user in users {
|
||||||
}
|
let count = if let Some(user_recent) = recent_ips.get(user) {
|
||||||
}
|
user_recent
|
||||||
|
.values()
|
||||||
|
.filter(|seen_at| now.duration_since(**seen_at) <= window)
|
||||||
|
.count()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
counts.insert(user.clone(), count);
|
||||||
}
|
}
|
||||||
|
counts
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_active_ips_for_users(&self, users: &[String]) -> HashMap<String, Vec<IpAddr>> {
|
||||||
|
let active_ips = self.active_ips.read().await;
|
||||||
|
let mut out = HashMap::with_capacity(users.len());
|
||||||
|
for user in users {
|
||||||
|
let mut ips = active_ips
|
||||||
|
.get(user)
|
||||||
|
.map(|per_ip| per_ip.keys().copied().collect::<Vec<_>>())
|
||||||
|
.unwrap_or_else(Vec::new);
|
||||||
|
ips.sort();
|
||||||
|
out.insert(user.clone(), ips);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_recent_ips_for_users(&self, users: &[String]) -> HashMap<String, Vec<IpAddr>> {
|
||||||
|
let window = *self.limit_window.read().await;
|
||||||
|
let now = Instant::now();
|
||||||
|
let recent_ips = self.recent_ips.read().await;
|
||||||
|
|
||||||
|
let mut out = HashMap::with_capacity(users.len());
|
||||||
|
for user in users {
|
||||||
|
let mut ips = if let Some(user_recent) = recent_ips.get(user) {
|
||||||
|
user_recent
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, seen_at)| now.duration_since(**seen_at) <= window)
|
||||||
|
.map(|(ip, _)| *ip)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
ips.sort();
|
||||||
|
out.insert(user.clone(), ips);
|
||||||
|
}
|
||||||
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_active_ip_count(&self, username: &str) -> usize {
|
pub async fn get_active_ip_count(&self, username: &str) -> usize {
|
||||||
@@ -150,7 +196,7 @@ impl UserIpTracker {
|
|||||||
let active_ips = self.active_ips.read().await;
|
let active_ips = self.active_ips.read().await;
|
||||||
active_ips
|
active_ips
|
||||||
.get(username)
|
.get(username)
|
||||||
.map(|ips| ips.iter().copied().collect())
|
.map(|ips| ips.keys().copied().collect())
|
||||||
.unwrap_or_else(Vec::new)
|
.unwrap_or_else(Vec::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,7 +236,7 @@ impl UserIpTracker {
|
|||||||
let active_ips = self.active_ips.read().await;
|
let active_ips = self.active_ips.read().await;
|
||||||
active_ips
|
active_ips
|
||||||
.get(username)
|
.get(username)
|
||||||
.map(|ips| ips.contains(&ip))
|
.map(|ips| ips.contains_key(&ip))
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -266,6 +312,26 @@ mod tests {
|
|||||||
assert_eq!(tracker.get_active_ip_count("test_user").await, 2);
|
assert_eq!(tracker.get_active_ip_count("test_user").await, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_active_window_rejects_new_ip_and_keeps_existing_session() {
|
||||||
|
let tracker = UserIpTracker::new();
|
||||||
|
tracker.set_user_limit("test_user", 1).await;
|
||||||
|
tracker
|
||||||
|
.set_limit_policy(UserMaxUniqueIpsMode::ActiveWindow, 30)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let ip1 = test_ipv4(10, 10, 10, 1);
|
||||||
|
let ip2 = test_ipv4(10, 10, 10, 2);
|
||||||
|
|
||||||
|
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||||
|
assert!(tracker.is_ip_active("test_user", ip1).await);
|
||||||
|
assert!(tracker.check_and_add("test_user", ip2).await.is_err());
|
||||||
|
|
||||||
|
// Existing session remains active; only new unique IP is denied.
|
||||||
|
assert!(tracker.is_ip_active("test_user", ip1).await);
|
||||||
|
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_reconnection_from_same_ip() {
|
async fn test_reconnection_from_same_ip() {
|
||||||
let tracker = UserIpTracker::new();
|
let tracker = UserIpTracker::new();
|
||||||
@@ -278,6 +344,24 @@ mod tests {
|
|||||||
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
|
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_same_ip_disconnect_keeps_active_while_other_session_alive() {
|
||||||
|
let tracker = UserIpTracker::new();
|
||||||
|
tracker.set_user_limit("test_user", 2).await;
|
||||||
|
|
||||||
|
let ip1 = test_ipv4(192, 168, 1, 1);
|
||||||
|
|
||||||
|
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||||
|
assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
|
||||||
|
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
|
||||||
|
|
||||||
|
tracker.remove_ip("test_user", ip1).await;
|
||||||
|
assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
|
||||||
|
|
||||||
|
tracker.remove_ip("test_user", ip1).await;
|
||||||
|
assert_eq!(tracker.get_active_ip_count("test_user").await, 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_ip_removal() {
|
async fn test_ip_removal() {
|
||||||
let tracker = UserIpTracker::new();
|
let tracker = UserIpTracker::new();
|
||||||
|
|||||||
573
src/main.rs
573
src/main.rs
@@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
@@ -41,8 +41,9 @@ use crate::stats::telemetry::TelemetryPolicy;
|
|||||||
use crate::stats::{ReplayChecker, Stats};
|
use crate::stats::{ReplayChecker, Stats};
|
||||||
use crate::stream::BufferPool;
|
use crate::stream::BufferPool;
|
||||||
use crate::transport::middle_proxy::{
|
use crate::transport::middle_proxy::{
|
||||||
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, MeReinitTrigger, format_sample_line,
|
MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line,
|
||||||
format_me_route,
|
load_proxy_config_cache, run_me_ping, save_proxy_config_cache, MePingFamily, MePingSample,
|
||||||
|
MeReinitTrigger,
|
||||||
};
|
};
|
||||||
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
|
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
|
||||||
use crate::tls_front::TlsFrontCache;
|
use crate::tls_front::TlsFrontCache;
|
||||||
@@ -172,8 +173,191 @@ async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result
|
|||||||
tokio::fs::write(path, payload).await
|
tokio::fs::write(path, payload).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn unit_label(value: u64, singular: &'static str, plural: &'static str) -> &'static str {
|
||||||
|
if value == 1 { singular } else { plural }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn format_uptime(total_secs: u64) -> String {
|
||||||
|
const SECS_PER_MINUTE: u64 = 60;
|
||||||
|
const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;
|
||||||
|
const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR;
|
||||||
|
const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY;
|
||||||
|
const SECS_PER_YEAR: u64 = 12 * SECS_PER_MONTH;
|
||||||
|
|
||||||
|
let mut remaining = total_secs;
|
||||||
|
let years = remaining / SECS_PER_YEAR;
|
||||||
|
remaining %= SECS_PER_YEAR;
|
||||||
|
let months = remaining / SECS_PER_MONTH;
|
||||||
|
remaining %= SECS_PER_MONTH;
|
||||||
|
let days = remaining / SECS_PER_DAY;
|
||||||
|
remaining %= SECS_PER_DAY;
|
||||||
|
let hours = remaining / SECS_PER_HOUR;
|
||||||
|
remaining %= SECS_PER_HOUR;
|
||||||
|
let minutes = remaining / SECS_PER_MINUTE;
|
||||||
|
let seconds = remaining % SECS_PER_MINUTE;
|
||||||
|
|
||||||
|
let mut parts = Vec::new();
|
||||||
|
if total_secs > SECS_PER_YEAR {
|
||||||
|
parts.push(format!(
|
||||||
|
"{} {}",
|
||||||
|
years,
|
||||||
|
unit_label(years, "year", "years")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if total_secs > SECS_PER_MONTH {
|
||||||
|
parts.push(format!(
|
||||||
|
"{} {}",
|
||||||
|
months,
|
||||||
|
unit_label(months, "month", "months")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if total_secs > SECS_PER_DAY {
|
||||||
|
parts.push(format!(
|
||||||
|
"{} {}",
|
||||||
|
days,
|
||||||
|
unit_label(days, "day", "days")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if total_secs > SECS_PER_HOUR {
|
||||||
|
parts.push(format!(
|
||||||
|
"{} {}",
|
||||||
|
hours,
|
||||||
|
unit_label(hours, "hour", "hours")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if total_secs > SECS_PER_MINUTE {
|
||||||
|
parts.push(format!(
|
||||||
|
"{} {}",
|
||||||
|
minutes,
|
||||||
|
unit_label(minutes, "minute", "minutes")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
parts.push(format!(
|
||||||
|
"{} {}",
|
||||||
|
seconds,
|
||||||
|
unit_label(seconds, "second", "seconds")
|
||||||
|
));
|
||||||
|
|
||||||
|
format!("{} / {} seconds", parts.join(", "), total_secs)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_startup_proxy_config_snapshot(
|
||||||
|
url: &str,
|
||||||
|
cache_path: Option<&str>,
|
||||||
|
me2dc_fallback: bool,
|
||||||
|
label: &'static str,
|
||||||
|
) -> Option<ProxyConfigData> {
|
||||||
|
loop {
|
||||||
|
match fetch_proxy_config_with_raw(url).await {
|
||||||
|
Ok((cfg, raw)) => {
|
||||||
|
if !cfg.map.is_empty() {
|
||||||
|
if let Some(path) = cache_path
|
||||||
|
&& let Err(e) = save_proxy_config_cache(path, &raw).await
|
||||||
|
{
|
||||||
|
warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache");
|
||||||
|
}
|
||||||
|
return Some(cfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache");
|
||||||
|
if let Some(path) = cache_path {
|
||||||
|
match load_proxy_config_cache(path).await {
|
||||||
|
Ok(cached) if !cached.map.is_empty() => {
|
||||||
|
info!(
|
||||||
|
snapshot = label,
|
||||||
|
path,
|
||||||
|
proxy_for_lines = cached.proxy_for_lines,
|
||||||
|
"Loaded startup proxy-config from disk cache"
|
||||||
|
);
|
||||||
|
return Some(cached);
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
warn!(
|
||||||
|
snapshot = label,
|
||||||
|
path,
|
||||||
|
"Startup proxy-config cache is empty; ignoring cache file"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(cache_err) => {
|
||||||
|
debug!(
|
||||||
|
snapshot = label,
|
||||||
|
path,
|
||||||
|
error = %cache_err,
|
||||||
|
"Startup proxy-config cache unavailable"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if me2dc_fallback {
|
||||||
|
error!(
|
||||||
|
snapshot = label,
|
||||||
|
"Startup proxy-config unavailable and no saved config found; falling back to direct mode"
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
warn!(
|
||||||
|
snapshot = label,
|
||||||
|
retry_in_secs = 2,
|
||||||
|
"Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false"
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
Err(fetch_err) => {
|
||||||
|
if let Some(path) = cache_path {
|
||||||
|
match load_proxy_config_cache(path).await {
|
||||||
|
Ok(cached) if !cached.map.is_empty() => {
|
||||||
|
info!(
|
||||||
|
snapshot = label,
|
||||||
|
path,
|
||||||
|
proxy_for_lines = cached.proxy_for_lines,
|
||||||
|
"Loaded startup proxy-config from disk cache"
|
||||||
|
);
|
||||||
|
return Some(cached);
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
warn!(
|
||||||
|
snapshot = label,
|
||||||
|
path,
|
||||||
|
"Startup proxy-config cache is empty; ignoring cache file"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(cache_err) => {
|
||||||
|
debug!(
|
||||||
|
snapshot = label,
|
||||||
|
path,
|
||||||
|
error = %cache_err,
|
||||||
|
"Startup proxy-config cache unavailable"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if me2dc_fallback {
|
||||||
|
error!(
|
||||||
|
snapshot = label,
|
||||||
|
error = %fetch_err,
|
||||||
|
"Startup proxy-config unavailable and no cached data; falling back to direct mode"
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
warn!(
|
||||||
|
snapshot = label,
|
||||||
|
error = %fetch_err,
|
||||||
|
retry_in_secs = 2,
|
||||||
|
"Startup proxy-config unavailable; retrying because me2dc_fallback=false"
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let process_started_at = Instant::now();
|
||||||
let (config_path, cli_silent, cli_log_level) = parse_cli();
|
let (config_path, cli_silent, cli_log_level) = parse_cli();
|
||||||
|
|
||||||
let mut config = match ProxyConfig::load(&config_path) {
|
let mut config = match ProxyConfig::load(&config_path) {
|
||||||
@@ -445,6 +629,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let me2dc_fallback = config.general.me2dc_fallback;
|
let me2dc_fallback = config.general.me2dc_fallback;
|
||||||
let me_init_retry_attempts = config.general.me_init_retry_attempts;
|
let me_init_retry_attempts = config.general.me_init_retry_attempts;
|
||||||
|
let me_init_warn_after_attempts: u32 = 3;
|
||||||
if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
|
if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
|
||||||
if me2dc_fallback {
|
if me2dc_fallback {
|
||||||
warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
|
warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
|
||||||
@@ -484,189 +669,199 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
// =============================================================
|
// =============================================================
|
||||||
let proxy_secret_path = config.general.proxy_secret_path.as_deref();
|
let proxy_secret_path = config.general.proxy_secret_path.as_deref();
|
||||||
let pool_size = config.general.middle_proxy_pool_size.max(1);
|
let pool_size = config.general.middle_proxy_pool_size.max(1);
|
||||||
let mut init_attempt: u32 = 0;
|
let proxy_secret = loop {
|
||||||
loop {
|
match crate::transport::middle_proxy::fetch_proxy_secret(
|
||||||
init_attempt = init_attempt.saturating_add(1);
|
|
||||||
|
|
||||||
let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret(
|
|
||||||
proxy_secret_path,
|
proxy_secret_path,
|
||||||
config.general.proxy_secret_len_max,
|
config.general.proxy_secret_len_max,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(proxy_secret) => proxy_secret,
|
Ok(proxy_secret) => break Some(proxy_secret),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
|
if me2dc_fallback {
|
||||||
if retries_limited && init_attempt >= me_init_retry_attempts {
|
|
||||||
error!(
|
error!(
|
||||||
error = %e,
|
error = %e,
|
||||||
attempt = init_attempt,
|
"ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode"
|
||||||
retry_limit = me_init_retry_attempts,
|
|
||||||
"ME startup retries exhausted while loading proxy-secret; falling back to direct mode"
|
|
||||||
);
|
);
|
||||||
break None;
|
break None;
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!(
|
warn!(
|
||||||
error = %e,
|
error = %e,
|
||||||
attempt = init_attempt,
|
|
||||||
retry_limit = if me_init_retry_attempts == 0 {
|
|
||||||
String::from("unlimited")
|
|
||||||
} else {
|
|
||||||
me_init_retry_attempts.to_string()
|
|
||||||
},
|
|
||||||
me2dc_fallback = me2dc_fallback,
|
|
||||||
retry_in_secs = 2,
|
retry_in_secs = 2,
|
||||||
"Failed to fetch proxy-secret; retrying ME startup"
|
"ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false"
|
||||||
);
|
);
|
||||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
info!(
|
|
||||||
secret_len = proxy_secret.len(),
|
|
||||||
key_sig = format_args!(
|
|
||||||
"0x{:08x}",
|
|
||||||
if proxy_secret.len() >= 4 {
|
|
||||||
u32::from_le_bytes([
|
|
||||||
proxy_secret[0],
|
|
||||||
proxy_secret[1],
|
|
||||||
proxy_secret[2],
|
|
||||||
proxy_secret[3],
|
|
||||||
])
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
),
|
|
||||||
"Proxy-secret loaded"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Load ME config (v4/v6) + default DC
|
|
||||||
let mut cfg_v4 = fetch_proxy_config(
|
|
||||||
"https://core.telegram.org/getProxyConfig",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap_or_default();
|
|
||||||
let mut cfg_v6 = fetch_proxy_config(
|
|
||||||
"https://core.telegram.org/getProxyConfigV6",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
if cfg_v4.map.is_empty() {
|
|
||||||
cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone();
|
|
||||||
}
|
|
||||||
if cfg_v6.map.is_empty() {
|
|
||||||
cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone();
|
|
||||||
}
|
|
||||||
|
|
||||||
let pool = MePool::new(
|
|
||||||
proxy_tag.clone(),
|
|
||||||
proxy_secret,
|
|
||||||
config.general.middle_proxy_nat_ip,
|
|
||||||
me_nat_probe,
|
|
||||||
None,
|
|
||||||
config.network.stun_servers.clone(),
|
|
||||||
config.general.stun_nat_probe_concurrency,
|
|
||||||
probe.detected_ipv6,
|
|
||||||
config.timeouts.me_one_retry,
|
|
||||||
config.timeouts.me_one_timeout_ms,
|
|
||||||
cfg_v4.map.clone(),
|
|
||||||
cfg_v6.map.clone(),
|
|
||||||
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
|
||||||
decision.clone(),
|
|
||||||
Some(upstream_manager.clone()),
|
|
||||||
rng.clone(),
|
|
||||||
stats.clone(),
|
|
||||||
config.general.me_keepalive_enabled,
|
|
||||||
config.general.me_keepalive_interval_secs,
|
|
||||||
config.general.me_keepalive_jitter_secs,
|
|
||||||
config.general.me_keepalive_payload_random,
|
|
||||||
config.general.rpc_proxy_req_every,
|
|
||||||
config.general.me_warmup_stagger_enabled,
|
|
||||||
config.general.me_warmup_step_delay_ms,
|
|
||||||
config.general.me_warmup_step_jitter_ms,
|
|
||||||
config.general.me_reconnect_max_concurrent_per_dc,
|
|
||||||
config.general.me_reconnect_backoff_base_ms,
|
|
||||||
config.general.me_reconnect_backoff_cap_ms,
|
|
||||||
config.general.me_reconnect_fast_retry_count,
|
|
||||||
config.general.me_single_endpoint_shadow_writers,
|
|
||||||
config.general.me_single_endpoint_outage_mode_enabled,
|
|
||||||
config.general.me_single_endpoint_outage_disable_quarantine,
|
|
||||||
config.general.me_single_endpoint_outage_backoff_min_ms,
|
|
||||||
config.general.me_single_endpoint_outage_backoff_max_ms,
|
|
||||||
config.general.me_single_endpoint_shadow_rotate_every_secs,
|
|
||||||
config.general.me_floor_mode,
|
|
||||||
config.general.me_adaptive_floor_idle_secs,
|
|
||||||
config.general.me_adaptive_floor_min_writers_single_endpoint,
|
|
||||||
config.general.me_adaptive_floor_recover_grace_secs,
|
|
||||||
config.general.hardswap,
|
|
||||||
config.general.me_pool_drain_ttl_secs,
|
|
||||||
config.general.effective_me_pool_force_close_secs(),
|
|
||||||
config.general.me_pool_min_fresh_ratio,
|
|
||||||
config.general.me_hardswap_warmup_delay_min_ms,
|
|
||||||
config.general.me_hardswap_warmup_delay_max_ms,
|
|
||||||
config.general.me_hardswap_warmup_extra_passes,
|
|
||||||
config.general.me_hardswap_warmup_pass_backoff_base_ms,
|
|
||||||
config.general.me_bind_stale_mode,
|
|
||||||
config.general.me_bind_stale_ttl_secs,
|
|
||||||
config.general.me_secret_atomic_snapshot,
|
|
||||||
config.general.me_deterministic_writer_sort,
|
|
||||||
config.general.me_socks_kdf_policy,
|
|
||||||
config.general.me_route_backpressure_base_timeout_ms,
|
|
||||||
config.general.me_route_backpressure_high_timeout_ms,
|
|
||||||
config.general.me_route_backpressure_high_watermark_pct,
|
|
||||||
);
|
|
||||||
|
|
||||||
match pool.init(pool_size, &rng).await {
|
|
||||||
Ok(()) => {
|
|
||||||
info!(
|
|
||||||
attempt = init_attempt,
|
|
||||||
"Middle-End pool initialized successfully"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Phase 4: Start health monitor
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let rng_clone = rng.clone();
|
|
||||||
let min_conns = pool_size;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
crate::transport::middle_proxy::me_health_monitor(
|
|
||||||
pool_clone, rng_clone, min_conns,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
});
|
|
||||||
|
|
||||||
break Some(pool);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
|
|
||||||
if retries_limited && init_attempt >= me_init_retry_attempts {
|
|
||||||
error!(
|
|
||||||
error = %e,
|
|
||||||
attempt = init_attempt,
|
|
||||||
retry_limit = me_init_retry_attempts,
|
|
||||||
"ME pool init retries exhausted; falling back to direct mode"
|
|
||||||
);
|
|
||||||
break None;
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!(
|
|
||||||
error = %e,
|
|
||||||
attempt = init_attempt,
|
|
||||||
retry_limit = if me_init_retry_attempts == 0 {
|
|
||||||
String::from("unlimited")
|
|
||||||
} else {
|
|
||||||
me_init_retry_attempts.to_string()
|
|
||||||
},
|
|
||||||
me2dc_fallback = me2dc_fallback,
|
|
||||||
retry_in_secs = 2,
|
|
||||||
"ME pool is not ready yet; retrying startup initialization"
|
|
||||||
);
|
|
||||||
pool.reset_stun_state();
|
|
||||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
match proxy_secret {
|
||||||
|
Some(proxy_secret) => {
|
||||||
|
info!(
|
||||||
|
secret_len = proxy_secret.len(),
|
||||||
|
key_sig = format_args!(
|
||||||
|
"0x{:08x}",
|
||||||
|
if proxy_secret.len() >= 4 {
|
||||||
|
u32::from_le_bytes([
|
||||||
|
proxy_secret[0],
|
||||||
|
proxy_secret[1],
|
||||||
|
proxy_secret[2],
|
||||||
|
proxy_secret[3],
|
||||||
|
])
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
),
|
||||||
|
"Proxy-secret loaded"
|
||||||
|
);
|
||||||
|
|
||||||
|
let cfg_v4 = load_startup_proxy_config_snapshot(
|
||||||
|
"https://core.telegram.org/getProxyConfig",
|
||||||
|
config.general.proxy_config_v4_cache_path.as_deref(),
|
||||||
|
me2dc_fallback,
|
||||||
|
"getProxyConfig",
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let cfg_v6 = load_startup_proxy_config_snapshot(
|
||||||
|
"https://core.telegram.org/getProxyConfigV6",
|
||||||
|
config.general.proxy_config_v6_cache_path.as_deref(),
|
||||||
|
me2dc_fallback,
|
||||||
|
"getProxyConfigV6",
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) {
|
||||||
|
let pool = MePool::new(
|
||||||
|
proxy_tag.clone(),
|
||||||
|
proxy_secret,
|
||||||
|
config.general.middle_proxy_nat_ip,
|
||||||
|
me_nat_probe,
|
||||||
|
None,
|
||||||
|
config.network.stun_servers.clone(),
|
||||||
|
config.general.stun_nat_probe_concurrency,
|
||||||
|
probe.detected_ipv6,
|
||||||
|
config.timeouts.me_one_retry,
|
||||||
|
config.timeouts.me_one_timeout_ms,
|
||||||
|
cfg_v4.map.clone(),
|
||||||
|
cfg_v6.map.clone(),
|
||||||
|
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
||||||
|
decision.clone(),
|
||||||
|
Some(upstream_manager.clone()),
|
||||||
|
rng.clone(),
|
||||||
|
stats.clone(),
|
||||||
|
config.general.me_keepalive_enabled,
|
||||||
|
config.general.me_keepalive_interval_secs,
|
||||||
|
config.general.me_keepalive_jitter_secs,
|
||||||
|
config.general.me_keepalive_payload_random,
|
||||||
|
config.general.rpc_proxy_req_every,
|
||||||
|
config.general.me_warmup_stagger_enabled,
|
||||||
|
config.general.me_warmup_step_delay_ms,
|
||||||
|
config.general.me_warmup_step_jitter_ms,
|
||||||
|
config.general.me_reconnect_max_concurrent_per_dc,
|
||||||
|
config.general.me_reconnect_backoff_base_ms,
|
||||||
|
config.general.me_reconnect_backoff_cap_ms,
|
||||||
|
config.general.me_reconnect_fast_retry_count,
|
||||||
|
config.general.me_single_endpoint_shadow_writers,
|
||||||
|
config.general.me_single_endpoint_outage_mode_enabled,
|
||||||
|
config.general.me_single_endpoint_outage_disable_quarantine,
|
||||||
|
config.general.me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
config.general.me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
config.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||||
|
config.general.me_floor_mode,
|
||||||
|
config.general.me_adaptive_floor_idle_secs,
|
||||||
|
config.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
config.general.me_adaptive_floor_recover_grace_secs,
|
||||||
|
config.general.hardswap,
|
||||||
|
config.general.me_pool_drain_ttl_secs,
|
||||||
|
config.general.effective_me_pool_force_close_secs(),
|
||||||
|
config.general.me_pool_min_fresh_ratio,
|
||||||
|
config.general.me_hardswap_warmup_delay_min_ms,
|
||||||
|
config.general.me_hardswap_warmup_delay_max_ms,
|
||||||
|
config.general.me_hardswap_warmup_extra_passes,
|
||||||
|
config.general.me_hardswap_warmup_pass_backoff_base_ms,
|
||||||
|
config.general.me_bind_stale_mode,
|
||||||
|
config.general.me_bind_stale_ttl_secs,
|
||||||
|
config.general.me_secret_atomic_snapshot,
|
||||||
|
config.general.me_deterministic_writer_sort,
|
||||||
|
config.general.me_socks_kdf_policy,
|
||||||
|
config.general.me_route_backpressure_base_timeout_ms,
|
||||||
|
config.general.me_route_backpressure_high_timeout_ms,
|
||||||
|
config.general.me_route_backpressure_high_watermark_pct,
|
||||||
|
config.general.me_route_no_writer_mode,
|
||||||
|
config.general.me_route_no_writer_wait_ms,
|
||||||
|
config.general.me_route_inline_recovery_attempts,
|
||||||
|
config.general.me_route_inline_recovery_wait_ms,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut init_attempt: u32 = 0;
|
||||||
|
loop {
|
||||||
|
init_attempt = init_attempt.saturating_add(1);
|
||||||
|
match pool.init(pool_size, &rng).await {
|
||||||
|
Ok(()) => {
|
||||||
|
info!(
|
||||||
|
attempt = init_attempt,
|
||||||
|
"Middle-End pool initialized successfully"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Phase 4: Start health monitor
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let rng_clone = rng.clone();
|
||||||
|
let min_conns = pool_size;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
crate::transport::middle_proxy::me_health_monitor(
|
||||||
|
pool_clone, rng_clone, min_conns,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
break Some(pool);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
|
||||||
|
if retries_limited && init_attempt >= me_init_retry_attempts {
|
||||||
|
error!(
|
||||||
|
error = %e,
|
||||||
|
attempt = init_attempt,
|
||||||
|
retry_limit = me_init_retry_attempts,
|
||||||
|
"ME pool init retries exhausted; falling back to direct mode"
|
||||||
|
);
|
||||||
|
break None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 {
|
||||||
|
String::from("unlimited")
|
||||||
|
} else {
|
||||||
|
me_init_retry_attempts.to_string()
|
||||||
|
};
|
||||||
|
if init_attempt >= me_init_warn_after_attempts {
|
||||||
|
warn!(
|
||||||
|
error = %e,
|
||||||
|
attempt = init_attempt,
|
||||||
|
retry_limit = retry_limit,
|
||||||
|
me2dc_fallback = me2dc_fallback,
|
||||||
|
retry_in_secs = 2,
|
||||||
|
"ME pool is not ready yet; retrying startup initialization"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
error = %e,
|
||||||
|
attempt = init_attempt,
|
||||||
|
retry_limit = retry_limit,
|
||||||
|
me2dc_fallback = me2dc_fallback,
|
||||||
|
retry_in_secs = 2,
|
||||||
|
"ME pool startup warmup: retrying initialization"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
pool.reset_stun_state();
|
||||||
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -847,6 +1042,19 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let initialized_secs = process_started_at.elapsed().as_secs();
|
||||||
|
let second_suffix = if initialized_secs == 1 { "" } else { "s" };
|
||||||
|
info!("===================== Telegram Startup =====================");
|
||||||
|
info!(
|
||||||
|
" DC/ME Initialized in {} second{}",
|
||||||
|
initialized_secs, second_suffix
|
||||||
|
);
|
||||||
|
info!("============================================================");
|
||||||
|
|
||||||
|
if let Some(ref pool) = me_pool {
|
||||||
|
pool.set_runtime_ready(true);
|
||||||
|
}
|
||||||
|
|
||||||
// Background tasks
|
// Background tasks
|
||||||
let um_clone = upstream_manager.clone();
|
let um_clone = upstream_manager.clone();
|
||||||
let decision_clone = decision.clone();
|
let decision_clone = decision.clone();
|
||||||
@@ -1400,7 +1608,36 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
match signal::ctrl_c().await {
|
match signal::ctrl_c().await {
|
||||||
Ok(()) => info!("Shutting down..."),
|
Ok(()) => {
|
||||||
|
let shutdown_started_at = Instant::now();
|
||||||
|
info!("Shutting down...");
|
||||||
|
let uptime_secs = process_started_at.elapsed().as_secs();
|
||||||
|
info!("Uptime: {}", format_uptime(uptime_secs));
|
||||||
|
if let Some(pool) = &me_pool {
|
||||||
|
match tokio::time::timeout(
|
||||||
|
Duration::from_secs(2),
|
||||||
|
pool.shutdown_send_close_conn_all(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(total) => {
|
||||||
|
info!(
|
||||||
|
close_conn_sent = total,
|
||||||
|
"ME shutdown: RPC_CLOSE_CONN broadcast completed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
warn!("ME shutdown: RPC_CLOSE_CONN broadcast timed out");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let shutdown_secs = shutdown_started_at.elapsed().as_secs();
|
||||||
|
info!(
|
||||||
|
"Shutdown completed successfully in {} {}.",
|
||||||
|
shutdown_secs,
|
||||||
|
unit_label(shutdown_secs, "second", "seconds")
|
||||||
|
);
|
||||||
|
}
|
||||||
Err(e) => error!("Signal error: {}", e),
|
Err(e) => error!("Signal error: {}", e),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1199,6 +1199,48 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
0
|
0
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_no_writer_failfast_total ME route failfast errors due to missing writer in bounded wait window"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_no_writer_failfast_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_no_writer_failfast_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_no_writer_failfast_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_async_recovery_trigger_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_async_recovery_trigger_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_async_recovery_trigger_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_inline_recovery_total Legacy inline ME recovery attempts from route path"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_inline_recovery_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_inline_recovery_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_inline_recovery_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let unresolved_writer_losses = if me_allows_normal {
|
let unresolved_writer_losses = if me_allows_normal {
|
||||||
stats
|
stats
|
||||||
@@ -1237,6 +1279,29 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter");
|
let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent");
|
let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter");
|
let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_ip_reservation_rollback_total IP reservation rollbacks caused by later limit checks"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_ip_reservation_rollback_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_reservation_rollback_total{{reason=\"tcp_limit\"}} {}",
|
||||||
|
if core_enabled {
|
||||||
|
stats.get_ip_reservation_rollback_tcp_limit_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_reservation_rollback_total{{reason=\"quota_limit\"}} {}",
|
||||||
|
if core_enabled {
|
||||||
|
stats.get_ip_reservation_rollback_quota_limit_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
|
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
|
||||||
@@ -1267,11 +1332,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut unique_users = BTreeSet::new();
|
let mut unique_users = BTreeSet::new();
|
||||||
|
unique_users.extend(config.access.users.keys().cloned());
|
||||||
unique_users.extend(config.access.user_max_unique_ips.keys().cloned());
|
unique_users.extend(config.access.user_max_unique_ips.keys().cloned());
|
||||||
unique_users.extend(ip_counts.keys().cloned());
|
unique_users.extend(ip_counts.keys().cloned());
|
||||||
|
let unique_users_vec: Vec<String> = unique_users.iter().cloned().collect();
|
||||||
|
let recent_counts = ip_tracker
|
||||||
|
.get_recent_counts_for_users(&unique_users_vec)
|
||||||
|
.await;
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs");
|
let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge");
|
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
|
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
|
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
|
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
|
||||||
@@ -1286,6 +1361,12 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
0.0
|
0.0
|
||||||
};
|
};
|
||||||
let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current);
|
let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_user_unique_ips_recent_window{{user=\"{}\"}} {}",
|
||||||
|
user,
|
||||||
|
recent_counts.get(&user).copied().unwrap_or(0)
|
||||||
|
);
|
||||||
let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit);
|
let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit);
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
@@ -1378,6 +1459,7 @@ mod tests {
|
|||||||
assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1"));
|
assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1"));
|
||||||
assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2"));
|
assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1"));
|
assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1"));
|
||||||
|
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
|
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
|
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
|
||||||
}
|
}
|
||||||
@@ -1391,7 +1473,8 @@ mod tests {
|
|||||||
assert!(output.contains("telemt_connections_total 0"));
|
assert!(output.contains("telemt_connections_total 0"));
|
||||||
assert!(output.contains("telemt_connections_bad_total 0"));
|
assert!(output.contains("telemt_connections_bad_total 0"));
|
||||||
assert!(output.contains("telemt_handshake_timeouts_total 0"));
|
assert!(output.contains("telemt_handshake_timeouts_total 0"));
|
||||||
assert!(!output.contains("user="));
|
assert!(output.contains("telemt_user_unique_ips_current{user="));
|
||||||
|
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -1412,6 +1495,7 @@ mod tests {
|
|||||||
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
|
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
|
||||||
));
|
));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge"));
|
||||||
|
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -672,42 +672,16 @@ impl RunningClientHandler {
|
|||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
let user = &success.user;
|
let user = success.user.clone();
|
||||||
|
|
||||||
if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await {
|
if let Err(e) = Self::check_user_limits_static(&user, &config, &stats, peer_addr, &ip_tracker).await {
|
||||||
warn!(user = %user, error = %e, "User limit exceeded");
|
warn!(user = %user, error = %e, "User limit exceeded");
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// IP Cleanup Guard: автоматически удаляет IP при выходе из scope
|
let relay_result = if config.general.use_middle_proxy {
|
||||||
struct IpCleanupGuard {
|
|
||||||
tracker: Arc<UserIpTracker>,
|
|
||||||
user: String,
|
|
||||||
ip: std::net::IpAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for IpCleanupGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let tracker = self.tracker.clone();
|
|
||||||
let user = self.user.clone();
|
|
||||||
let ip = self.ip;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tracker.remove_ip(&user, ip).await;
|
|
||||||
debug!(user = %user, ip = %ip, "IP cleaned up on disconnect");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let _cleanup = IpCleanupGuard {
|
|
||||||
tracker: ip_tracker,
|
|
||||||
user: user.clone(),
|
|
||||||
ip: peer_addr.ip(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Decide: middle proxy or direct
|
|
||||||
if config.general.use_middle_proxy {
|
|
||||||
if let Some(ref pool) = me_pool {
|
if let Some(ref pool) = me_pool {
|
||||||
return handle_via_middle_proxy(
|
handle_via_middle_proxy(
|
||||||
client_reader,
|
client_reader,
|
||||||
client_writer,
|
client_writer,
|
||||||
success,
|
success,
|
||||||
@@ -718,23 +692,38 @@ impl RunningClientHandler {
|
|||||||
local_addr,
|
local_addr,
|
||||||
rng,
|
rng,
|
||||||
)
|
)
|
||||||
.await;
|
.await
|
||||||
|
} else {
|
||||||
|
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
|
||||||
|
handle_via_direct(
|
||||||
|
client_reader,
|
||||||
|
client_writer,
|
||||||
|
success,
|
||||||
|
upstream_manager,
|
||||||
|
stats,
|
||||||
|
config,
|
||||||
|
buffer_pool,
|
||||||
|
rng,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
|
} else {
|
||||||
}
|
// Direct mode (original behavior)
|
||||||
|
handle_via_direct(
|
||||||
|
client_reader,
|
||||||
|
client_writer,
|
||||||
|
success,
|
||||||
|
upstream_manager,
|
||||||
|
stats,
|
||||||
|
config,
|
||||||
|
buffer_pool,
|
||||||
|
rng,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
};
|
||||||
|
|
||||||
// Direct mode (original behavior)
|
ip_tracker.remove_ip(&user, peer_addr.ip()).await;
|
||||||
handle_via_direct(
|
relay_result
|
||||||
client_reader,
|
|
||||||
client_writer,
|
|
||||||
success,
|
|
||||||
upstream_manager,
|
|
||||||
stats,
|
|
||||||
config,
|
|
||||||
buffer_pool,
|
|
||||||
rng,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_user_limits_static(
|
async fn check_user_limits_static(
|
||||||
@@ -752,22 +741,32 @@ impl RunningClientHandler {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut ip_reserved = false;
|
||||||
// IP limit check
|
// IP limit check
|
||||||
if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||||
warn!(
|
Ok(()) => {
|
||||||
user = %user,
|
ip_reserved = true;
|
||||||
ip = %peer_addr.ip(),
|
}
|
||||||
reason = %reason,
|
Err(reason) => {
|
||||||
"IP limit exceeded"
|
warn!(
|
||||||
);
|
user = %user,
|
||||||
return Err(ProxyError::ConnectionLimitExceeded {
|
ip = %peer_addr.ip(),
|
||||||
user: user.to_string(),
|
reason = %reason,
|
||||||
});
|
"IP limit exceeded"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::ConnectionLimitExceeded {
|
||||||
|
user: user.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(limit) = config.access.user_max_tcp_conns.get(user)
|
if let Some(limit) = config.access.user_max_tcp_conns.get(user)
|
||||||
&& stats.get_user_curr_connects(user) >= *limit as u64
|
&& stats.get_user_curr_connects(user) >= *limit as u64
|
||||||
{
|
{
|
||||||
|
if ip_reserved {
|
||||||
|
ip_tracker.remove_ip(user, peer_addr.ip()).await;
|
||||||
|
stats.increment_ip_reservation_rollback_tcp_limit_total();
|
||||||
|
}
|
||||||
return Err(ProxyError::ConnectionLimitExceeded {
|
return Err(ProxyError::ConnectionLimitExceeded {
|
||||||
user: user.to_string(),
|
user: user.to_string(),
|
||||||
});
|
});
|
||||||
@@ -776,6 +775,10 @@ impl RunningClientHandler {
|
|||||||
if let Some(quota) = config.access.user_data_quota.get(user)
|
if let Some(quota) = config.access.user_data_quota.get(user)
|
||||||
&& stats.get_user_total_octets(user) >= *quota
|
&& stats.get_user_total_octets(user) >= *quota
|
||||||
{
|
{
|
||||||
|
if ip_reserved {
|
||||||
|
ip_tracker.remove_ip(user, peer_addr.ip()).await;
|
||||||
|
stats.increment_ip_reservation_rollback_quota_limit_total();
|
||||||
|
}
|
||||||
return Err(ProxyError::DataQuotaExceeded {
|
return Err(ProxyError::DataQuotaExceeded {
|
||||||
user: user.to_string(),
|
user: user.to_string(),
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -118,10 +118,16 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
|
|||||||
// Unknown DC requested by client without override: log and fall back.
|
// Unknown DC requested by client without override: log and fall back.
|
||||||
if !config.dc_overrides.contains_key(&dc_key) {
|
if !config.dc_overrides.contains_key(&dc_key) {
|
||||||
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
|
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
|
||||||
if let Some(path) = &config.general.unknown_dc_log_path
|
if config.general.unknown_dc_file_log_enabled
|
||||||
&& let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path)
|
&& let Some(path) = &config.general.unknown_dc_log_path
|
||||||
|
&& let Ok(handle) = tokio::runtime::Handle::try_current()
|
||||||
{
|
{
|
||||||
let _ = writeln!(file, "dc_idx={dc_idx}");
|
let path = path.clone();
|
||||||
|
handle.spawn_blocking(move || {
|
||||||
|
if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
|
||||||
|
let _ = writeln!(file, "dc_idx={dc_idx}");
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -100,6 +100,11 @@ pub struct Stats {
|
|||||||
me_refill_failed_total: AtomicU64,
|
me_refill_failed_total: AtomicU64,
|
||||||
me_writer_restored_same_endpoint_total: AtomicU64,
|
me_writer_restored_same_endpoint_total: AtomicU64,
|
||||||
me_writer_restored_fallback_total: AtomicU64,
|
me_writer_restored_fallback_total: AtomicU64,
|
||||||
|
me_no_writer_failfast_total: AtomicU64,
|
||||||
|
me_async_recovery_trigger_total: AtomicU64,
|
||||||
|
me_inline_recovery_total: AtomicU64,
|
||||||
|
ip_reservation_rollback_tcp_limit_total: AtomicU64,
|
||||||
|
ip_reservation_rollback_quota_limit_total: AtomicU64,
|
||||||
telemetry_core_enabled: AtomicBool,
|
telemetry_core_enabled: AtomicBool,
|
||||||
telemetry_user_enabled: AtomicBool,
|
telemetry_user_enabled: AtomicBool,
|
||||||
telemetry_me_level: AtomicU8,
|
telemetry_me_level: AtomicU8,
|
||||||
@@ -522,6 +527,34 @@ impl Stats {
|
|||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_no_writer_failfast_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_no_writer_failfast_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_async_recovery_trigger_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_async_recovery_trigger_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_inline_recovery_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_inline_recovery_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_ip_reservation_rollback_tcp_limit_total(&self) {
|
||||||
|
if self.telemetry_core_enabled() {
|
||||||
|
self.ip_reservation_rollback_tcp_limit_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_ip_reservation_rollback_quota_limit_total(&self) {
|
||||||
|
if self.telemetry_core_enabled() {
|
||||||
|
self.ip_reservation_rollback_quota_limit_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn increment_me_endpoint_quarantine_total(&self) {
|
pub fn increment_me_endpoint_quarantine_total(&self) {
|
||||||
if self.telemetry_me_allows_normal() {
|
if self.telemetry_me_allows_normal() {
|
||||||
self.me_endpoint_quarantine_total
|
self.me_endpoint_quarantine_total
|
||||||
@@ -791,6 +824,23 @@ impl Stats {
|
|||||||
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
|
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
|
||||||
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
|
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_no_writer_failfast_total(&self) -> u64 {
|
||||||
|
self.me_no_writer_failfast_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_async_recovery_trigger_total(&self) -> u64 {
|
||||||
|
self.me_async_recovery_trigger_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_inline_recovery_total(&self) -> u64 {
|
||||||
|
self.me_inline_recovery_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_ip_reservation_rollback_tcp_limit_total(&self) -> u64 {
|
||||||
|
self.ip_reservation_rollback_tcp_limit_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_ip_reservation_rollback_quota_limit_total(&self) -> u64 {
|
||||||
|
self.ip_reservation_rollback_quota_limit_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn increment_user_connects(&self, user: &str) {
|
pub fn increment_user_connects(&self, user: &str) {
|
||||||
if !self.telemetry_user_enabled() {
|
if !self.telemetry_user_enabled() {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@@ -42,6 +43,87 @@ pub struct ProxyConfigData {
|
|||||||
pub proxy_for_lines: u32,
|
pub proxy_for_lines: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parse_proxy_config_text(text: &str, http_status: u16) -> ProxyConfigData {
|
||||||
|
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
|
||||||
|
let mut proxy_for_lines: u32 = 0;
|
||||||
|
for line in text.lines() {
|
||||||
|
if let Some((dc, ip, port)) = parse_proxy_line(line) {
|
||||||
|
map.entry(dc).or_default().push((ip, port));
|
||||||
|
proxy_for_lines = proxy_for_lines.saturating_add(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let default_dc = text.lines().find_map(|l| {
|
||||||
|
let t = l.trim();
|
||||||
|
if let Some(rest) = t.strip_prefix("default") {
|
||||||
|
return rest.trim().trim_end_matches(';').parse::<i32>().ok();
|
||||||
|
}
|
||||||
|
None
|
||||||
|
});
|
||||||
|
|
||||||
|
ProxyConfigData {
|
||||||
|
map,
|
||||||
|
default_dc,
|
||||||
|
http_status,
|
||||||
|
proxy_for_lines,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_proxy_config_cache(path: &str) -> Result<ProxyConfigData> {
|
||||||
|
let text = tokio::fs::read_to_string(path).await.map_err(|e| {
|
||||||
|
crate::error::ProxyError::Proxy(format!("read proxy-config cache '{path}' failed: {e}"))
|
||||||
|
})?;
|
||||||
|
Ok(parse_proxy_config_text(&text, 200))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn save_proxy_config_cache(path: &str, raw_text: &str) -> Result<()> {
|
||||||
|
if let Some(parent) = Path::new(path).parent()
|
||||||
|
&& !parent.as_os_str().is_empty()
|
||||||
|
{
|
||||||
|
tokio::fs::create_dir_all(parent).await.map_err(|e| {
|
||||||
|
crate::error::ProxyError::Proxy(format!(
|
||||||
|
"create proxy-config cache dir '{}' failed: {e}",
|
||||||
|
parent.display()
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::fs::write(path, raw_text).await.map_err(|e| {
|
||||||
|
crate::error::ProxyError::Proxy(format!("write proxy-config cache '{path}' failed: {e}"))
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, String)> {
|
||||||
|
let resp = reqwest::get(url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
|
||||||
|
;
|
||||||
|
let http_status = resp.status().as_u16();
|
||||||
|
|
||||||
|
if let Some(date) = resp.headers().get(reqwest::header::DATE)
|
||||||
|
&& let Ok(date_str) = date.to_str()
|
||||||
|
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
|
||||||
|
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
|
||||||
|
server_time.duration_since(SystemTime::now()).map_err(|_| e)
|
||||||
|
})
|
||||||
|
{
|
||||||
|
let skew_secs = skew.as_secs();
|
||||||
|
if skew_secs > 60 {
|
||||||
|
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
|
||||||
|
} else if skew_secs > 30 {
|
||||||
|
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let text = resp
|
||||||
|
.text()
|
||||||
|
.await
|
||||||
|
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
|
||||||
|
let parsed = parse_proxy_config_text(&text, http_status);
|
||||||
|
Ok((parsed, text))
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
struct StableSnapshot {
|
struct StableSnapshot {
|
||||||
candidate_hash: Option<u64>,
|
candidate_hash: Option<u64>,
|
||||||
@@ -170,61 +252,9 @@ fn parse_proxy_line(line: &str) -> Option<(i32, IpAddr, u16)> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
|
pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
|
||||||
let resp = reqwest::get(url)
|
fetch_proxy_config_with_raw(url)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
|
.map(|(parsed, _raw)| parsed)
|
||||||
;
|
|
||||||
let http_status = resp.status().as_u16();
|
|
||||||
|
|
||||||
if let Some(date) = resp.headers().get(reqwest::header::DATE)
|
|
||||||
&& let Ok(date_str) = date.to_str()
|
|
||||||
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
|
|
||||||
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
|
|
||||||
server_time.duration_since(SystemTime::now()).map_err(|_| e)
|
|
||||||
})
|
|
||||||
{
|
|
||||||
let skew_secs = skew.as_secs();
|
|
||||||
if skew_secs > 60 {
|
|
||||||
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
|
|
||||||
} else if skew_secs > 30 {
|
|
||||||
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let text = resp
|
|
||||||
.text()
|
|
||||||
.await
|
|
||||||
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
|
|
||||||
|
|
||||||
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
|
|
||||||
let mut proxy_for_lines: u32 = 0;
|
|
||||||
for line in text.lines() {
|
|
||||||
if let Some((dc, ip, port)) = parse_proxy_line(line) {
|
|
||||||
map.entry(dc).or_default().push((ip, port));
|
|
||||||
proxy_for_lines = proxy_for_lines.saturating_add(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let default_dc = text
|
|
||||||
.lines()
|
|
||||||
.find_map(|l| {
|
|
||||||
let t = l.trim();
|
|
||||||
if let Some(rest) = t.strip_prefix("default") {
|
|
||||||
return rest
|
|
||||||
.trim()
|
|
||||||
.trim_end_matches(';')
|
|
||||||
.parse::<i32>()
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
None
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(ProxyConfigData {
|
|
||||||
map,
|
|
||||||
default_dc,
|
|
||||||
http_status,
|
|
||||||
proxy_for_lines,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn snapshot_passes_guards(
|
fn snapshot_passes_guards(
|
||||||
|
|||||||
@@ -295,15 +295,27 @@ async fn check_family(
|
|||||||
let wait = Duration::from_millis(next_ms)
|
let wait = Duration::from_millis(next_ms)
|
||||||
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||||
next_attempt.insert(key, now + wait);
|
next_attempt.insert(key, now + wait);
|
||||||
warn!(
|
if pool.is_runtime_ready() {
|
||||||
dc = %dc,
|
warn!(
|
||||||
?family,
|
dc = %dc,
|
||||||
alive = now_alive,
|
?family,
|
||||||
required,
|
alive = now_alive,
|
||||||
endpoint_count = endpoints.len(),
|
required,
|
||||||
backoff_ms = next_ms,
|
endpoint_count = endpoints.len(),
|
||||||
"DC writer floor is below required level, scheduled reconnect"
|
backoff_ms = next_ms,
|
||||||
);
|
"DC writer floor is below required level, scheduled reconnect"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
alive = now_alive,
|
||||||
|
required,
|
||||||
|
endpoint_count = endpoints.len(),
|
||||||
|
backoff_ms = next_ms,
|
||||||
|
"DC writer floor is below required level during startup, scheduled reconnect"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Some(v) = inflight.get_mut(&key) {
|
if let Some(v) = inflight.get_mut(&key) {
|
||||||
*v = v.saturating_sub(1);
|
*v = v.saturating_sub(1);
|
||||||
|
|||||||
@@ -30,7 +30,11 @@ pub use pool::MePool;
|
|||||||
pub use pool_nat::{stun_probe, detect_public_ip};
|
pub use pool_nat::{stun_probe, detect_public_ip};
|
||||||
pub use registry::ConnRegistry;
|
pub use registry::ConnRegistry;
|
||||||
pub use secret::fetch_proxy_secret;
|
pub use secret::fetch_proxy_secret;
|
||||||
pub use config_updater::{fetch_proxy_config, me_config_updater};
|
#[allow(unused_imports)]
|
||||||
|
pub use config_updater::{
|
||||||
|
ProxyConfigData, fetch_proxy_config, fetch_proxy_config_with_raw, load_proxy_config_cache,
|
||||||
|
me_config_updater, save_proxy_config_cache,
|
||||||
|
};
|
||||||
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
|
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
|
||||||
pub use wire::proto_flags_for_tag;
|
pub use wire::proto_flags_for_tag;
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|||||||
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
|
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy};
|
use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy};
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
use crate::network::probe::NetworkDecision;
|
use crate::network::probe::NetworkDecision;
|
||||||
@@ -145,6 +145,11 @@ pub struct MePool {
|
|||||||
pub(super) secret_atomic_snapshot: AtomicBool,
|
pub(super) secret_atomic_snapshot: AtomicBool,
|
||||||
pub(super) me_deterministic_writer_sort: AtomicBool,
|
pub(super) me_deterministic_writer_sort: AtomicBool,
|
||||||
pub(super) me_socks_kdf_policy: AtomicU8,
|
pub(super) me_socks_kdf_policy: AtomicU8,
|
||||||
|
pub(super) me_route_no_writer_mode: AtomicU8,
|
||||||
|
pub(super) me_route_no_writer_wait: Duration,
|
||||||
|
pub(super) me_route_inline_recovery_attempts: u32,
|
||||||
|
pub(super) me_route_inline_recovery_wait: Duration,
|
||||||
|
pub(super) runtime_ready: AtomicBool,
|
||||||
pool_size: usize,
|
pool_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,6 +232,10 @@ impl MePool {
|
|||||||
me_route_backpressure_base_timeout_ms: u64,
|
me_route_backpressure_base_timeout_ms: u64,
|
||||||
me_route_backpressure_high_timeout_ms: u64,
|
me_route_backpressure_high_timeout_ms: u64,
|
||||||
me_route_backpressure_high_watermark_pct: u8,
|
me_route_backpressure_high_watermark_pct: u8,
|
||||||
|
me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||||
|
me_route_no_writer_wait_ms: u64,
|
||||||
|
me_route_inline_recovery_attempts: u32,
|
||||||
|
me_route_inline_recovery_wait_ms: u64,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let registry = Arc::new(ConnRegistry::new());
|
let registry = Arc::new(ConnRegistry::new());
|
||||||
registry.update_route_backpressure_policy(
|
registry.update_route_backpressure_policy(
|
||||||
@@ -343,6 +352,11 @@ impl MePool {
|
|||||||
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
|
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
|
||||||
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
|
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
|
||||||
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
||||||
|
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
||||||
|
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
||||||
|
me_route_inline_recovery_attempts,
|
||||||
|
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
||||||
|
runtime_ready: AtomicBool::new(false),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -350,6 +364,14 @@ impl MePool {
|
|||||||
self.active_generation.load(Ordering::Relaxed)
|
self.active_generation.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_runtime_ready(&self, ready: bool) {
|
||||||
|
self.runtime_ready.store(ready, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_runtime_ready(&self) -> bool {
|
||||||
|
self.runtime_ready.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn update_runtime_reinit_policy(
|
pub fn update_runtime_reinit_policy(
|
||||||
&self,
|
&self,
|
||||||
hardswap: bool,
|
hardswap: bool,
|
||||||
|
|||||||
@@ -278,6 +278,11 @@ impl ConnRegistry {
|
|||||||
Some(ConnWriter { writer_id, tx: writer })
|
Some(ConnWriter { writer_id, tx: writer })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn active_conn_ids(&self) -> Vec<u64> {
|
||||||
|
let inner = self.inner.read().await;
|
||||||
|
inner.writer_for_conn.keys().copied().collect()
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
|
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
|
||||||
let mut inner = self.inner.write().await;
|
let mut inner = self.inner.write().await;
|
||||||
inner.writers.remove(&writer_id);
|
inner.writers.remove(&writer_id);
|
||||||
|
|||||||
@@ -1,16 +1,17 @@
|
|||||||
use std::cmp::Reverse;
|
use std::cmp::Reverse;
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
|
use crate::config::MeRouteNoWriterMode;
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
use crate::protocol::constants::RPC_CLOSE_EXT_U32;
|
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
|
||||||
|
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
use super::codec::WriterCommand;
|
use super::codec::WriterCommand;
|
||||||
@@ -49,7 +50,11 @@ impl MePool {
|
|||||||
our_addr,
|
our_addr,
|
||||||
proto_flags,
|
proto_flags,
|
||||||
};
|
};
|
||||||
let mut emergency_attempts = 0;
|
let no_writer_mode =
|
||||||
|
MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed));
|
||||||
|
let mut no_writer_deadline: Option<Instant> = None;
|
||||||
|
let mut emergency_attempts = 0u32;
|
||||||
|
let mut async_recovery_triggered = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Some(current) = self.registry.get_writer(conn_id).await {
|
if let Some(current) = self.registry.get_writer(conn_id).await {
|
||||||
@@ -74,34 +79,66 @@ impl MePool {
|
|||||||
let mut writers_snapshot = {
|
let mut writers_snapshot = {
|
||||||
let ws = self.writers.read().await;
|
let ws = self.writers.read().await;
|
||||||
if ws.is_empty() {
|
if ws.is_empty() {
|
||||||
// Create waiter before recovery attempts so notify_one permits are not missed.
|
|
||||||
let waiter = self.writer_available.notified();
|
|
||||||
drop(ws);
|
drop(ws);
|
||||||
for family in self.family_order() {
|
match no_writer_mode {
|
||||||
let map = match family {
|
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
||||||
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||||
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
Instant::now() + self.me_route_no_writer_wait
|
||||||
};
|
});
|
||||||
for (_dc, addrs) in map.iter() {
|
if !async_recovery_triggered {
|
||||||
for (ip, port) in addrs {
|
let triggered =
|
||||||
let addr = SocketAddr::new(*ip, *port);
|
self.trigger_async_recovery_for_target_dc(target_dc).await;
|
||||||
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
|
if !triggered {
|
||||||
self.writer_available.notify_one();
|
self.trigger_async_recovery_global().await;
|
||||||
|
}
|
||||||
|
async_recovery_triggered = true;
|
||||||
|
}
|
||||||
|
if self.wait_for_writer_until(deadline).await {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.stats.increment_me_no_writer_failfast_total();
|
||||||
|
return Err(ProxyError::Proxy(
|
||||||
|
"No ME writer available in failfast window".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
MeRouteNoWriterMode::InlineRecoveryLegacy => {
|
||||||
|
self.stats.increment_me_inline_recovery_total();
|
||||||
|
for _ in 0..self.me_route_inline_recovery_attempts.max(1) {
|
||||||
|
for family in self.family_order() {
|
||||||
|
let map = match family {
|
||||||
|
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||||
|
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||||
|
};
|
||||||
|
for (_dc, addrs) in &map {
|
||||||
|
for (ip, port) in addrs {
|
||||||
|
let addr = SocketAddr::new(*ip, *port);
|
||||||
|
let _ = self.connect_one(addr, self.rng.as_ref()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !self.writers.read().await.is_empty() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if !self.writers.read().await.is_empty() {
|
||||||
}
|
continue;
|
||||||
if !self.writers.read().await.is_empty() {
|
}
|
||||||
continue;
|
let waiter = self.writer_available.notified();
|
||||||
}
|
if tokio::time::timeout(self.me_route_inline_recovery_wait, waiter)
|
||||||
if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() {
|
.await
|
||||||
if !self.writers.read().await.is_empty() {
|
.is_err()
|
||||||
|
{
|
||||||
|
if !self.writers.read().await.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.stats.increment_me_no_writer_failfast_total();
|
||||||
|
return Err(ProxyError::Proxy(
|
||||||
|
"All ME connections dead (legacy wait timeout)".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
|
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
ws.clone()
|
ws.clone()
|
||||||
};
|
};
|
||||||
@@ -115,46 +152,70 @@ impl MePool {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
if candidate_indices.is_empty() {
|
if candidate_indices.is_empty() {
|
||||||
// Emergency connect-on-demand
|
match no_writer_mode {
|
||||||
if emergency_attempts >= 3 {
|
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
||||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||||
}
|
Instant::now() + self.me_route_no_writer_wait
|
||||||
emergency_attempts += 1;
|
});
|
||||||
for family in self.family_order() {
|
if !async_recovery_triggered {
|
||||||
let map_guard = match family {
|
let triggered = self.trigger_async_recovery_for_target_dc(target_dc).await;
|
||||||
IpFamily::V4 => self.proxy_map_v4.read().await,
|
if !triggered {
|
||||||
IpFamily::V6 => self.proxy_map_v6.read().await,
|
self.trigger_async_recovery_global().await;
|
||||||
};
|
}
|
||||||
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
|
async_recovery_triggered = true;
|
||||||
let mut shuffled = addrs.clone();
|
}
|
||||||
shuffled.shuffle(&mut rand::rng());
|
if self.wait_for_candidate_until(target_dc, deadline).await {
|
||||||
drop(map_guard);
|
continue;
|
||||||
for (ip, port) in shuffled {
|
}
|
||||||
let addr = SocketAddr::new(ip, port);
|
self.stats.increment_me_no_writer_failfast_total();
|
||||||
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
|
return Err(ProxyError::Proxy(
|
||||||
break;
|
"No ME writers available for target DC in failfast window".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
MeRouteNoWriterMode::InlineRecoveryLegacy => {
|
||||||
|
self.stats.increment_me_inline_recovery_total();
|
||||||
|
if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) {
|
||||||
|
self.stats.increment_me_no_writer_failfast_total();
|
||||||
|
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||||
|
}
|
||||||
|
emergency_attempts += 1;
|
||||||
|
for family in self.family_order() {
|
||||||
|
let map_guard = match family {
|
||||||
|
IpFamily::V4 => self.proxy_map_v4.read().await,
|
||||||
|
IpFamily::V6 => self.proxy_map_v6.read().await,
|
||||||
|
};
|
||||||
|
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
|
||||||
|
let mut shuffled = addrs.clone();
|
||||||
|
shuffled.shuffle(&mut rand::rng());
|
||||||
|
drop(map_guard);
|
||||||
|
for (ip, port) in shuffled {
|
||||||
|
let addr = SocketAddr::new(ip, port);
|
||||||
|
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await;
|
||||||
|
let ws2 = self.writers.read().await;
|
||||||
|
writers_snapshot = ws2.clone();
|
||||||
|
drop(ws2);
|
||||||
|
candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||||
|
.await;
|
||||||
|
if candidate_indices.is_empty() {
|
||||||
|
candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
if !candidate_indices.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await;
|
|
||||||
let ws2 = self.writers.read().await;
|
|
||||||
writers_snapshot = ws2.clone();
|
|
||||||
drop(ws2);
|
|
||||||
candidate_indices = self
|
|
||||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
|
||||||
.await;
|
|
||||||
if candidate_indices.is_empty() {
|
if candidate_indices.is_empty() {
|
||||||
candidate_indices = self
|
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
if !candidate_indices.is_empty() {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if candidate_indices.is_empty() {
|
|
||||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let writer_idle_since = self.registry.writer_idle_since_snapshot().await;
|
let writer_idle_since = self.registry.writer_idle_since_snapshot().await;
|
||||||
let now_epoch_secs = Self::now_epoch_secs();
|
let now_epoch_secs = Self::now_epoch_secs();
|
||||||
@@ -275,6 +336,129 @@ impl MePool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
|
||||||
|
let waiter = self.writer_available.notified();
|
||||||
|
if !self.writers.read().await.is_empty() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
let now = Instant::now();
|
||||||
|
if now >= deadline {
|
||||||
|
return !self.writers.read().await.is_empty();
|
||||||
|
}
|
||||||
|
let timeout = deadline.saturating_duration_since(now);
|
||||||
|
if tokio::time::timeout(timeout, waiter).await.is_ok() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
!self.writers.read().await.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_candidate_until(&self, target_dc: i16, deadline: Instant) -> bool {
|
||||||
|
loop {
|
||||||
|
if self.has_candidate_for_target_dc(target_dc).await {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
if now >= deadline {
|
||||||
|
return self.has_candidate_for_target_dc(target_dc).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let remaining = deadline.saturating_duration_since(now);
|
||||||
|
let sleep_for = remaining.min(Duration::from_millis(25));
|
||||||
|
let waiter = self.writer_available.notified();
|
||||||
|
tokio::select! {
|
||||||
|
_ = waiter => {}
|
||||||
|
_ = tokio::time::sleep(sleep_for) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn has_candidate_for_target_dc(&self, target_dc: i16) -> bool {
|
||||||
|
let writers_snapshot = {
|
||||||
|
let ws = self.writers.read().await;
|
||||||
|
if ws.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ws.clone()
|
||||||
|
};
|
||||||
|
let mut candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||||
|
.await;
|
||||||
|
if candidate_indices.is_empty() {
|
||||||
|
candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
!candidate_indices.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn trigger_async_recovery_for_target_dc(self: &Arc<Self>, target_dc: i16) -> bool {
|
||||||
|
let endpoints = self.endpoint_candidates_for_target_dc(target_dc).await;
|
||||||
|
if endpoints.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.stats.increment_me_async_recovery_trigger_total();
|
||||||
|
for addr in endpoints.into_iter().take(8) {
|
||||||
|
self.trigger_immediate_refill(addr);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn trigger_async_recovery_global(self: &Arc<Self>) {
|
||||||
|
self.stats.increment_me_async_recovery_trigger_total();
|
||||||
|
let mut seen = HashSet::<SocketAddr>::new();
|
||||||
|
for family in self.family_order() {
|
||||||
|
let map = match family {
|
||||||
|
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||||
|
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||||
|
};
|
||||||
|
for addrs in map.values() {
|
||||||
|
for (ip, port) in addrs {
|
||||||
|
let addr = SocketAddr::new(*ip, *port);
|
||||||
|
if seen.insert(addr) {
|
||||||
|
self.trigger_immediate_refill(addr);
|
||||||
|
}
|
||||||
|
if seen.len() >= 8 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn endpoint_candidates_for_target_dc(&self, target_dc: i16) -> Vec<SocketAddr> {
|
||||||
|
let key = target_dc as i32;
|
||||||
|
let mut preferred = Vec::<SocketAddr>::new();
|
||||||
|
let mut seen = HashSet::<SocketAddr>::new();
|
||||||
|
|
||||||
|
for family in self.family_order() {
|
||||||
|
let map = match family {
|
||||||
|
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||||
|
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||||
|
};
|
||||||
|
let mut lookup_keys = vec![key, key.abs(), -key.abs()];
|
||||||
|
let def = self.default_dc.load(Ordering::Relaxed);
|
||||||
|
if def != 0 {
|
||||||
|
lookup_keys.push(def);
|
||||||
|
}
|
||||||
|
for lookup in lookup_keys {
|
||||||
|
if let Some(addrs) = map.get(&lookup) {
|
||||||
|
for (ip, port) in addrs {
|
||||||
|
let addr = SocketAddr::new(*ip, *port);
|
||||||
|
if seen.insert(addr) {
|
||||||
|
preferred.push(addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !preferred.is_empty() && !self.decision.effective_multipath {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
preferred
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||||
let mut p = Vec::with_capacity(12);
|
let mut p = Vec::with_capacity(12);
|
||||||
@@ -292,6 +476,37 @@ impl MePool {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn send_close_conn(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||||
|
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||||
|
let mut p = Vec::with_capacity(12);
|
||||||
|
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
||||||
|
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||||
|
match w.tx.try_send(WriterCommand::DataAndFlush(p)) {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(TrySendError::Full(cmd)) => {
|
||||||
|
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
|
||||||
|
}
|
||||||
|
Err(TrySendError::Closed(_)) => {
|
||||||
|
debug!(conn_id, "ME close_conn skipped: writer channel closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(conn_id, "ME close_conn skipped (writer missing)");
|
||||||
|
}
|
||||||
|
|
||||||
|
self.registry.unregister(conn_id).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn shutdown_send_close_conn_all(self: &Arc<Self>) -> usize {
|
||||||
|
let conn_ids = self.registry.active_conn_ids().await;
|
||||||
|
let total = conn_ids.len();
|
||||||
|
for conn_id in conn_ids {
|
||||||
|
let _ = self.send_close_conn(conn_id).await;
|
||||||
|
}
|
||||||
|
total
|
||||||
|
}
|
||||||
|
|
||||||
pub fn connection_count(&self) -> usize {
|
pub fn connection_count(&self) -> usize {
|
||||||
self.conn_count.load(Ordering::Relaxed)
|
self.conn_count.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user