Merge pull request #214 from telemt/flow

Desync Full Forensics + ME Pool Updater + Soft Reinit
This commit is contained in:
Alexey 2026-02-23 16:15:30 +03:00 committed by GitHub
commit ef51d0f62d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 708 additions and 91 deletions

View File

@@ -194,6 +194,9 @@ prefer_ipv6 = false
fast_mode = true
use_middle_proxy = false
log_level = "normal"
desync_all_full = false
update_every = 43200
me_reinit_drain_timeout_secs = 300
[network]
ipv4 = true
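For reference, all three new keys carry serde defaults, so existing config files keep loading unchanged. A minimal sketch of the deserialization behavior, using a hypothetical trimmed-down struct (GeneralSketch and default_drain are illustrative names, not the crate's real types):

use serde::Deserialize;

// Hypothetical mirror of just the new [general] keys; the real
// GeneralConfig in this diff carries many more fields.
#[derive(Deserialize)]
struct GeneralSketch {
    #[serde(default)] // false when omitted, matching default_desync_all_full()
    desync_all_full: bool,
    #[serde(default)] // None when omitted, so the legacy fallback applies
    update_every: Option<u64>,
    #[serde(default = "default_drain")] // 300 s, matching the crate default
    me_reinit_drain_timeout_secs: u64,
}

fn default_drain() -> u64 {
    300
}

fn main() {
    let cfg: GeneralSketch = toml::from_str("update_every = 43200").unwrap();
    assert!(!cfg.desync_all_full);
    assert_eq!(cfg.update_every, Some(43_200));
    assert_eq!(cfg.me_reinit_drain_timeout_secs, 300);
}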

View File

@@ -118,6 +118,10 @@ pub(crate) fn default_max_client_frame() -> usize {
16 * 1024 * 1024
}
pub(crate) fn default_desync_all_full() -> bool {
false
}
pub(crate) fn default_tls_new_session_tickets() -> u8 {
0
}
@@ -167,6 +171,14 @@ pub(crate) fn default_proxy_config_reload_secs() -> u64 {
12 * 60 * 60
}
pub(crate) fn default_update_every_secs() -> u64 {
2 * 60 * 60
}
pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
300
}
pub(crate) fn default_ntp_check() -> bool {
true
}

View File

@@ -10,6 +10,9 @@
//! | `general` | `ad_tag` | Passed on next connection |
//! | `general` | `middle_proxy_pool_size` | Passed on next connection |
//! | `general` | `me_keepalive_*` | Passed on next connection |
//! | `general` | `desync_all_full` | Applied immediately |
//! | `general` | `update_every` | Applied to ME updater immediately |
//! | `general` | `me_reinit_drain_timeout_secs` | Applied on next ME map update |
//! | `access` | All user/quota fields | Effective immediately |
//!
//! Fields that require re-binding sockets (`server.port`, `censorship.*`,
@@ -34,6 +37,9 @@ pub struct HotFields {
pub log_level: LogLevel,
pub ad_tag: Option<String>,
pub middle_proxy_pool_size: usize,
pub desync_all_full: bool,
pub update_every_secs: u64,
pub me_reinit_drain_timeout_secs: u64,
pub me_keepalive_enabled: bool,
pub me_keepalive_interval_secs: u64,
pub me_keepalive_jitter_secs: u64,
@@ -47,6 +53,9 @@ impl HotFields {
log_level: cfg.general.log_level.clone(),
ad_tag: cfg.general.ad_tag.clone(),
middle_proxy_pool_size: cfg.general.middle_proxy_pool_size,
desync_all_full: cfg.general.desync_all_full,
update_every_secs: cfg.general.effective_update_every_secs(),
me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
me_keepalive_enabled: cfg.general.me_keepalive_enabled,
me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs,
@@ -175,6 +184,27 @@ fn log_changes(
);
}
if old_hot.desync_all_full != new_hot.desync_all_full {
info!(
"config reload: desync_all_full: {} → {}",
old_hot.desync_all_full, new_hot.desync_all_full,
);
}
if old_hot.update_every_secs != new_hot.update_every_secs {
info!(
"config reload: update_every(effective): {}s → {}s",
old_hot.update_every_secs, new_hot.update_every_secs,
);
}
if old_hot.me_reinit_drain_timeout_secs != new_hot.me_reinit_drain_timeout_secs {
info!(
"config reload: me_reinit_drain_timeout_secs: {}s → {}s",
old_hot.me_reinit_drain_timeout_secs, new_hot.me_reinit_drain_timeout_secs,
);
}
if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled
|| old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs
|| old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs

View File

@@ -117,6 +117,34 @@ impl ProxyConfig {
let mut config: ProxyConfig =
toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?;
if let Some(update_every) = config.general.update_every {
if update_every == 0 {
return Err(ProxyError::Config(
"general.update_every must be > 0".to_string(),
));
}
} else {
let legacy_secret = config.general.proxy_secret_auto_reload_secs;
let legacy_config = config.general.proxy_config_auto_reload_secs;
let effective = legacy_secret.min(legacy_config);
if effective == 0 {
return Err(ProxyError::Config(
"legacy proxy_*_auto_reload_secs values must be > 0 when general.update_every is not set".to_string(),
));
}
if legacy_secret != default_proxy_secret_reload_secs()
|| legacy_config != default_proxy_config_reload_secs()
{
warn!(
proxy_secret_auto_reload_secs = legacy_secret,
proxy_config_auto_reload_secs = legacy_config,
effective_update_every_secs = effective,
"proxy_*_auto_reload_secs are deprecated; set general.update_every"
);
}
}
// Validate secrets.
for (user, secret) in &config.access.users {
if !secret.chars().all(|c| c.is_ascii_hexdigit()) || secret.len() != 32 {
@@ -347,4 +375,68 @@ mod tests {
.unwrap_or(false));
let _ = std::fs::remove_file(path);
}
#[test]
fn update_every_overrides_legacy_fields() {
let toml = r#"
[general]
update_every = 123
proxy_secret_auto_reload_secs = 700
proxy_config_auto_reload_secs = 800
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_update_every_override_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(cfg.general.effective_update_every_secs(), 123);
let _ = std::fs::remove_file(path);
}
#[test]
fn update_every_fallback_to_legacy_min() {
let toml = r#"
[general]
proxy_secret_auto_reload_secs = 600
proxy_config_auto_reload_secs = 120
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_update_every_legacy_min_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(cfg.general.update_every, None);
assert_eq!(cfg.general.effective_update_every_secs(), 120);
let _ = std::fs::remove_file(path);
}
#[test]
fn update_every_zero_is_rejected() {
let toml = r#"
[general]
update_every = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_update_every_zero_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.update_every must be > 0"));
let _ = std::fs::remove_file(path);
}
}

View File

@@ -201,6 +201,11 @@ pub struct GeneralConfig {
#[serde(default = "default_max_client_frame")]
pub max_client_frame: usize,
/// Emit full crypto-desync forensic logs for every event.
/// When false, full forensic details are emitted at most once per dedup
/// window for each (user, peer, proto) key.
#[serde(default = "default_desync_all_full")]
pub desync_all_full: bool,
/// Enable staggered warmup of extra ME writers.
#[serde(default = "default_true")]
pub me_warmup_stagger_enabled: bool,
@@ -252,11 +257,23 @@ pub struct GeneralConfig {
#[serde(default = "default_fast_mode_min_tls_record")]
pub fast_mode_min_tls_record: usize,
/// Unified ME updater interval in seconds for getProxyConfig/getProxyConfigV6/getProxySecret.
/// When omitted, the effective value falls back to the legacy proxy_*_auto_reload_secs fields.
#[serde(default)]
pub update_every: Option<u64>,
/// Drain timeout in seconds for stale ME writers after endpoint map changes.
/// Set to 0 to keep stale writers draining indefinitely (no force-close).
#[serde(default = "default_me_reinit_drain_timeout_secs")]
pub me_reinit_drain_timeout_secs: u64,
/// Deprecated legacy setting; kept as a backward-compatibility fallback.
/// Use `update_every` instead.
#[serde(default = "default_proxy_secret_reload_secs")]
pub proxy_secret_auto_reload_secs: u64,
/// Deprecated legacy setting; kept as a backward-compatibility fallback.
/// Use `update_every` instead.
#[serde(default = "default_proxy_config_reload_secs")]
pub proxy_config_auto_reload_secs: u64,
@@ -310,7 +327,10 @@ impl Default for GeneralConfig {
links: LinksConfig::default(),
crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(),
desync_all_full: default_desync_all_full(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
update_every: Some(default_update_every_secs()),
me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(),
proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(),
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
ntp_check: default_ntp_check(),
@@ -321,6 +341,15 @@
}
}
impl GeneralConfig {
/// Resolve the active updater interval for ME infrastructure refresh tasks.
/// `update_every` takes priority; otherwise the minimum of the legacy
/// proxy_*_auto_reload_secs values is used.
pub fn effective_update_every_secs(&self) -> u64 {
self.update_every
.unwrap_or_else(|| self.proxy_secret_auto_reload_secs.min(self.proxy_config_auto_reload_secs))
}
}
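// A quick usage sketch of the precedence rule (editor's example, not part of
// this diff; field values are illustrative, mirroring the tests above):
let mut g = GeneralConfig::default();
g.update_every = Some(123);
assert_eq!(g.effective_update_every_secs(), 123); // explicit value wins
g.update_every = None;
g.proxy_secret_auto_reload_secs = 600;
g.proxy_config_auto_reload_secs = 120;
assert_eq!(g.effective_update_every_secs(), 120); // min of the legacy pair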
/// `[general.links]` — proxy link generation settings.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LinksConfig {

View File

@@ -392,18 +392,6 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
.await;
});
// Periodic updater: getProxyConfig + proxy-secret
let pool_clone2 = pool.clone();
let rng_clone2 = rng.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_config_updater(
pool_clone2,
rng_clone2,
std::time::Duration::from_secs(12 * 3600),
)
.await;
});
Some(pool)
}
Err(e) => {
@@ -702,6 +690,20 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
detected_ip_v6,
);
if let Some(ref pool) = me_pool {
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let config_rx_clone = config_rx.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_config_updater(
pool_clone,
rng_clone,
config_rx_clone,
)
.await;
});
}
let mut listeners = Vec::new();
for listener_conf in &config.server.listeners {

View File

@@ -140,6 +140,41 @@ fn render_metrics(stats: &Stats) -> String {
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid());
let _ = writeln!(out, "# HELP telemt_desync_total Total crypto-desync detections");
let _ = writeln!(out, "# TYPE telemt_desync_total counter");
let _ = writeln!(out, "telemt_desync_total {}", stats.get_desync_total());
let _ = writeln!(out, "# HELP telemt_desync_full_logged_total Full forensic desync logs emitted");
let _ = writeln!(out, "# TYPE telemt_desync_full_logged_total counter");
let _ = writeln!(out, "telemt_desync_full_logged_total {}", stats.get_desync_full_logged());
let _ = writeln!(out, "# HELP telemt_desync_suppressed_total Suppressed desync forensic events");
let _ = writeln!(out, "# TYPE telemt_desync_suppressed_total counter");
let _ = writeln!(out, "telemt_desync_suppressed_total {}", stats.get_desync_suppressed());
let _ = writeln!(out, "# HELP telemt_desync_frames_bucket_total Desync count by frames_ok bucket");
let _ = writeln!(out, "# TYPE telemt_desync_frames_bucket_total counter");
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"0\"}} {}",
stats.get_desync_frames_bucket_0()
);
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"1_2\"}} {}",
stats.get_desync_frames_bucket_1_2()
);
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"3_10\"}} {}",
stats.get_desync_frames_bucket_3_10()
);
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"gt_10\"}} {}",
stats.get_desync_frames_bucket_gt_10()
);
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
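For reference, a minimal smoke test for the new metric families might look like this sketch (hypothetical, not part of the diff; it assumes Stats implements Default, which its field types permit but this diff does not show):

#[test]
fn desync_metrics_render() {
    let stats = Stats::default(); // assumption: Stats: Default
    stats.increment_desync_total();
    stats.observe_desync_frames_ok(5); // lands in the 3_10 bucket
    let out = render_metrics(&stats);
    assert!(out.contains("telemt_desync_total 1"));
    assert!(out.contains("telemt_desync_frames_bucket_total{bucket=\"3_10\"} 1"));
}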

View File

@@ -1,5 +1,10 @@
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};
@@ -19,6 +24,148 @@ enum C2MeCommand {
Close,
}
const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60);
const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
struct RelayForensicsState {
trace_id: u64,
conn_id: u64,
user: String,
peer: SocketAddr,
peer_hash: u64,
started_at: Instant,
bytes_c2me: u64,
bytes_me2c: Arc<AtomicU64>,
desync_all_full: bool,
}
fn hash_value<T: Hash>(value: &T) -> u64 {
let mut hasher = DefaultHasher::new();
value.hash(&mut hasher);
hasher.finish()
}
fn hash_ip(ip: IpAddr) -> u64 {
hash_value(&ip)
}
fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
if all_full {
return true;
}
let dedup = DESYNC_DEDUP.get_or_init(|| Mutex::new(HashMap::new()));
let mut guard = dedup.lock().expect("desync dedup mutex poisoned");
guard.retain(|_, seen_at| now.duration_since(*seen_at) < DESYNC_DEDUP_WINDOW);
match guard.get_mut(&key) {
Some(seen_at) => {
if now.duration_since(*seen_at) >= DESYNC_DEDUP_WINDOW {
*seen_at = now;
true
} else {
false
}
}
None => {
guard.insert(key, now);
true
}
}
}
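// A hypothetical unit test (editor's sketch, not part of this diff) pinning
// the dedup semantics; DESYNC_DEDUP is process-global, so a real test needs a
// key no other test touches.
#[cfg(test)]
mod desync_dedup_sketch {
    use super::*;

    #[test]
    fn first_event_full_then_suppressed() {
        let key = hash_value(&"desync_dedup_sketch_key");
        let now = Instant::now();
        // First sighting of the key emits a full forensic log.
        assert!(should_emit_full_desync(key, false, now));
        // A repeat inside the 60 s window is suppressed.
        assert!(!should_emit_full_desync(key, false, now));
        // desync_all_full = true bypasses the dedup map entirely.
        assert!(should_emit_full_desync(key, true, now));
    }
}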
fn report_desync_frame_too_large(
state: &RelayForensicsState,
proto_tag: ProtoTag,
frame_counter: u64,
max_frame: usize,
len: usize,
raw_len_bytes: Option<[u8; 4]>,
stats: &Stats,
) -> ProxyError {
let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes());
let looks_like_tls = raw_len_bytes
.map(|b| b[0] == 0x16 && b[1] == 0x03)
.unwrap_or(false);
let looks_like_http = raw_len_bytes
.map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
.unwrap_or(false);
let now = Instant::now();
let dedup_key = hash_value(&(
state.user.as_str(),
state.peer_hash,
proto_tag,
DESYNC_ERROR_CLASS,
));
let emit_full = should_emit_full_desync(dedup_key, state.desync_all_full, now);
let duration_ms = state.started_at.elapsed().as_millis() as u64;
let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed);
stats.increment_desync_total();
stats.observe_desync_frames_ok(frame_counter);
if emit_full {
stats.increment_desync_full_logged();
warn!(
trace_id = format_args!("0x{:016x}", state.trace_id),
conn_id = state.conn_id,
user = %state.user,
peer_hash = format_args!("0x{:016x}", state.peer_hash),
proto = ?proto_tag,
mode = "middle_proxy",
is_tls = true,
duration_ms,
bytes_c2me = state.bytes_c2me,
bytes_me2c,
raw_len = len,
raw_len_hex = format_args!("0x{:08x}", len),
raw_bytes = format_args!(
"{:02x} {:02x} {:02x} {:02x}",
len_buf[0], len_buf[1], len_buf[2], len_buf[3]
),
max_frame,
tls_like = looks_like_tls,
http_like = looks_like_http,
frames_ok = frame_counter,
dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(),
desync_all_full = state.desync_all_full,
full_reason = if state.desync_all_full { "desync_all_full" } else { "first_in_dedup_window" },
error_class = DESYNC_ERROR_CLASS,
"Frame too large — crypto desync forensics"
);
debug!(
trace_id = format_args!("0x{:016x}", state.trace_id),
conn_id = state.conn_id,
user = %state.user,
peer = %state.peer,
"Frame too large forensic peer detail"
);
} else {
stats.increment_desync_suppressed();
debug!(
trace_id = format_args!("0x{:016x}", state.trace_id),
conn_id = state.conn_id,
user = %state.user,
peer_hash = format_args!("0x{:016x}", state.peer_hash),
proto = ?proto_tag,
duration_ms,
bytes_c2me = state.bytes_c2me,
bytes_me2c,
raw_len = len,
frames_ok = frame_counter,
dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(),
error_class = DESYNC_ERROR_CLASS,
"Frame too large — crypto desync forensic suppressed"
);
}
ProxyError::Proxy(format!(
"Frame too large: {len} (max {max_frame}), frames_ok={frame_counter}, conn_id={}, trace_id=0x{:016x}",
state.conn_id,
state.trace_id
))
}
pub(crate) async fn handle_via_middle_proxy<R, W>(
mut crypto_reader: CryptoReader<R>,
crypto_writer: CryptoWriter<W>,
@@ -48,14 +195,30 @@ where
);
let (conn_id, me_rx) = me_pool.registry().register().await;
let trace_id = conn_id;
let bytes_me2c = Arc::new(AtomicU64::new(0));
let mut forensics = RelayForensicsState {
trace_id,
conn_id,
user: user.clone(),
peer,
peer_hash: hash_ip(peer.ip()),
started_at: Instant::now(),
bytes_c2me: 0,
bytes_me2c: bytes_me2c.clone(),
desync_all_full: config.general.desync_all_full,
};
stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user);
let proto_flags = proto_flags_for_tag(proto_tag, me_pool.has_proxy_tag());
debug!(
trace_id = format_args!("0x{:016x}", trace_id),
user = %user,
conn_id,
peer_hash = format_args!("0x{:016x}", forensics.peer_hash),
desync_all_full = forensics.desync_all_full,
proto_flags = format_args!("0x{:08x}", proto_flags),
"ME relay started"
);
@@ -93,6 +256,7 @@ where
let stats_clone = stats.clone();
let rng_clone = rng.clone();
let user_clone = user.clone();
let bytes_me2c_clone = bytes_me2c.clone();
let me_writer = tokio::spawn(async move {
let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024);
@@ -102,6 +266,7 @@
match msg {
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed);
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(
&mut writer,
@@ -118,6 +283,7 @@
match next {
MeResponse::Data { flags, data } => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed);
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(
&mut writer,
@@ -173,12 +339,15 @@ where
&mut crypto_reader,
proto_tag,
frame_limit,
&forensics,
&mut frame_counter,
&stats,
).await {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
forensics.bytes_c2me = forensics
.bytes_c2me
.saturating_add(payload.len() as u64);
stats.add_user_octets_from(&user, payload.len() as u64);
let mut flags = proto_flags;
if quickack {
@@ -237,7 +406,16 @@ where
(_, _, Err(e)) => Err(e),
};
debug!(
user = %user,
conn_id,
trace_id = format_args!("0x{:016x}", trace_id),
duration_ms = forensics.started_at.elapsed().as_millis() as u64,
bytes_c2me = forensics.bytes_c2me,
bytes_me2c = forensics.bytes_me2c.load(Ordering::Relaxed),
frames_ok = frame_counter,
"ME relay cleanup"
);
me_pool.registry().unregister(conn_id).await;
stats.decrement_user_curr_connects(&user);
result
@@ -247,7 +425,7 @@ async fn read_client_payload(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
max_frame: usize,
forensics: &RelayForensicsState,
frame_counter: &mut u64,
stats: &Stats,
) -> Result<Option<(Vec<u8>, bool)>>
@@ -302,7 +480,9 @@ where
}
if len < 4 && proto_tag != ProtoTag::Abridged {
warn!(
trace_id = format_args!("0x{:016x}", forensics.trace_id),
conn_id = forensics.conn_id,
user = %forensics.user,
len,
proto = ?proto_tag,
"Frame too small — corrupt or probe"
@@ -311,31 +491,15 @@ where
}
if len > max_frame {
return Err(report_desync_frame_too_large(
forensics,
proto_tag,
*frame_counter,
max_frame,
len,
raw_len_bytes,
stats,
));
}
let secure_payload_len = if proto_tag == ProtoTag::Secure {

View File

@@ -31,6 +31,13 @@ pub struct Stats {
me_route_drop_channel_closed: AtomicU64,
me_route_drop_queue_full: AtomicU64,
secure_padding_invalid: AtomicU64,
desync_total: AtomicU64,
desync_full_logged: AtomicU64,
desync_suppressed: AtomicU64,
desync_frames_bucket_0: AtomicU64,
desync_frames_bucket_1_2: AtomicU64,
desync_frames_bucket_3_10: AtomicU64,
desync_frames_bucket_gt_10: AtomicU64,
user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>,
}
@@ -76,6 +83,31 @@ impl Stats {
pub fn increment_secure_padding_invalid(&self) {
self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_desync_total(&self) {
self.desync_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_desync_full_logged(&self) {
self.desync_full_logged.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_desync_suppressed(&self) {
self.desync_suppressed.fetch_add(1, Ordering::Relaxed);
}
pub fn observe_desync_frames_ok(&self, frames_ok: u64) {
match frames_ok {
0 => {
self.desync_frames_bucket_0.fetch_add(1, Ordering::Relaxed);
}
1..=2 => {
self.desync_frames_bucket_1_2.fetch_add(1, Ordering::Relaxed);
}
3..=10 => {
self.desync_frames_bucket_3_10.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.desync_frames_bucket_gt_10.fetch_add(1, Ordering::Relaxed);
}
}
}
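// A hypothetical test (editor's sketch, not part of this diff) pinning the
// bucket boundaries; assumes Stats implements Default, which its field types
// permit but this diff does not show.
#[test]
fn frames_ok_bucket_boundaries() {
    let stats = Stats::default();
    for n in [0, 1, 2, 3, 10, 11, 1000] {
        stats.observe_desync_frames_ok(n);
    }
    assert_eq!(stats.get_desync_frames_bucket_0(), 1); // n = 0
    assert_eq!(stats.get_desync_frames_bucket_1_2(), 2); // n = 1, 2
    assert_eq!(stats.get_desync_frames_bucket_3_10(), 2); // n = 3, 10
    assert_eq!(stats.get_desync_frames_bucket_gt_10(), 2); // n = 11, 1000
}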
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@@ -96,6 +128,27 @@ impl Stats {
pub fn get_secure_padding_invalid(&self) -> u64 {
self.secure_padding_invalid.load(Ordering::Relaxed)
}
pub fn get_desync_total(&self) -> u64 {
self.desync_total.load(Ordering::Relaxed)
}
pub fn get_desync_full_logged(&self) -> u64 {
self.desync_full_logged.load(Ordering::Relaxed)
}
pub fn get_desync_suppressed(&self) -> u64 {
self.desync_suppressed.load(Ordering::Relaxed)
}
pub fn get_desync_frames_bucket_0(&self) -> u64 {
self.desync_frames_bucket_0.load(Ordering::Relaxed)
}
pub fn get_desync_frames_bucket_1_2(&self) -> u64 {
self.desync_frames_bucket_1_2.load(Ordering::Relaxed)
}
pub fn get_desync_frames_bucket_3_10(&self) -> u64 {
self.desync_frames_bucket_3_10.load(Ordering::Relaxed)
}
pub fn get_desync_frames_bucket_gt_10(&self) -> u64 {
self.desync_frames_bucket_gt_10.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default()

View File

@@ -4,8 +4,10 @@ use std::sync::Arc;
use std::time::Duration;
use httpdate;
use tokio::sync::watch;
use tracing::{debug, info, warn};
use crate::config::ProxyConfig;
use crate::error::Result;
use super::MePool;
@@ -128,23 +130,20 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
Ok(ProxyConfigData { map, default_dc })
}
async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &ProxyConfig) {
let mut maps_changed = false;
// Update proxy config v4
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg_v4) = cfg_v4 {
let changed = pool.update_proxy_maps(cfg_v4.map.clone(), None).await;
if let Some(dc) = cfg_v4.default_dc {
pool.default_dc
.store(dc, std::sync::atomic::Ordering::Relaxed);
}
if changed {
maps_changed = true;
info!("ME config updated (v4)");
} else {
debug!("ME config v4 unchanged");
}
@@ -155,12 +154,23 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interval: Duration) {
if let Some(cfg_v6) = cfg_v6 {
let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
if changed {
maps_changed = true;
info!("ME config updated (v6)");
} else {
debug!("ME config v6 unchanged");
}
}
if maps_changed {
let drain_timeout = if cfg.general.me_reinit_drain_timeout_secs == 0 {
None
} else {
Some(Duration::from_secs(cfg.general.me_reinit_drain_timeout_secs))
};
pool.zero_downtime_reinit_after_map_change(rng.as_ref(), drain_timeout)
.await;
}
pool.reset_stun_state();
// Update proxy-secret
@@ -172,6 +182,75 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interval: Duration) {
}
Err(e) => warn!(error = %e, "proxy-secret update failed"),
}
}
pub async fn me_config_updater(
pool: Arc<MePool>,
rng: Arc<SecureRandom>,
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
) {
let mut update_every_secs = config_rx
.borrow()
.general
.effective_update_every_secs()
.max(1);
let mut update_every = Duration::from_secs(update_every_secs);
let mut next_tick = tokio::time::Instant::now() + update_every;
info!(update_every_secs, "ME config updater started");
loop {
let sleep = tokio::time::sleep_until(next_tick);
tokio::pin!(sleep);
tokio::select! {
_ = &mut sleep => {
let cfg = config_rx.borrow().clone();
run_update_cycle(&pool, &rng, cfg.as_ref()).await;
let refreshed_secs = cfg.general.effective_update_every_secs().max(1);
if refreshed_secs != update_every_secs {
info!(
old_update_every_secs = update_every_secs,
new_update_every_secs = refreshed_secs,
"ME config updater interval changed"
);
update_every_secs = refreshed_secs;
update_every = Duration::from_secs(update_every_secs);
}
next_tick = tokio::time::Instant::now() + update_every;
}
changed = config_rx.changed() => {
if changed.is_err() {
warn!("ME config updater stopped: config channel closed");
break;
}
let cfg = config_rx.borrow().clone();
let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs {
continue;
}
if new_secs < update_every_secs {
info!(
old_update_every_secs = update_every_secs,
new_update_every_secs = new_secs,
"ME config updater interval decreased, running immediate refresh"
);
update_every_secs = new_secs;
update_every = Duration::from_secs(update_every_secs);
run_update_cycle(&pool, &rng, cfg.as_ref()).await;
next_tick = tokio::time::Instant::now() + update_every;
} else {
info!(
old_update_every_secs = update_every_secs,
new_update_every_secs = new_secs,
"ME config updater interval increased"
);
update_every_secs = new_secs;
update_every = Duration::from_secs(update_every_secs);
next_tick = tokio::time::Instant::now() + update_every;
}
}
}
}
}
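For reference, a wiring sketch of how this updater is driven (mirrors the spawn site in server startup; spawn_updater is an illustrative name, not part of this diff):

use std::sync::Arc;
use tokio::sync::watch;

async fn spawn_updater(
    pool: Arc<MePool>,
    rng: Arc<SecureRandom>,
    initial: Arc<ProxyConfig>,
) -> watch::Sender<Arc<ProxyConfig>> {
    let (tx, rx) = watch::channel(initial);
    tokio::spawn(me_config_updater(pool, rng, rx));
    // A hot reload publishes a new config through tx; the select! above wakes,
    // and a smaller effective interval triggers an immediate refresh cycle.
    tx
}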

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering};
@@ -178,7 +178,6 @@ impl MePool {
}
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
use std::collections::HashSet;
let writers = self.writers.read().await;
let current: HashSet<SocketAddr> = writers
.iter()
@@ -210,6 +209,103 @@
}
}
async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
if self.decision.ipv4_me {
let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 {
let entry = out.entry(dc.abs()).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if self.decision.ipv6_me {
let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 {
let entry = out.entry(dc.abs()).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
out
}
pub async fn zero_downtime_reinit_after_map_change(
self: &Arc<Self>,
rng: &SecureRandom,
drain_timeout: Option<Duration>,
) {
// Stage 1: prewarm writers for new endpoint maps before draining old ones.
self.reconcile_connections(rng).await;
let desired_by_dc = self.desired_dc_endpoints().await;
if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty after update; skipping stale writer drain");
return;
}
let writers = self.writers.read().await;
let active_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
let mut missing_dc = Vec::<i32>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
if !endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) {
missing_dc.push(*dc);
}
}
if !missing_dc.is_empty() {
missing_dc.sort_unstable();
warn!(
missing_dc = ?missing_dc,
// Keep stale writers alive when fresh coverage is incomplete.
"ME reinit coverage incomplete after map update; keeping stale writers"
);
return;
}
let desired_addrs: HashSet<SocketAddr> = desired_by_dc
.values()
.flat_map(|set| set.iter().copied())
.collect();
let stale_writer_ids: Vec<u64> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| !desired_addrs.contains(&w.addr))
.map(|w| w.id)
.collect();
drop(writers);
if stale_writer_ids.is_empty() {
debug!("ME map update completed with no stale writers");
return;
}
let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0);
info!(
stale_writers = stale_writer_ids.len(),
drain_timeout_secs,
"ME map update covered; draining stale writers"
);
for writer_id in stale_writer_ids {
self.mark_writer_draining_with_timeout(writer_id, drain_timeout)
.await;
}
}
pub async fn update_proxy_maps(
&self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
@@ -631,24 +727,41 @@ impl MePool {
self.registry.writer_lost(writer_id).await
}
pub(crate) async fn mark_writer_draining_with_timeout(
self: &Arc<Self>,
writer_id: u64,
timeout: Option<Duration>,
) {
let timeout = timeout.filter(|d| !d.is_zero());
let found = {
let mut ws = self.writers.write().await;
if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
w.draining.store(true, Ordering::Relaxed);
true
} else {
false
}
};
if !found {
return;
}
let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
debug!(writer_id, timeout_secs, "ME writer marked draining");
let pool = Arc::downgrade(self);
tokio::spawn(async move {
let deadline = timeout.map(|t| Instant::now() + t);
loop {
if let Some(p) = pool.upgrade() {
if let Some(deadline_at) = deadline {
if Instant::now() >= deadline_at {
warn!(writer_id, "Drain timeout, force-closing");
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;
@@ -661,6 +774,11 @@ impl MePool {
});
}
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)))
.await;
}
}
fn hex_dump(data: &[u8]) -> String {