From 7f0057acd7e5e5413c368a22670e1fbd263fe78a Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Sat, 4 Apr 2026 11:28:32 +0300
Subject: [PATCH] Conntrack Control Method

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
---
 src/api/model.rs          |  11 +
 src/api/runtime_stats.rs  |  11 +
 src/config/defaults.rs    |  20 ++
 src/config/load.rs        | 195 +++++++++++
 src/config/types.rs       | 117 +++++++
 src/conntrack_control.rs  | 704 ++++++++++++++++++++++++++++++++++++++
 src/maestro/listeners.rs  |   2 +
 src/maestro/mod.rs        |   6 +
 src/main.rs               |   1 +
 src/metrics.rs            | 128 +++++++
 src/proxy/client.rs       |  62 +++-
 src/proxy/direct_relay.rs | 100 +++++-
 src/proxy/middle_relay.rs |  80 ++++-
 src/proxy/relay.rs        |  40 ++-
 src/proxy/shared_state.rs |  92 ++++-
 src/service/mod.rs        |   4 +-
 src/stats/mod.rs          | 114 ++++++
 17 files changed, 1658 insertions(+), 29 deletions(-)
 create mode 100644 src/conntrack_control.rs

diff --git a/src/api/model.rs b/src/api/model.rs
index ebc67d7..66de644 100644
--- a/src/api/model.rs
+++ b/src/api/model.rs
@@ -81,10 +81,21 @@ pub(super) struct ZeroCoreData {
     pub(super) connections_total: u64,
     pub(super) connections_bad_total: u64,
     pub(super) handshake_timeouts_total: u64,
+    pub(super) accept_permit_timeout_total: u64,
     pub(super) configured_users: usize,
     pub(super) telemetry_core_enabled: bool,
     pub(super) telemetry_user_enabled: bool,
     pub(super) telemetry_me_level: String,
+    pub(super) conntrack_control_enabled: bool,
+    pub(super) conntrack_control_available: bool,
+    pub(super) conntrack_pressure_active: bool,
+    pub(super) conntrack_event_queue_depth: u64,
+    pub(super) conntrack_rule_apply_ok: bool,
+    pub(super) conntrack_delete_attempt_total: u64,
+    pub(super) conntrack_delete_success_total: u64,
+    pub(super) conntrack_delete_not_found_total: u64,
+    pub(super) conntrack_delete_error_total: u64,
+    pub(super) conntrack_close_event_drop_total: u64,
 }
 
 #[derive(Serialize, Clone)]
diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs
index b66d1a5..131acef 100644
--- a/src/api/runtime_stats.rs
+++ b/src/api/runtime_stats.rs
@@ -39,10 +39,21 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
             connections_total: stats.get_connects_all(),
             connections_bad_total: stats.get_connects_bad(),
             handshake_timeouts_total: stats.get_handshake_timeouts(),
+            accept_permit_timeout_total: stats.get_accept_permit_timeout_total(),
             configured_users,
             telemetry_core_enabled: telemetry.core_enabled,
             telemetry_user_enabled: telemetry.user_enabled,
             telemetry_me_level: telemetry.me_level.to_string(),
+            conntrack_control_enabled: stats.get_conntrack_control_enabled(),
+            conntrack_control_available: stats.get_conntrack_control_available(),
+            conntrack_pressure_active: stats.get_conntrack_pressure_active(),
+            conntrack_event_queue_depth: stats.get_conntrack_event_queue_depth(),
+            conntrack_rule_apply_ok: stats.get_conntrack_rule_apply_ok(),
+            conntrack_delete_attempt_total: stats.get_conntrack_delete_attempt_total(),
+            conntrack_delete_success_total: stats.get_conntrack_delete_success_total(),
+            conntrack_delete_not_found_total: stats.get_conntrack_delete_not_found_total(),
+            conntrack_delete_error_total: stats.get_conntrack_delete_error_total(),
+            conntrack_close_event_drop_total: stats.get_conntrack_close_event_drop_total(),
         },
         upstream: build_zero_upstream_data(stats),
         middle_proxy: ZeroMiddleProxyData {
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 89e72bb..f9b0a14 100644
--- a/src/config/defaults.rs
+++
b/src/config/defaults.rs @@ -48,6 +48,10 @@ const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250; +const DEFAULT_CONNTRACK_CONTROL_ENABLED: bool = true; +const DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT: u8 = 85; +const DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT: u8 = 70; +const DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC: u64 = 4096; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000; @@ -221,6 +225,22 @@ pub(crate) fn default_accept_permit_timeout_ms() -> u64 { DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS } +pub(crate) fn default_conntrack_control_enabled() -> bool { + DEFAULT_CONNTRACK_CONTROL_ENABLED +} + +pub(crate) fn default_conntrack_pressure_high_watermark_pct() -> u8 { + DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT +} + +pub(crate) fn default_conntrack_pressure_low_watermark_pct() -> u8 { + DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT +} + +pub(crate) fn default_conntrack_delete_budget_per_sec() -> u64 { + DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC +} + pub(crate) fn default_prefer_4() -> u8 { 4 } diff --git a/src/config/load.rs b/src/config/load.rs index cc95f34..268db13 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -922,6 +922,39 @@ impl ProxyConfig { )); } + if config.server.conntrack_control.pressure_high_watermark_pct == 0 + || config.server.conntrack_control.pressure_high_watermark_pct > 100 + { + return Err(ProxyError::Config( + "server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]" + .to_string(), + )); + } + + if config.server.conntrack_control.pressure_low_watermark_pct + >= config.server.conntrack_control.pressure_high_watermark_pct + { + return Err(ProxyError::Config( + "server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct" + .to_string(), + )); + } + + if config.server.conntrack_control.delete_budget_per_sec == 0 { + return Err(ProxyError::Config( + "server.conntrack_control.delete_budget_per_sec must be > 0".to_string(), + )); + } + + if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid) + && config.server.conntrack_control.hybrid_listener_ips.is_empty() + { + return Err(ProxyError::Config( + "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid" + .to_string(), + )); + } + if config.general.effective_me_pool_force_close_secs() > 0 && config.general.effective_me_pool_force_close_secs() < config.general.me_pool_drain_ttl_secs @@ -1327,6 +1360,31 @@ mod tests { cfg.server.api.runtime_edge_events_capacity, default_api_runtime_edge_events_capacity() ); + assert_eq!( + cfg.server.conntrack_control.inline_conntrack_control, + default_conntrack_control_enabled() + ); + assert_eq!(cfg.server.conntrack_control.mode, ConntrackMode::default()); + assert_eq!( + cfg.server.conntrack_control.backend, + ConntrackBackend::default() + ); + assert_eq!( + cfg.server.conntrack_control.profile, + ConntrackPressureProfile::default() + ); + assert_eq!( + cfg.server.conntrack_control.pressure_high_watermark_pct, + default_conntrack_pressure_high_watermark_pct() + ); + assert_eq!( + cfg.server.conntrack_control.pressure_low_watermark_pct, + default_conntrack_pressure_low_watermark_pct() + ); + assert_eq!( + cfg.server.conntrack_control.delete_budget_per_sec, + 
default_conntrack_delete_budget_per_sec() + ); assert_eq!(cfg.access.users, default_access_users()); assert_eq!( cfg.access.user_max_tcp_conns_global_each, @@ -1472,6 +1530,31 @@ mod tests { server.api.runtime_edge_events_capacity, default_api_runtime_edge_events_capacity() ); + assert_eq!( + server.conntrack_control.inline_conntrack_control, + default_conntrack_control_enabled() + ); + assert_eq!(server.conntrack_control.mode, ConntrackMode::default()); + assert_eq!( + server.conntrack_control.backend, + ConntrackBackend::default() + ); + assert_eq!( + server.conntrack_control.profile, + ConntrackPressureProfile::default() + ); + assert_eq!( + server.conntrack_control.pressure_high_watermark_pct, + default_conntrack_pressure_high_watermark_pct() + ); + assert_eq!( + server.conntrack_control.pressure_low_watermark_pct, + default_conntrack_pressure_low_watermark_pct() + ); + assert_eq!( + server.conntrack_control.delete_budget_per_sec, + default_conntrack_delete_budget_per_sec() + ); let access = AccessConfig::default(); assert_eq!(access.users, default_access_users()); @@ -2404,6 +2487,118 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn conntrack_pressure_high_watermark_out_of_range_is_rejected() { + let toml = r#" + [server.conntrack_control] + pressure_high_watermark_pct = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!( + err.contains("server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]") + ); + let _ = std::fs::remove_file(path); + } + + #[test] + fn conntrack_pressure_low_watermark_must_be_below_high() { + let toml = r#" + [server.conntrack_control] + pressure_high_watermark_pct = 50 + pressure_low_watermark_pct = 50 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_conntrack_low_watermark_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!( + err.contains( + "server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct" + ) + ); + let _ = std::fs::remove_file(path); + } + + #[test] + fn conntrack_delete_budget_zero_is_rejected() { + let toml = r#" + [server.conntrack_control] + delete_budget_per_sec = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_conntrack_delete_budget_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("server.conntrack_control.delete_budget_per_sec must be > 0")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn conntrack_hybrid_mode_requires_listener_allow_list() { + let toml = r#" + [server.conntrack_control] + mode = "hybrid" + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!( + 
err.contains("server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid") + ); + let _ = std::fs::remove_file(path); + } + + #[test] + fn conntrack_profile_is_loaded_from_config() { + let toml = r#" + [server.conntrack_control] + profile = "aggressive" + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_conntrack_profile_parse_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!( + cfg.server.conntrack_control.profile, + ConntrackPressureProfile::Aggressive + ); + let _ = std::fs::remove_file(path); + } + #[test] fn force_close_default_matches_drain_ttl() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 41b0c2e..a1ffd13 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1216,6 +1216,118 @@ impl Default for ApiConfig { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum ConntrackMode { + #[default] + Tracked, + Notrack, + Hybrid, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum ConntrackBackend { + #[default] + Auto, + Nftables, + Iptables, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum ConntrackPressureProfile { + Conservative, + #[default] + Balanced, + Aggressive, +} + +impl ConntrackPressureProfile { + pub fn client_first_byte_idle_cap_secs(self) -> u64 { + match self { + Self::Conservative => 30, + Self::Balanced => 20, + Self::Aggressive => 10, + } + } + + pub fn direct_activity_timeout_secs(self) -> u64 { + match self { + Self::Conservative => 180, + Self::Balanced => 120, + Self::Aggressive => 60, + } + } + + pub fn middle_soft_idle_cap_secs(self) -> u64 { + match self { + Self::Conservative => 60, + Self::Balanced => 30, + Self::Aggressive => 20, + } + } + + pub fn middle_hard_idle_cap_secs(self) -> u64 { + match self { + Self::Conservative => 180, + Self::Balanced => 90, + Self::Aggressive => 60, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConntrackControlConfig { + /// Enables runtime conntrack-control worker for pressure mitigation. + #[serde(default = "default_conntrack_control_enabled")] + pub inline_conntrack_control: bool, + + /// Conntrack mode for listener ingress traffic. + #[serde(default)] + pub mode: ConntrackMode, + + /// Netfilter backend used to reconcile notrack rules. + #[serde(default)] + pub backend: ConntrackBackend, + + /// Pressure profile for timeout caps under resource saturation. + #[serde(default)] + pub profile: ConntrackPressureProfile, + + /// Listener IP allow-list for hybrid mode. + /// Ignored in tracked/notrack mode. + #[serde(default)] + pub hybrid_listener_ips: Vec, + + /// Pressure high watermark as percentage. + #[serde(default = "default_conntrack_pressure_high_watermark_pct")] + pub pressure_high_watermark_pct: u8, + + /// Pressure low watermark as percentage. + #[serde(default = "default_conntrack_pressure_low_watermark_pct")] + pub pressure_low_watermark_pct: u8, + + /// Maximum conntrack delete operations per second. 
+ #[serde(default = "default_conntrack_delete_budget_per_sec")] + pub delete_budget_per_sec: u64, +} + +impl Default for ConntrackControlConfig { + fn default() -> Self { + Self { + inline_conntrack_control: default_conntrack_control_enabled(), + mode: ConntrackMode::default(), + backend: ConntrackBackend::default(), + profile: ConntrackPressureProfile::default(), + hybrid_listener_ips: Vec::new(), + pressure_high_watermark_pct: default_conntrack_pressure_high_watermark_pct(), + pressure_low_watermark_pct: default_conntrack_pressure_low_watermark_pct(), + delete_budget_per_sec: default_conntrack_delete_budget_per_sec(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { #[serde(default = "default_port")] @@ -1291,6 +1403,10 @@ pub struct ServerConfig { /// `0` keeps legacy unbounded wait behavior. #[serde(default = "default_accept_permit_timeout_ms")] pub accept_permit_timeout_ms: u64, + + /// Runtime conntrack control and pressure policy. + #[serde(default)] + pub conntrack_control: ConntrackControlConfig, } impl Default for ServerConfig { @@ -1313,6 +1429,7 @@ impl Default for ServerConfig { listen_backlog: default_listen_backlog(), max_connections: default_server_max_connections(), accept_permit_timeout_ms: default_accept_permit_timeout_ms(), + conntrack_control: ConntrackControlConfig::default(), } } } diff --git a/src/conntrack_control.rs b/src/conntrack_control.rs new file mode 100644 index 0000000..5083877 --- /dev/null +++ b/src/conntrack_control.rs @@ -0,0 +1,704 @@ +use std::collections::BTreeSet; +use std::net::IpAddr; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use tokio::io::AsyncWriteExt; +use tokio::process::Command; +use tokio::sync::{mpsc, watch}; +use tracing::{debug, info, warn}; + +use crate::config::{ConntrackBackend, ConntrackMode, ProxyConfig}; +use crate::proxy::middle_relay::note_global_relay_pressure; +use crate::proxy::shared_state::{ConntrackCloseEvent, ConntrackCloseReason, ProxySharedState}; +use crate::stats::Stats; + +const CONNTRACK_EVENT_QUEUE_CAPACITY: usize = 32_768; +const PRESSURE_RELEASE_TICKS: u8 = 3; +const PRESSURE_SAMPLE_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum NetfilterBackend { + Nftables, + Iptables, +} + +#[derive(Clone, Copy)] +struct PressureSample { + conn_pct: Option, + fd_pct: Option, + accept_timeout_delta: u64, + me_queue_pressure_delta: u64, +} + +struct PressureState { + active: bool, + low_streak: u8, + prev_accept_timeout_total: u64, + prev_me_queue_pressure_total: u64, +} + +impl PressureState { + fn new(stats: &Stats) -> Self { + Self { + active: false, + low_streak: 0, + prev_accept_timeout_total: stats.get_accept_permit_timeout_total(), + prev_me_queue_pressure_total: stats.get_me_c2me_send_full_total(), + } + } +} + +pub(crate) fn spawn_conntrack_controller( + config_rx: watch::Receiver>, + stats: Arc, + shared: Arc, +) { + if !cfg!(target_os = "linux") { + let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control; + stats.set_conntrack_control_enabled(enabled); + stats.set_conntrack_control_available(false); + stats.set_conntrack_pressure_active(false); + stats.set_conntrack_event_queue_depth(0); + stats.set_conntrack_rule_apply_ok(false); + shared.disable_conntrack_close_sender(); + shared.set_conntrack_pressure_active(false); + if enabled { + warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker"); + } + return; + } + + let (tx, rx) = 
mpsc::channel(CONNTRACK_EVENT_QUEUE_CAPACITY); + shared.set_conntrack_close_sender(tx); + tokio::spawn(async move { + run_conntrack_controller(config_rx, stats, shared, rx).await; + }); +} + +async fn run_conntrack_controller( + mut config_rx: watch::Receiver>, + stats: Arc, + shared: Arc, + mut close_rx: mpsc::Receiver, +) { + let mut cfg = config_rx.borrow().clone(); + let mut pressure_state = PressureState::new(stats.as_ref()); + let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; + let mut backend = pick_backend(cfg.server.conntrack_control.backend); + + apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false); + reconcile_rules(&cfg, backend, stats.as_ref()).await; + + loop { + tokio::select! { + changed = config_rx.changed() => { + if changed.is_err() { + break; + } + cfg = config_rx.borrow_and_update().clone(); + backend = pick_backend(cfg.server.conntrack_control.backend); + delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; + apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), pressure_state.active); + reconcile_rules(&cfg, backend, stats.as_ref()).await; + } + event = close_rx.recv() => { + let Some(event) = event else { + break; + }; + stats.set_conntrack_event_queue_depth(close_rx.len() as u64); + if !cfg.server.conntrack_control.inline_conntrack_control { + continue; + } + if !pressure_state.active { + continue; + } + if !matches!(event.reason, ConntrackCloseReason::Timeout | ConntrackCloseReason::Pressure | ConntrackCloseReason::Reset) { + continue; + } + if delete_budget_tokens == 0 { + continue; + } + stats.increment_conntrack_delete_attempt_total(); + match delete_conntrack_entry(event).await { + DeleteOutcome::Deleted => { + delete_budget_tokens = delete_budget_tokens.saturating_sub(1); + stats.increment_conntrack_delete_success_total(); + } + DeleteOutcome::NotFound => { + delete_budget_tokens = delete_budget_tokens.saturating_sub(1); + stats.increment_conntrack_delete_not_found_total(); + } + DeleteOutcome::Error => { + delete_budget_tokens = delete_budget_tokens.saturating_sub(1); + stats.increment_conntrack_delete_error_total(); + } + } + } + _ = tokio::time::sleep(PRESSURE_SAMPLE_INTERVAL) => { + delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; + stats.set_conntrack_event_queue_depth(close_rx.len() as u64); + let sample = collect_pressure_sample(stats.as_ref(), &cfg, &mut pressure_state); + update_pressure_state( + stats.as_ref(), + shared.as_ref(), + &cfg, + &sample, + &mut pressure_state, + ); + if pressure_state.active { + note_global_relay_pressure(shared.as_ref()); + } + } + } + } + + shared.disable_conntrack_close_sender(); + shared.set_conntrack_pressure_active(false); + stats.set_conntrack_pressure_active(false); +} + +fn apply_runtime_state( + stats: &Stats, + shared: &ProxySharedState, + cfg: &ProxyConfig, + backend_available: bool, + pressure_active: bool, +) { + let enabled = cfg.server.conntrack_control.inline_conntrack_control; + let available = enabled && backend_available && has_cap_net_admin(); + if enabled && !available { + warn!( + "conntrack control enabled but unavailable (missing CAP_NET_ADMIN or backend binaries)" + ); + } + stats.set_conntrack_control_enabled(enabled); + stats.set_conntrack_control_available(available); + shared.set_conntrack_pressure_active(enabled && pressure_active); + stats.set_conntrack_pressure_active(enabled && pressure_active); +} + +fn collect_pressure_sample( + stats: &Stats, + cfg: 
&ProxyConfig, + state: &mut PressureState, +) -> PressureSample { + let current_connections = stats.get_current_connections_total(); + let conn_pct = if cfg.server.max_connections == 0 { + None + } else { + Some( + ((current_connections.saturating_mul(100)) / u64::from(cfg.server.max_connections)) + .min(100) as u8, + ) + }; + + let fd_pct = fd_usage_pct(); + + let accept_total = stats.get_accept_permit_timeout_total(); + let accept_delta = accept_total.saturating_sub(state.prev_accept_timeout_total); + state.prev_accept_timeout_total = accept_total; + + let me_total = stats.get_me_c2me_send_full_total(); + let me_delta = me_total.saturating_sub(state.prev_me_queue_pressure_total); + state.prev_me_queue_pressure_total = me_total; + + PressureSample { + conn_pct, + fd_pct, + accept_timeout_delta: accept_delta, + me_queue_pressure_delta: me_delta, + } +} + +fn update_pressure_state( + stats: &Stats, + shared: &ProxySharedState, + cfg: &ProxyConfig, + sample: &PressureSample, + state: &mut PressureState, +) { + if !cfg.server.conntrack_control.inline_conntrack_control { + if state.active { + state.active = false; + state.low_streak = 0; + shared.set_conntrack_pressure_active(false); + stats.set_conntrack_pressure_active(false); + info!("Conntrack pressure mode deactivated (feature disabled)"); + } + return; + } + + let high = cfg.server.conntrack_control.pressure_high_watermark_pct; + let low = cfg.server.conntrack_control.pressure_low_watermark_pct; + + let high_hit = sample.conn_pct.is_some_and(|v| v >= high) + || sample.fd_pct.is_some_and(|v| v >= high) + || sample.accept_timeout_delta > 0 + || sample.me_queue_pressure_delta > 0; + + let low_clear = sample.conn_pct.is_none_or(|v| v <= low) + && sample.fd_pct.is_none_or(|v| v <= low) + && sample.accept_timeout_delta == 0 + && sample.me_queue_pressure_delta == 0; + + if !state.active && high_hit { + state.active = true; + state.low_streak = 0; + shared.set_conntrack_pressure_active(true); + stats.set_conntrack_pressure_active(true); + info!( + conn_pct = ?sample.conn_pct, + fd_pct = ?sample.fd_pct, + accept_timeout_delta = sample.accept_timeout_delta, + me_queue_pressure_delta = sample.me_queue_pressure_delta, + "Conntrack pressure mode activated" + ); + return; + } + + if state.active && low_clear { + state.low_streak = state.low_streak.saturating_add(1); + if state.low_streak >= PRESSURE_RELEASE_TICKS { + state.active = false; + state.low_streak = 0; + shared.set_conntrack_pressure_active(false); + stats.set_conntrack_pressure_active(false); + info!("Conntrack pressure mode deactivated"); + } + return; + } + + state.low_streak = 0; +} + +async fn reconcile_rules(cfg: &ProxyConfig, backend: Option, stats: &Stats) { + if !cfg.server.conntrack_control.inline_conntrack_control { + clear_notrack_rules_all_backends().await; + stats.set_conntrack_rule_apply_ok(true); + return; + } + + if !has_cap_net_admin() { + stats.set_conntrack_rule_apply_ok(false); + return; + } + + let Some(backend) = backend else { + stats.set_conntrack_rule_apply_ok(false); + return; + }; + + let apply_result = match backend { + NetfilterBackend::Nftables => apply_nft_rules(cfg).await, + NetfilterBackend::Iptables => apply_iptables_rules(cfg).await, + }; + + if let Err(error) = apply_result { + warn!(error = %error, "Failed to reconcile conntrack/notrack rules"); + stats.set_conntrack_rule_apply_ok(false); + } else { + stats.set_conntrack_rule_apply_ok(true); + } +} + +fn pick_backend(configured: ConntrackBackend) -> Option { + match configured { + ConntrackBackend::Auto 
=> { + if command_exists("nft") { + Some(NetfilterBackend::Nftables) + } else if command_exists("iptables") { + Some(NetfilterBackend::Iptables) + } else { + None + } + } + ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables), + ConntrackBackend::Iptables => command_exists("iptables").then_some(NetfilterBackend::Iptables), + } +} + +fn command_exists(binary: &str) -> bool { + let Some(path_var) = std::env::var_os("PATH") else { + return false; + }; + std::env::split_paths(&path_var).any(|dir| { + let candidate: PathBuf = dir.join(binary); + candidate.exists() && candidate.is_file() + }) +} + +fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec>) { + let mode = cfg.server.conntrack_control.mode; + let mut v4_targets: BTreeSet> = BTreeSet::new(); + let mut v6_targets: BTreeSet> = BTreeSet::new(); + + match mode { + ConntrackMode::Tracked => {} + ConntrackMode::Notrack => { + if cfg.server.listeners.is_empty() { + if let Some(ipv4) = cfg + .server + .listen_addr_ipv4 + .as_ref() + .and_then(|s| s.parse::().ok()) + { + if ipv4.is_unspecified() { + v4_targets.insert(None); + } else { + v4_targets.insert(Some(ipv4)); + } + } + if let Some(ipv6) = cfg + .server + .listen_addr_ipv6 + .as_ref() + .and_then(|s| s.parse::().ok()) + { + if ipv6.is_unspecified() { + v6_targets.insert(None); + } else { + v6_targets.insert(Some(ipv6)); + } + } + } else { + for listener in &cfg.server.listeners { + if listener.ip.is_ipv4() { + if listener.ip.is_unspecified() { + v4_targets.insert(None); + } else { + v4_targets.insert(Some(listener.ip)); + } + } else if listener.ip.is_unspecified() { + v6_targets.insert(None); + } else { + v6_targets.insert(Some(listener.ip)); + } + } + } + } + ConntrackMode::Hybrid => { + for ip in &cfg.server.conntrack_control.hybrid_listener_ips { + if ip.is_ipv4() { + v4_targets.insert(Some(*ip)); + } else { + v6_targets.insert(Some(*ip)); + } + } + } + } + + ( + v4_targets.into_iter().collect(), + v6_targets.into_iter().collect(), + ) +} + +async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> { + let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await; + if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) { + return Ok(()); + } + + let (v4_targets, v6_targets) = notrack_targets(cfg); + let mut rules = Vec::new(); + for ip in v4_targets { + let rule = if let Some(ip) = ip { + format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip) + } else { + format!("tcp dport {} notrack", cfg.server.port) + }; + rules.push(rule); + } + for ip in v6_targets { + let rule = if let Some(ip) = ip { + format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip) + } else { + format!("tcp dport {} notrack", cfg.server.port) + }; + rules.push(rule); + } + + let rule_blob = if rules.is_empty() { + String::new() + } else { + format!(" {}\n", rules.join("\n ")) + }; + let script = format!( + "table inet telemt_conntrack {{\n chain preraw {{\n type filter hook prerouting priority raw; policy accept;\n{rule_blob} }}\n}}\n" + ); + run_command("nft", &["-f", "-"], Some(script)).await +} + +async fn apply_iptables_rules(cfg: &ProxyConfig) -> Result<(), String> { + apply_iptables_rules_for_binary("iptables", cfg, true).await?; + apply_iptables_rules_for_binary("ip6tables", cfg, false).await?; + Ok(()) +} + +async fn apply_iptables_rules_for_binary( + binary: &str, + cfg: &ProxyConfig, + ipv4: bool, +) -> Result<(), String> { + if !command_exists(binary) { + return Ok(()); + } + let chain = 
"TELEMT_NOTRACK"; + let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await; + let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await; + let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await; + + if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) { + return Ok(()); + } + + run_command(binary, &["-t", "raw", "-N", chain], None).await?; + run_command(binary, &["-t", "raw", "-F", chain], None).await?; + if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None).await.is_err() { + run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?; + } + + let (v4_targets, v6_targets) = notrack_targets(cfg); + let selected = if ipv4 { v4_targets } else { v6_targets }; + for ip in selected { + let mut args = vec![ + "-t".to_string(), + "raw".to_string(), + "-A".to_string(), + chain.to_string(), + "-p".to_string(), + "tcp".to_string(), + "--dport".to_string(), + cfg.server.port.to_string(), + ]; + if let Some(ip) = ip { + args.push("-d".to_string()); + args.push(ip.to_string()); + } + args.push("-j".to_string()); + args.push("CT".to_string()); + args.push("--notrack".to_string()); + let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + run_command(binary, &arg_refs, None).await?; + } + Ok(()) +} + +async fn clear_notrack_rules_all_backends() { + let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await; + let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await; + let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; + let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; + let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await; + let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; + let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; +} + +enum DeleteOutcome { + Deleted, + NotFound, + Error, +} + +async fn delete_conntrack_entry(event: ConntrackCloseEvent) -> DeleteOutcome { + if !command_exists("conntrack") { + return DeleteOutcome::Error; + } + let args = vec![ + "-D".to_string(), + "-p".to_string(), + "tcp".to_string(), + "-s".to_string(), + event.src.ip().to_string(), + "--sport".to_string(), + event.src.port().to_string(), + "-d".to_string(), + event.dst.ip().to_string(), + "--dport".to_string(), + event.dst.port().to_string(), + ]; + let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + match run_command("conntrack", &arg_refs, None).await { + Ok(()) => DeleteOutcome::Deleted, + Err(error) => { + if error.contains("0 flow entries have been deleted") { + DeleteOutcome::NotFound + } else { + debug!(error = %error, "conntrack delete failed"); + DeleteOutcome::Error + } + } + } +} + +async fn run_command(binary: &str, args: &[&str], stdin: Option) -> Result<(), String> { + if !command_exists(binary) { + return Err(format!("{binary} is not available")); + } + let mut command = Command::new(binary); + command.args(args); + if stdin.is_some() { + command.stdin(std::process::Stdio::piped()); + } + command.stdout(std::process::Stdio::null()); + command.stderr(std::process::Stdio::piped()); + let mut child = command + .spawn() + .map_err(|e| format!("spawn {binary} failed: {e}"))?; + if let Some(blob) = stdin + && let Some(mut writer) = child.stdin.take() + { + writer + 
.write_all(blob.as_bytes()) + .await + .map_err(|e| format!("stdin write {binary} failed: {e}"))?; + } + let output = child + .wait_with_output() + .await + .map_err(|e| format!("wait {binary} failed: {e}"))?; + if output.status.success() { + return Ok(()); + } + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + Err(if stderr.is_empty() { + format!("{binary} exited with status {}", output.status) + } else { + stderr + }) +} + +fn fd_usage_pct() -> Option { + let soft_limit = nofile_soft_limit()?; + if soft_limit == 0 { + return None; + } + let fd_count = std::fs::read_dir("/proc/self/fd").ok()?.count() as u64; + Some(((fd_count.saturating_mul(100)) / soft_limit).min(100) as u8) +} + +fn nofile_soft_limit() -> Option { + #[cfg(target_os = "linux")] + { + let mut lim = libc::rlimit { + rlim_cur: 0, + rlim_max: 0, + }; + let rc = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut lim) }; + if rc != 0 { + return None; + } + return Some(lim.rlim_cur); + } + #[cfg(not(target_os = "linux"))] + { + None + } +} + +fn has_cap_net_admin() -> bool { + #[cfg(target_os = "linux")] + { + let Ok(status) = std::fs::read_to_string("/proc/self/status") else { + return false; + }; + for line in status.lines() { + if let Some(raw) = line.strip_prefix("CapEff:") { + let caps = raw.trim(); + if let Ok(bits) = u64::from_str_radix(caps, 16) { + const CAP_NET_ADMIN_BIT: u64 = 12; + return (bits & (1u64 << CAP_NET_ADMIN_BIT)) != 0; + } + } + } + false + } + #[cfg(not(target_os = "linux"))] + { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::ProxyConfig; + + #[test] + fn pressure_activates_on_accept_timeout_spike() { + let stats = Stats::new(); + let shared = ProxySharedState::new(); + let mut cfg = ProxyConfig::default(); + cfg.server.conntrack_control.inline_conntrack_control = true; + let mut state = PressureState::new(&stats); + let sample = PressureSample { + conn_pct: Some(10), + fd_pct: Some(10), + accept_timeout_delta: 1, + me_queue_pressure_delta: 0, + }; + + update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state); + + assert!(state.active); + assert!(shared.conntrack_pressure_active()); + assert!(stats.get_conntrack_pressure_active()); + } + + #[test] + fn pressure_releases_after_hysteresis_window() { + let stats = Stats::new(); + let shared = ProxySharedState::new(); + let mut cfg = ProxyConfig::default(); + cfg.server.conntrack_control.inline_conntrack_control = true; + let mut state = PressureState::new(&stats); + + let high_sample = PressureSample { + conn_pct: Some(95), + fd_pct: Some(95), + accept_timeout_delta: 0, + me_queue_pressure_delta: 0, + }; + update_pressure_state(&stats, shared.as_ref(), &cfg, &high_sample, &mut state); + assert!(state.active); + + let low_sample = PressureSample { + conn_pct: Some(10), + fd_pct: Some(10), + accept_timeout_delta: 0, + me_queue_pressure_delta: 0, + }; + update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state); + assert!(state.active); + update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state); + assert!(state.active); + update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state); + + assert!(!state.active); + assert!(!shared.conntrack_pressure_active()); + assert!(!stats.get_conntrack_pressure_active()); + } + + #[test] + fn pressure_does_not_activate_when_disabled() { + let stats = Stats::new(); + let shared = ProxySharedState::new(); + let mut cfg = ProxyConfig::default(); + 
cfg.server.conntrack_control.inline_conntrack_control = false; + let mut state = PressureState::new(&stats); + let sample = PressureSample { + conn_pct: Some(100), + fd_pct: Some(100), + accept_timeout_delta: 10, + me_queue_pressure_delta: 10, + }; + + update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state); + + assert!(!state.active); + assert!(!shared.conntrack_pressure_active()); + assert!(!stats.get_conntrack_pressure_active()); + } +} diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index 014f69c..96d4cd9 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -262,6 +262,7 @@ pub(crate) async fn bind_listeners( break; } Err(_) => { + stats.increment_accept_permit_timeout_total(); debug!( timeout_ms = accept_permit_timeout_ms, "Dropping accepted unix connection: permit wait timeout" @@ -407,6 +408,7 @@ pub(crate) fn spawn_tcp_accept_loops( break; } Err(_) => { + stats.increment_accept_permit_timeout_total(); debug!( peer = %peer_addr, timeout_ms = accept_permit_timeout_ms, diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index dd11dc2..164071e 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -28,6 +28,7 @@ use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; use crate::api; +use crate::conntrack_control; use crate::config::{LogLevel, ProxyConfig}; use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; @@ -633,6 +634,11 @@ async fn run_inner( .await; let _admission_tx_hold = admission_tx; let shared_state = ProxySharedState::new(); + conntrack_control::spawn_conntrack_controller( + config_rx.clone(), + stats.clone(), + shared_state.clone(), + ); let bound = listeners::bind_listeners( &config, diff --git a/src/main.rs b/src/main.rs index 68c89fc..05dc058 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod api; mod cli; +mod conntrack_control; mod config; mod crypto; #[cfg(unix)] diff --git a/src/metrics.rs b/src/metrics.rs index 091ce52..5cb1e77 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -359,6 +359,134 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout" + ); + let _ = writeln!(out, "# TYPE telemt_accept_permit_timeout_total counter"); + let _ = writeln!( + out, + "telemt_accept_permit_timeout_total {}", + if core_enabled { + stats.get_accept_permit_timeout_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_conntrack_control_state Runtime conntrack control state flags" + ); + let _ = writeln!(out, "# TYPE telemt_conntrack_control_state gauge"); + let _ = writeln!( + out, + "telemt_conntrack_control_state{{flag=\"enabled\"}} {}", + if stats.get_conntrack_control_enabled() { + 1 + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_conntrack_control_state{{flag=\"available\"}} {}", + if stats.get_conntrack_control_available() { + 1 + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_conntrack_control_state{{flag=\"pressure_active\"}} {}", + if stats.get_conntrack_pressure_active() { + 1 + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_conntrack_control_state{{flag=\"rule_apply_ok\"}} {}", + if stats.get_conntrack_rule_apply_ok() { + 1 + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_conntrack_event_queue_depth Pending close events in conntrack control queue" + ); + let _ = writeln!(out, "# TYPE 
telemt_conntrack_event_queue_depth gauge"); + let _ = writeln!( + out, + "telemt_conntrack_event_queue_depth {}", + stats.get_conntrack_event_queue_depth() + ); + + let _ = writeln!( + out, + "# HELP telemt_conntrack_delete_total Conntrack delete attempts by outcome" + ); + let _ = writeln!(out, "# TYPE telemt_conntrack_delete_total counter"); + let _ = writeln!( + out, + "telemt_conntrack_delete_total{{result=\"attempt\"}} {}", + if core_enabled { + stats.get_conntrack_delete_attempt_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_conntrack_delete_total{{result=\"success\"}} {}", + if core_enabled { + stats.get_conntrack_delete_success_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_conntrack_delete_total{{result=\"not_found\"}} {}", + if core_enabled { + stats.get_conntrack_delete_not_found_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_conntrack_delete_total{{result=\"error\"}} {}", + if core_enabled { + stats.get_conntrack_delete_error_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_conntrack_close_event_drop_total Dropped conntrack close events due to queue pressure or unavailable sender" + ); + let _ = writeln!( + out, + "# TYPE telemt_conntrack_close_event_drop_total counter" + ); + let _ = writeln!( + out, + "telemt_conntrack_close_event_drop_total {}", + if core_enabled { + stats.get_conntrack_close_event_drop_total() + } else { + 0 + } + ); + let _ = writeln!( out, "# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests" diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 7942906..1fb49b0 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -80,7 +80,7 @@ use crate::transport::middle_proxy::MePool; use crate::transport::socket::normalize_ip; use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol}; -use crate::proxy::direct_relay::handle_via_direct; +use crate::proxy::direct_relay::handle_via_direct_with_shared; use crate::proxy::handshake::{ HandshakeSuccess, handle_mtproto_handshake_with_shared, handle_tls_handshake_with_shared, }; @@ -191,6 +191,24 @@ fn handshake_timeout_with_mask_grace(config: &ProxyConfig) -> Duration { } } +fn effective_client_first_byte_idle_secs(config: &ProxyConfig, shared: &ProxySharedState) -> u64 { + let idle_secs = config.timeouts.client_first_byte_idle_secs; + if idle_secs == 0 { + return 0; + } + if shared.conntrack_pressure_active() { + idle_secs.min( + config + .server + .conntrack_control + .profile + .client_first_byte_idle_cap_secs(), + ) + } else { + idle_secs + } +} + const MASK_CLASSIFIER_PREFETCH_WINDOW: usize = 16; #[cfg(test)] const MASK_CLASSIFIER_PREFETCH_TIMEOUT: Duration = Duration::from_millis(5); @@ -463,10 +481,11 @@ where debug!(peer = %real_peer, "New connection (generic stream)"); - let first_byte = if config.timeouts.client_first_byte_idle_secs == 0 { + let first_byte_idle_secs = effective_client_first_byte_idle_secs(&config, shared.as_ref()); + let first_byte = if first_byte_idle_secs == 0 { None } else { - let idle_timeout = Duration::from_secs(config.timeouts.client_first_byte_idle_secs); + let idle_timeout = Duration::from_secs(first_byte_idle_secs); let mut first_byte = [0u8; 1]; match timeout(idle_timeout, stream.read(&mut first_byte)).await { Ok(Ok(0)) => { @@ -499,15 +518,15 @@ where ); return Err(ProxyError::Io(e)); } - Err(_) => { - debug!( - peer = %real_peer, - idle_secs = config.timeouts.client_first_byte_idle_secs, - "Closing idle pooled 
connection before first client byte" - ); - return Ok(()); + Err(_) => { + debug!( + peer = %real_peer, + idle_secs = first_byte_idle_secs, + "Closing idle pooled connection before first client byte" + ); + return Ok(()); + } } - } }; let handshake_timeout = handshake_timeout_with_mask_grace(&config); @@ -968,11 +987,12 @@ impl RunningClientHandler { } } - let first_byte = if self.config.timeouts.client_first_byte_idle_secs == 0 { + let first_byte_idle_secs = + effective_client_first_byte_idle_secs(&self.config, self.shared.as_ref()); + let first_byte = if first_byte_idle_secs == 0 { None } else { - let idle_timeout = - Duration::from_secs(self.config.timeouts.client_first_byte_idle_secs); + let idle_timeout = Duration::from_secs(first_byte_idle_secs); let mut first_byte = [0u8; 1]; match timeout(idle_timeout, self.stream.read(&mut first_byte)).await { Ok(Ok(0)) => { @@ -1008,7 +1028,7 @@ impl RunningClientHandler { Err(_) => { debug!( peer = %self.peer, - idle_secs = self.config.timeouts.client_first_byte_idle_secs, + idle_secs = first_byte_idle_secs, "Closing idle pooled connection before first client byte" ); return Ok(None); @@ -1395,7 +1415,7 @@ impl RunningClientHandler { local_addr: SocketAddr, peer_addr: SocketAddr, ip_tracker: Arc, - _shared: Arc, + shared: Arc, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -1438,12 +1458,12 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, - _shared, + shared.clone(), ) .await } else { warn!("use_middle_proxy=true but MePool not initialized, falling back to direct"); - handle_via_direct( + handle_via_direct_with_shared( client_reader, client_writer, success, @@ -1455,12 +1475,14 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, + local_addr, + shared.clone(), ) .await } } else { // Direct mode (original behavior) - handle_via_direct( + handle_via_direct_with_shared( client_reader, client_writer, success, @@ -1472,6 +1494,8 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, + local_addr, + shared.clone(), ) .await }; diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 71ebf38..7674f6b 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -6,6 +6,7 @@ use std::net::SocketAddr; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use std::sync::{Mutex, OnceLock}; +use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, split}; use tokio::sync::watch; @@ -16,7 +17,9 @@ use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce}; -use crate::proxy::relay::relay_bidirectional; +use crate::proxy::shared_state::{ + ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, +}; use crate::proxy::route_mode::{ ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay, @@ -225,7 +228,43 @@ fn unknown_dc_test_lock() -> &'static Mutex<()> { TEST_LOCK.get_or_init(|| Mutex::new(())) } +#[allow(dead_code)] pub(crate) async fn handle_via_direct( + client_reader: CryptoReader, + client_writer: CryptoWriter, + success: HandshakeSuccess, + upstream_manager: Arc, + stats: Arc, + config: Arc, + buffer_pool: Arc, + rng: Arc, + route_rx: watch::Receiver, + route_snapshot: RouteCutoverState, + session_id: u64, +) -> Result<()> +where + 
R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, +{ + handle_via_direct_with_shared( + client_reader, + client_writer, + success, + upstream_manager, + stats, + config.clone(), + buffer_pool, + rng, + route_rx, + route_snapshot, + session_id, + SocketAddr::from(([0, 0, 0, 0], config.server.port)), + ProxySharedState::new(), + ) + .await +} + +pub(crate) async fn handle_via_direct_with_shared( client_reader: CryptoReader, client_writer: CryptoWriter, success: HandshakeSuccess, @@ -237,6 +276,8 @@ pub(crate) async fn handle_via_direct( mut route_rx: watch::Receiver, route_snapshot: RouteCutoverState, session_id: u64, + local_addr: SocketAddr, + shared: Arc, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -277,7 +318,18 @@ where let _direct_connection_lease = stats.acquire_direct_connection_lease(); let buffer_pool_trim = Arc::clone(&buffer_pool); - let relay_result = relay_bidirectional( + let relay_activity_timeout = if shared.conntrack_pressure_active() { + Duration::from_secs( + config + .server + .conntrack_control + .profile + .direct_activity_timeout_secs(), + ) + } else { + Duration::from_secs(1800) + }; + let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout( client_reader, client_writer, tg_reader, @@ -288,6 +340,7 @@ where Arc::clone(&stats), config.access.user_data_quota.get(user).copied(), buffer_pool, + relay_activity_timeout, ); tokio::pin!(relay_result); let relay_result = loop { @@ -329,9 +382,52 @@ where pool_snapshot.allocated, pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled), ); + + let close_reason = classify_conntrack_close_reason(&relay_result); + let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent { + src: success.peer, + dst: local_addr, + reason: close_reason, + }); + if !matches!( + publish_result, + ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled + ) { + stats.increment_conntrack_close_event_drop_total(); + } + relay_result } +fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason { + match result { + Ok(()) => ConntrackCloseReason::NormalEof, + Err(crate::error::ProxyError::Io(error)) + if matches!(error.kind(), std::io::ErrorKind::TimedOut) => + { + ConntrackCloseReason::Timeout + } + Err(crate::error::ProxyError::Io(error)) + if matches!( + error.kind(), + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::NotConnected + | std::io::ErrorKind::UnexpectedEof + ) => + { + ConntrackCloseReason::Reset + } + Err(crate::error::ProxyError::Proxy(message)) + if message.contains("pressure") || message.contains("evicted") => + { + ConntrackCloseReason::Pressure + } + Err(_) => ConntrackCloseReason::Other, + } +} + fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { let prefer_v6 = config.network.prefer == 6 && config.network.ipv6.unwrap_or(true); let datacenters = if prefer_v6 { diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 6d3fd1a..7d94bba 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -16,12 +16,14 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::timeout; use tracing::{debug, info, trace, warn}; -use crate::config::ProxyConfig; +use crate::config::{ConntrackPressureProfile, ProxyConfig}; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; use crate::protocol::constants::{secure_padding_len, *}; use 
crate::proxy::handshake::HandshakeSuccess; -use crate::proxy::shared_state::ProxySharedState; +use crate::proxy::shared_state::{ + ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, +}; use crate::proxy::route_mode::{ ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay, @@ -135,6 +137,10 @@ fn note_relay_pressure_event_in(shared: &ProxySharedState) { guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1); } +pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) { + note_relay_pressure_event_in(shared); +} + fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 { let guard = relay_idle_candidate_registry_lock_in(shared); guard.pressure_event_seq @@ -241,6 +247,23 @@ impl RelayClientIdlePolicy { legacy_frame_read_timeout: frame_read_timeout, } } + + fn apply_pressure_caps(&mut self, profile: ConntrackPressureProfile) { + let pressure_soft_idle_cap = Duration::from_secs(profile.middle_soft_idle_cap_secs()); + let pressure_hard_idle_cap = Duration::from_secs(profile.middle_hard_idle_cap_secs()); + + self.soft_idle = self.soft_idle.min(pressure_soft_idle_cap); + self.hard_idle = self.hard_idle.min(pressure_hard_idle_cap); + if self.soft_idle > self.hard_idle { + self.soft_idle = self.hard_idle; + } + self.legacy_frame_read_timeout = self + .legacy_frame_read_timeout + .min(pressure_hard_idle_cap); + if self.grace_after_downstream_activity > self.hard_idle { + self.grace_after_downstream_activity = self.hard_idle; + } + } } #[derive(Clone, Copy)] @@ -1027,7 +1050,12 @@ where let translated_local_addr = me_pool.translate_our_addr(local_addr); let frame_limit = config.general.max_client_frame; - let relay_idle_policy = RelayClientIdlePolicy::from_config(&config); + let mut relay_idle_policy = RelayClientIdlePolicy::from_config(&config); + let mut pressure_caps_applied = false; + if shared.conntrack_pressure_active() { + relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile); + pressure_caps_applied = true; + } let session_started_at = forensics.started_at; let mut relay_idle_state = RelayClientIdleState::new(session_started_at); let last_downstream_activity_ms = Arc::new(AtomicU64::new(0)); @@ -1421,6 +1449,11 @@ where let mut route_watch_open = true; let mut seen_pressure_seq = relay_pressure_event_seq_in(shared.as_ref()); loop { + if shared.conntrack_pressure_active() && !pressure_caps_applied { + relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile); + pressure_caps_applied = true; + } + if relay_idle_policy.enabled && maybe_evict_idle_candidate_on_pressure_in( shared.as_ref(), @@ -1600,6 +1633,20 @@ where frames_ok = frame_counter, "ME relay cleanup" ); + + let close_reason = classify_conntrack_close_reason(&result); + let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent { + src: peer, + dst: local_addr, + reason: close_reason, + }); + if !matches!( + publish_result, + ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled + ) { + stats.increment_conntrack_close_event_drop_total(); + } + clear_relay_idle_candidate_in(shared.as_ref(), conn_id); me_pool.registry().unregister(conn_id).await; buffer_pool.trim_to(buffer_pool.max_buffers().min(64)); @@ -1612,6 +1659,33 @@ where result } +fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason { + match result { + Ok(()) => ConntrackCloseReason::NormalEof, + Err(ProxyError::Io(error)) if 
matches!(error.kind(), std::io::ErrorKind::TimedOut) => { + ConntrackCloseReason::Timeout + } + Err(ProxyError::Io(error)) + if matches!( + error.kind(), + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::NotConnected + | std::io::ErrorKind::UnexpectedEof + ) => + { + ConntrackCloseReason::Reset + } + Err(ProxyError::Proxy(message)) + if message.contains("pressure") || message.contains("evicted") => + { + ConntrackCloseReason::Pressure + } + Err(_) => ConntrackCloseReason::Other, + } +} + async fn read_client_payload_with_idle_policy_in( client_reader: &mut CryptoReader, proto_tag: ProtoTag, diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 38224ad..60b432b 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -70,6 +70,7 @@ use tracing::{debug, trace, warn}; /// /// iOS keeps Telegram connections alive in background for up to 30 minutes. /// Closing earlier causes unnecessary reconnects and handshake overhead. +#[allow(dead_code)] const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(1800); /// Watchdog check interval — also used for periodic rate logging. @@ -453,6 +454,7 @@ impl AsyncWrite for StatsIo { /// - Clean shutdown: both write sides are shut down on exit /// - Error propagation: quota exits return `ProxyError::DataQuotaExceeded`, /// other I/O failures are returned as `ProxyError::Io` +#[allow(dead_code)] pub async fn relay_bidirectional( client_reader: CR, client_writer: CW, @@ -471,6 +473,42 @@ where SR: AsyncRead + Unpin + Send + 'static, SW: AsyncWrite + Unpin + Send + 'static, { + relay_bidirectional_with_activity_timeout( + client_reader, + client_writer, + server_reader, + server_writer, + c2s_buf_size, + s2c_buf_size, + user, + stats, + quota_limit, + _buffer_pool, + ACTIVITY_TIMEOUT, + ) + .await +} + +pub async fn relay_bidirectional_with_activity_timeout( + client_reader: CR, + client_writer: CW, + server_reader: SR, + server_writer: SW, + c2s_buf_size: usize, + s2c_buf_size: usize, + user: &str, + stats: Arc, + quota_limit: Option, + _buffer_pool: Arc, + activity_timeout: Duration, +) -> Result<()> +where + CR: AsyncRead + Unpin + Send + 'static, + CW: AsyncWrite + Unpin + Send + 'static, + SR: AsyncRead + Unpin + Send + 'static, + SW: AsyncWrite + Unpin + Send + 'static, +{ + let activity_timeout = activity_timeout.max(Duration::from_secs(1)); let epoch = Instant::now(); let counters = Arc::new(SharedCounters::new()); let quota_exceeded = Arc::new(AtomicBool::new(false)); @@ -512,7 +550,7 @@ where } // ── Activity timeout ──────────────────────────────────── - if idle >= ACTIVITY_TIMEOUT { + if idle >= activity_timeout { let c2s = wd_counters.c2s_bytes.load(Ordering::Relaxed); let s2c = wd_counters.s2c_bytes.load(Ordering::Relaxed); warn!( diff --git a/src/proxy/shared_state.rs b/src/proxy/shared_state.rs index 2928b82..784d666 100644 --- a/src/proxy/shared_state.rs +++ b/src/proxy/shared_state.rs @@ -1,15 +1,40 @@ use std::collections::HashSet; use std::collections::hash_map::RandomState; -use std::net::IpAddr; -use std::sync::atomic::AtomicU64; +use std::net::{IpAddr, SocketAddr}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; use dashmap::DashMap; +use tokio::sync::mpsc; use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState}; use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum 
ConntrackCloseReason { + NormalEof, + Timeout, + Pressure, + Reset, + Other, +} + +#[derive(Debug, Clone, Copy)] +pub(crate) struct ConntrackCloseEvent { + pub(crate) src: SocketAddr, + pub(crate) dst: SocketAddr, + pub(crate) reason: ConntrackCloseReason, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ConntrackClosePublishResult { + Sent, + Disabled, + QueueFull, + QueueClosed, +} + pub(crate) struct HandshakeSharedState { pub(crate) auth_probe: DashMap, pub(crate) auth_probe_saturation: Mutex>, @@ -31,6 +56,8 @@ pub(crate) struct MiddleRelaySharedState { pub(crate) struct ProxySharedState { pub(crate) handshake: HandshakeSharedState, pub(crate) middle_relay: MiddleRelaySharedState, + pub(crate) conntrack_pressure_active: AtomicBool, + pub(crate) conntrack_close_tx: Mutex>>, } impl ProxySharedState { @@ -52,6 +79,67 @@ impl ProxySharedState { relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()), relay_idle_mark_seq: AtomicU64::new(0), }, + conntrack_pressure_active: AtomicBool::new(false), + conntrack_close_tx: Mutex::new(None), }) } + + pub(crate) fn set_conntrack_close_sender(&self, tx: mpsc::Sender) { + match self.conntrack_close_tx.lock() { + Ok(mut guard) => { + *guard = Some(tx); + } + Err(poisoned) => { + let mut guard = poisoned.into_inner(); + *guard = Some(tx); + self.conntrack_close_tx.clear_poison(); + } + } + } + + pub(crate) fn disable_conntrack_close_sender(&self) { + match self.conntrack_close_tx.lock() { + Ok(mut guard) => { + *guard = None; + } + Err(poisoned) => { + let mut guard = poisoned.into_inner(); + *guard = None; + self.conntrack_close_tx.clear_poison(); + } + } + } + + pub(crate) fn publish_conntrack_close_event( + &self, + event: ConntrackCloseEvent, + ) -> ConntrackClosePublishResult { + let tx = match self.conntrack_close_tx.lock() { + Ok(guard) => guard.clone(), + Err(poisoned) => { + let guard = poisoned.into_inner(); + let cloned = guard.clone(); + self.conntrack_close_tx.clear_poison(); + cloned + } + }; + + let Some(tx) = tx else { + return ConntrackClosePublishResult::Disabled; + }; + + match tx.try_send(event) { + Ok(()) => ConntrackClosePublishResult::Sent, + Err(mpsc::error::TrySendError::Full(_)) => ConntrackClosePublishResult::QueueFull, + Err(mpsc::error::TrySendError::Closed(_)) => ConntrackClosePublishResult::QueueClosed, + } + } + + pub(crate) fn set_conntrack_pressure_active(&self, active: bool) { + self.conntrack_pressure_active.store(active, Ordering::Relaxed); + } + + pub(crate) fn conntrack_pressure_active(&self) -> bool { + self.conntrack_pressure_active.load(Ordering::Relaxed) + } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 7a6e4f6..bb8e38b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -159,8 +159,8 @@ MemoryDenyWriteExecute=true LockPersonality=true # Allow binding to privileged ports and writing to specific paths -AmbientCapabilities=CAP_NET_BIND_SERVICE -CapabilityBoundingSet=CAP_NET_BIND_SERVICE +AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN +CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_NET_ADMIN ReadWritePaths=/etc/telemt /var/run /var/lib/telemt [Install] diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 6f3d3f8..42c42ff 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -91,6 +91,17 @@ pub struct Stats { current_connections_direct: AtomicU64, current_connections_me: AtomicU64, handshake_timeouts: AtomicU64, + accept_permit_timeout_total: AtomicU64, + conntrack_control_enabled_gauge: AtomicBool, + conntrack_control_available_gauge: 
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index 6f3d3f8..42c42ff 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -91,6 +91,17 @@ pub struct Stats {
     current_connections_direct: AtomicU64,
     current_connections_me: AtomicU64,
     handshake_timeouts: AtomicU64,
+    accept_permit_timeout_total: AtomicU64,
+    conntrack_control_enabled_gauge: AtomicBool,
+    conntrack_control_available_gauge: AtomicBool,
+    conntrack_pressure_active_gauge: AtomicBool,
+    conntrack_event_queue_depth_gauge: AtomicU64,
+    conntrack_rule_apply_ok_gauge: AtomicBool,
+    conntrack_delete_attempt_total: AtomicU64,
+    conntrack_delete_success_total: AtomicU64,
+    conntrack_delete_not_found_total: AtomicU64,
+    conntrack_delete_error_total: AtomicU64,
+    conntrack_close_event_drop_total: AtomicU64,
     upstream_connect_attempt_total: AtomicU64,
     upstream_connect_success_total: AtomicU64,
     upstream_connect_fail_total: AtomicU64,
@@ -528,6 +539,74 @@ impl Stats {
             self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
         }
     }
+
+    pub fn increment_accept_permit_timeout_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.accept_permit_timeout_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn set_conntrack_control_enabled(&self, enabled: bool) {
+        self.conntrack_control_enabled_gauge
+            .store(enabled, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_control_available(&self, available: bool) {
+        self.conntrack_control_available_gauge
+            .store(available, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_pressure_active(&self, active: bool) {
+        self.conntrack_pressure_active_gauge
+            .store(active, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_event_queue_depth(&self, depth: u64) {
+        self.conntrack_event_queue_depth_gauge
+            .store(depth, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_rule_apply_ok(&self, ok: bool) {
+        self.conntrack_rule_apply_ok_gauge
+            .store(ok, Ordering::Relaxed);
+    }
+
+    pub fn increment_conntrack_delete_attempt_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_attempt_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_delete_success_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_success_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_delete_not_found_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_not_found_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_delete_error_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_error_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_close_event_drop_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_close_event_drop_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
     pub fn increment_upstream_connect_attempt_total(&self) {
         if self.telemetry_core_enabled() {
             self.upstream_connect_attempt_total
@@ -1477,6 +1556,9 @@
     pub fn get_connects_bad(&self) -> u64 {
         self.connects_bad.load(Ordering::Relaxed)
     }
+    pub fn get_accept_permit_timeout_total(&self) -> u64 {
+        self.accept_permit_timeout_total.load(Ordering::Relaxed)
+    }
     pub fn get_current_connections_direct(&self) -> u64 {
         self.current_connections_direct.load(Ordering::Relaxed)
     }
@@ -1487,6 +1569,38 @@
         self.get_current_connections_direct()
             .saturating_add(self.get_current_connections_me())
     }
+    pub fn get_conntrack_control_enabled(&self) -> bool {
+        self.conntrack_control_enabled_gauge.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_control_available(&self) -> bool {
+        self.conntrack_control_available_gauge
+            .load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_pressure_active(&self) -> bool {
+        self.conntrack_pressure_active_gauge.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_event_queue_depth(&self) -> u64 {
+        self.conntrack_event_queue_depth_gauge
+            .load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_rule_apply_ok(&self) -> bool {
+        self.conntrack_rule_apply_ok_gauge.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_attempt_total(&self) -> u64 {
+        self.conntrack_delete_attempt_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_success_total(&self) -> u64 {
+        self.conntrack_delete_success_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_not_found_total(&self) -> u64 {
+        self.conntrack_delete_not_found_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_error_total(&self) -> u64 {
+        self.conntrack_delete_error_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_close_event_drop_total(&self) -> u64 {
+        self.conntrack_close_event_drop_total.load(Ordering::Relaxed)
+    }
     pub fn get_me_keepalive_sent(&self) -> u64 {
         self.me_keepalive_sent.load(Ordering::Relaxed)
     }
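Note on the counters above: the attempt/success/not_found/error split implies the conntrack worker classifies every deletion it issues and paces itself rather than flushing entries as fast as close events arrive. A small self-contained sketch of one way a per-second delete budget could be enforced; DeleteBudget is an illustration under that assumption, not the implementation in src/conntrack_control.rs:

    // Sketch only: fixed-window budget that caps how many conntrack deletes
    // may be issued per second; deferred deletes stay queued for later.
    use std::time::{Duration, Instant};

    struct DeleteBudget {
        per_sec: u64,
        used: u64,
        window_start: Instant,
    }

    impl DeleteBudget {
        fn new(per_sec: u64) -> Self {
            Self { per_sec, used: 0, window_start: Instant::now() }
        }

        /// Returns true if one more delete may be issued in the current
        /// one-second window.
        fn try_acquire(&mut self) -> bool {
            if self.window_start.elapsed() >= Duration::from_secs(1) {
                self.window_start = Instant::now();
                self.used = 0;
            }
            if self.used < self.per_sec {
                self.used += 1;
                true
            } else {
                false
            }
        }
    }

    fn main() {
        let mut budget = DeleteBudget::new(2);
        assert!(budget.try_acquire());
        assert!(budget.try_acquire());
        assert!(!budget.try_acquire()); // third delete in the same second is deferred
    }

Each acquired slot would correspond to one increment_conntrack_delete_attempt_total call, with the netlink result then recorded as success, not_found, or error.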