Conntrack Control Method

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-04-04 11:28:32 +03:00
parent 7fe38f1b9f
commit 7f0057acd7
No known key found for this signature in database
17 changed files with 1658 additions and 29 deletions

View File

@ -81,10 +81,21 @@ pub(super) struct ZeroCoreData {
pub(super) connections_total: u64,
pub(super) connections_bad_total: u64,
pub(super) handshake_timeouts_total: u64,
pub(super) accept_permit_timeout_total: u64,
pub(super) configured_users: usize,
pub(super) telemetry_core_enabled: bool,
pub(super) telemetry_user_enabled: bool,
pub(super) telemetry_me_level: String,
pub(super) conntrack_control_enabled: bool,
pub(super) conntrack_control_available: bool,
pub(super) conntrack_pressure_active: bool,
pub(super) conntrack_event_queue_depth: u64,
pub(super) conntrack_rule_apply_ok: bool,
pub(super) conntrack_delete_attempt_total: u64,
pub(super) conntrack_delete_success_total: u64,
pub(super) conntrack_delete_not_found_total: u64,
pub(super) conntrack_delete_error_total: u64,
pub(super) conntrack_close_event_drop_total: u64,
}
#[derive(Serialize, Clone)]

View File

@ -39,10 +39,21 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
connections_total: stats.get_connects_all(),
connections_bad_total: stats.get_connects_bad(),
handshake_timeouts_total: stats.get_handshake_timeouts(),
accept_permit_timeout_total: stats.get_accept_permit_timeout_total(),
configured_users,
telemetry_core_enabled: telemetry.core_enabled,
telemetry_user_enabled: telemetry.user_enabled,
telemetry_me_level: telemetry.me_level.to_string(),
conntrack_control_enabled: stats.get_conntrack_control_enabled(),
conntrack_control_available: stats.get_conntrack_control_available(),
conntrack_pressure_active: stats.get_conntrack_pressure_active(),
conntrack_event_queue_depth: stats.get_conntrack_event_queue_depth(),
conntrack_rule_apply_ok: stats.get_conntrack_rule_apply_ok(),
conntrack_delete_attempt_total: stats.get_conntrack_delete_attempt_total(),
conntrack_delete_success_total: stats.get_conntrack_delete_success_total(),
conntrack_delete_not_found_total: stats.get_conntrack_delete_not_found_total(),
conntrack_delete_error_total: stats.get_conntrack_delete_error_total(),
conntrack_close_event_drop_total: stats.get_conntrack_close_event_drop_total(),
},
upstream: build_zero_upstream_data(stats),
middle_proxy: ZeroMiddleProxyData {

View File

@ -48,6 +48,10 @@ const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_CONNTRACK_CONTROL_ENABLED: bool = true;
const DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT: u8 = 85;
const DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT: u8 = 70;
const DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC: u64 = 4096;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
@ -221,6 +225,22 @@ pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
}
pub(crate) fn default_conntrack_control_enabled() -> bool {
DEFAULT_CONNTRACK_CONTROL_ENABLED
}
pub(crate) fn default_conntrack_pressure_high_watermark_pct() -> u8 {
DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT
}
pub(crate) fn default_conntrack_pressure_low_watermark_pct() -> u8 {
DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT
}
pub(crate) fn default_conntrack_delete_budget_per_sec() -> u64 {
DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC
}
pub(crate) fn default_prefer_4() -> u8 {
4
}

View File

@ -922,6 +922,39 @@ impl ProxyConfig {
));
}
if config.server.conntrack_control.pressure_high_watermark_pct == 0
|| config.server.conntrack_control.pressure_high_watermark_pct > 100
{
return Err(ProxyError::Config(
"server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]"
.to_string(),
));
}
if config.server.conntrack_control.pressure_low_watermark_pct
>= config.server.conntrack_control.pressure_high_watermark_pct
{
return Err(ProxyError::Config(
"server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct"
.to_string(),
));
}
if config.server.conntrack_control.delete_budget_per_sec == 0 {
return Err(ProxyError::Config(
"server.conntrack_control.delete_budget_per_sec must be > 0".to_string(),
));
}
if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid)
&& config.server.conntrack_control.hybrid_listener_ips.is_empty()
{
return Err(ProxyError::Config(
"server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
.to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs
@ -1327,6 +1360,31 @@ mod tests {
cfg.server.api.runtime_edge_events_capacity,
default_api_runtime_edge_events_capacity()
);
assert_eq!(
cfg.server.conntrack_control.inline_conntrack_control,
default_conntrack_control_enabled()
);
assert_eq!(cfg.server.conntrack_control.mode, ConntrackMode::default());
assert_eq!(
cfg.server.conntrack_control.backend,
ConntrackBackend::default()
);
assert_eq!(
cfg.server.conntrack_control.profile,
ConntrackPressureProfile::default()
);
assert_eq!(
cfg.server.conntrack_control.pressure_high_watermark_pct,
default_conntrack_pressure_high_watermark_pct()
);
assert_eq!(
cfg.server.conntrack_control.pressure_low_watermark_pct,
default_conntrack_pressure_low_watermark_pct()
);
assert_eq!(
cfg.server.conntrack_control.delete_budget_per_sec,
default_conntrack_delete_budget_per_sec()
);
assert_eq!(cfg.access.users, default_access_users());
assert_eq!(
cfg.access.user_max_tcp_conns_global_each,
@ -1472,6 +1530,31 @@ mod tests {
server.api.runtime_edge_events_capacity,
default_api_runtime_edge_events_capacity()
);
assert_eq!(
server.conntrack_control.inline_conntrack_control,
default_conntrack_control_enabled()
);
assert_eq!(server.conntrack_control.mode, ConntrackMode::default());
assert_eq!(
server.conntrack_control.backend,
ConntrackBackend::default()
);
assert_eq!(
server.conntrack_control.profile,
ConntrackPressureProfile::default()
);
assert_eq!(
server.conntrack_control.pressure_high_watermark_pct,
default_conntrack_pressure_high_watermark_pct()
);
assert_eq!(
server.conntrack_control.pressure_low_watermark_pct,
default_conntrack_pressure_low_watermark_pct()
);
assert_eq!(
server.conntrack_control.delete_budget_per_sec,
default_conntrack_delete_budget_per_sec()
);
let access = AccessConfig::default();
assert_eq!(access.users, default_access_users());
@ -2404,6 +2487,118 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_pressure_high_watermark_out_of_range_is_rejected() {
let toml = r#"
[server.conntrack_control]
pressure_high_watermark_pct = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(
err.contains("server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]")
);
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_pressure_low_watermark_must_be_below_high() {
let toml = r#"
[server.conntrack_control]
pressure_high_watermark_pct = 50
pressure_low_watermark_pct = 50
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_low_watermark_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(
err.contains(
"server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct"
)
);
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_delete_budget_zero_is_rejected() {
let toml = r#"
[server.conntrack_control]
delete_budget_per_sec = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_delete_budget_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("server.conntrack_control.delete_budget_per_sec must be > 0"));
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_hybrid_mode_requires_listener_allow_list() {
let toml = r#"
[server.conntrack_control]
mode = "hybrid"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(
err.contains("server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid")
);
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_profile_is_loaded_from_config() {
let toml = r#"
[server.conntrack_control]
profile = "aggressive"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_profile_parse_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(
cfg.server.conntrack_control.profile,
ConntrackPressureProfile::Aggressive
);
let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_default_matches_drain_ttl() {
let toml = r#"

View File

@ -1216,6 +1216,118 @@ impl Default for ApiConfig {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackMode {
#[default]
Tracked,
Notrack,
Hybrid,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackBackend {
#[default]
Auto,
Nftables,
Iptables,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackPressureProfile {
Conservative,
#[default]
Balanced,
Aggressive,
}
impl ConntrackPressureProfile {
pub fn client_first_byte_idle_cap_secs(self) -> u64 {
match self {
Self::Conservative => 30,
Self::Balanced => 20,
Self::Aggressive => 10,
}
}
pub fn direct_activity_timeout_secs(self) -> u64 {
match self {
Self::Conservative => 180,
Self::Balanced => 120,
Self::Aggressive => 60,
}
}
pub fn middle_soft_idle_cap_secs(self) -> u64 {
match self {
Self::Conservative => 60,
Self::Balanced => 30,
Self::Aggressive => 20,
}
}
pub fn middle_hard_idle_cap_secs(self) -> u64 {
match self {
Self::Conservative => 180,
Self::Balanced => 90,
Self::Aggressive => 60,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConntrackControlConfig {
/// Enables runtime conntrack-control worker for pressure mitigation.
#[serde(default = "default_conntrack_control_enabled")]
pub inline_conntrack_control: bool,
/// Conntrack mode for listener ingress traffic.
#[serde(default)]
pub mode: ConntrackMode,
/// Netfilter backend used to reconcile notrack rules.
#[serde(default)]
pub backend: ConntrackBackend,
/// Pressure profile for timeout caps under resource saturation.
#[serde(default)]
pub profile: ConntrackPressureProfile,
/// Listener IP allow-list for hybrid mode.
/// Ignored in tracked/notrack mode.
#[serde(default)]
pub hybrid_listener_ips: Vec<IpAddr>,
/// Pressure high watermark as percentage.
#[serde(default = "default_conntrack_pressure_high_watermark_pct")]
pub pressure_high_watermark_pct: u8,
/// Pressure low watermark as percentage.
#[serde(default = "default_conntrack_pressure_low_watermark_pct")]
pub pressure_low_watermark_pct: u8,
/// Maximum conntrack delete operations per second.
#[serde(default = "default_conntrack_delete_budget_per_sec")]
pub delete_budget_per_sec: u64,
}
impl Default for ConntrackControlConfig {
fn default() -> Self {
Self {
inline_conntrack_control: default_conntrack_control_enabled(),
mode: ConntrackMode::default(),
backend: ConntrackBackend::default(),
profile: ConntrackPressureProfile::default(),
hybrid_listener_ips: Vec::new(),
pressure_high_watermark_pct: default_conntrack_pressure_high_watermark_pct(),
pressure_low_watermark_pct: default_conntrack_pressure_low_watermark_pct(),
delete_budget_per_sec: default_conntrack_delete_budget_per_sec(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
#[serde(default = "default_port")]
@ -1291,6 +1403,10 @@ pub struct ServerConfig {
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_accept_permit_timeout_ms")]
pub accept_permit_timeout_ms: u64,
/// Runtime conntrack control and pressure policy.
#[serde(default)]
pub conntrack_control: ConntrackControlConfig,
}
impl Default for ServerConfig {
@ -1313,6 +1429,7 @@ impl Default for ServerConfig {
listen_backlog: default_listen_backlog(),
max_connections: default_server_max_connections(),
accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
conntrack_control: ConntrackControlConfig::default(),
}
}
}

704
src/conntrack_control.rs Normal file
View File

@ -0,0 +1,704 @@
use std::collections::BTreeSet;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn};
use crate::config::{ConntrackBackend, ConntrackMode, ProxyConfig};
use crate::proxy::middle_relay::note_global_relay_pressure;
use crate::proxy::shared_state::{ConntrackCloseEvent, ConntrackCloseReason, ProxySharedState};
use crate::stats::Stats;
const CONNTRACK_EVENT_QUEUE_CAPACITY: usize = 32_768;
const PRESSURE_RELEASE_TICKS: u8 = 3;
const PRESSURE_SAMPLE_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NetfilterBackend {
Nftables,
Iptables,
}
#[derive(Clone, Copy)]
struct PressureSample {
conn_pct: Option<u8>,
fd_pct: Option<u8>,
accept_timeout_delta: u64,
me_queue_pressure_delta: u64,
}
struct PressureState {
active: bool,
low_streak: u8,
prev_accept_timeout_total: u64,
prev_me_queue_pressure_total: u64,
}
impl PressureState {
fn new(stats: &Stats) -> Self {
Self {
active: false,
low_streak: 0,
prev_accept_timeout_total: stats.get_accept_permit_timeout_total(),
prev_me_queue_pressure_total: stats.get_me_c2me_send_full_total(),
}
}
}
pub(crate) fn spawn_conntrack_controller(
config_rx: watch::Receiver<Arc<ProxyConfig>>,
stats: Arc<Stats>,
shared: Arc<ProxySharedState>,
) {
if !cfg!(target_os = "linux") {
let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control;
stats.set_conntrack_control_enabled(enabled);
stats.set_conntrack_control_available(false);
stats.set_conntrack_pressure_active(false);
stats.set_conntrack_event_queue_depth(0);
stats.set_conntrack_rule_apply_ok(false);
shared.disable_conntrack_close_sender();
shared.set_conntrack_pressure_active(false);
if enabled {
warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker");
}
return;
}
let (tx, rx) = mpsc::channel(CONNTRACK_EVENT_QUEUE_CAPACITY);
shared.set_conntrack_close_sender(tx);
tokio::spawn(async move {
run_conntrack_controller(config_rx, stats, shared, rx).await;
});
}
async fn run_conntrack_controller(
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
stats: Arc<Stats>,
shared: Arc<ProxySharedState>,
mut close_rx: mpsc::Receiver<ConntrackCloseEvent>,
) {
let mut cfg = config_rx.borrow().clone();
let mut pressure_state = PressureState::new(stats.as_ref());
let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
let mut backend = pick_backend(cfg.server.conntrack_control.backend);
apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false);
reconcile_rules(&cfg, backend, stats.as_ref()).await;
loop {
tokio::select! {
changed = config_rx.changed() => {
if changed.is_err() {
break;
}
cfg = config_rx.borrow_and_update().clone();
backend = pick_backend(cfg.server.conntrack_control.backend);
delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), pressure_state.active);
reconcile_rules(&cfg, backend, stats.as_ref()).await;
}
event = close_rx.recv() => {
let Some(event) = event else {
break;
};
stats.set_conntrack_event_queue_depth(close_rx.len() as u64);
if !cfg.server.conntrack_control.inline_conntrack_control {
continue;
}
if !pressure_state.active {
continue;
}
if !matches!(event.reason, ConntrackCloseReason::Timeout | ConntrackCloseReason::Pressure | ConntrackCloseReason::Reset) {
continue;
}
if delete_budget_tokens == 0 {
continue;
}
stats.increment_conntrack_delete_attempt_total();
match delete_conntrack_entry(event).await {
DeleteOutcome::Deleted => {
delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
stats.increment_conntrack_delete_success_total();
}
DeleteOutcome::NotFound => {
delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
stats.increment_conntrack_delete_not_found_total();
}
DeleteOutcome::Error => {
delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
stats.increment_conntrack_delete_error_total();
}
}
}
_ = tokio::time::sleep(PRESSURE_SAMPLE_INTERVAL) => {
delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
stats.set_conntrack_event_queue_depth(close_rx.len() as u64);
let sample = collect_pressure_sample(stats.as_ref(), &cfg, &mut pressure_state);
update_pressure_state(
stats.as_ref(),
shared.as_ref(),
&cfg,
&sample,
&mut pressure_state,
);
if pressure_state.active {
note_global_relay_pressure(shared.as_ref());
}
}
}
}
shared.disable_conntrack_close_sender();
shared.set_conntrack_pressure_active(false);
stats.set_conntrack_pressure_active(false);
}
fn apply_runtime_state(
stats: &Stats,
shared: &ProxySharedState,
cfg: &ProxyConfig,
backend_available: bool,
pressure_active: bool,
) {
let enabled = cfg.server.conntrack_control.inline_conntrack_control;
let available = enabled && backend_available && has_cap_net_admin();
if enabled && !available {
warn!(
"conntrack control enabled but unavailable (missing CAP_NET_ADMIN or backend binaries)"
);
}
stats.set_conntrack_control_enabled(enabled);
stats.set_conntrack_control_available(available);
shared.set_conntrack_pressure_active(enabled && pressure_active);
stats.set_conntrack_pressure_active(enabled && pressure_active);
}
fn collect_pressure_sample(
stats: &Stats,
cfg: &ProxyConfig,
state: &mut PressureState,
) -> PressureSample {
let current_connections = stats.get_current_connections_total();
let conn_pct = if cfg.server.max_connections == 0 {
None
} else {
Some(
((current_connections.saturating_mul(100)) / u64::from(cfg.server.max_connections))
.min(100) as u8,
)
};
let fd_pct = fd_usage_pct();
let accept_total = stats.get_accept_permit_timeout_total();
let accept_delta = accept_total.saturating_sub(state.prev_accept_timeout_total);
state.prev_accept_timeout_total = accept_total;
let me_total = stats.get_me_c2me_send_full_total();
let me_delta = me_total.saturating_sub(state.prev_me_queue_pressure_total);
state.prev_me_queue_pressure_total = me_total;
PressureSample {
conn_pct,
fd_pct,
accept_timeout_delta: accept_delta,
me_queue_pressure_delta: me_delta,
}
}
fn update_pressure_state(
stats: &Stats,
shared: &ProxySharedState,
cfg: &ProxyConfig,
sample: &PressureSample,
state: &mut PressureState,
) {
if !cfg.server.conntrack_control.inline_conntrack_control {
if state.active {
state.active = false;
state.low_streak = 0;
shared.set_conntrack_pressure_active(false);
stats.set_conntrack_pressure_active(false);
info!("Conntrack pressure mode deactivated (feature disabled)");
}
return;
}
let high = cfg.server.conntrack_control.pressure_high_watermark_pct;
let low = cfg.server.conntrack_control.pressure_low_watermark_pct;
let high_hit = sample.conn_pct.is_some_and(|v| v >= high)
|| sample.fd_pct.is_some_and(|v| v >= high)
|| sample.accept_timeout_delta > 0
|| sample.me_queue_pressure_delta > 0;
let low_clear = sample.conn_pct.is_none_or(|v| v <= low)
&& sample.fd_pct.is_none_or(|v| v <= low)
&& sample.accept_timeout_delta == 0
&& sample.me_queue_pressure_delta == 0;
if !state.active && high_hit {
state.active = true;
state.low_streak = 0;
shared.set_conntrack_pressure_active(true);
stats.set_conntrack_pressure_active(true);
info!(
conn_pct = ?sample.conn_pct,
fd_pct = ?sample.fd_pct,
accept_timeout_delta = sample.accept_timeout_delta,
me_queue_pressure_delta = sample.me_queue_pressure_delta,
"Conntrack pressure mode activated"
);
return;
}
if state.active && low_clear {
state.low_streak = state.low_streak.saturating_add(1);
if state.low_streak >= PRESSURE_RELEASE_TICKS {
state.active = false;
state.low_streak = 0;
shared.set_conntrack_pressure_active(false);
stats.set_conntrack_pressure_active(false);
info!("Conntrack pressure mode deactivated");
}
return;
}
state.low_streak = 0;
}
async fn reconcile_rules(cfg: &ProxyConfig, backend: Option<NetfilterBackend>, stats: &Stats) {
if !cfg.server.conntrack_control.inline_conntrack_control {
clear_notrack_rules_all_backends().await;
stats.set_conntrack_rule_apply_ok(true);
return;
}
if !has_cap_net_admin() {
stats.set_conntrack_rule_apply_ok(false);
return;
}
let Some(backend) = backend else {
stats.set_conntrack_rule_apply_ok(false);
return;
};
let apply_result = match backend {
NetfilterBackend::Nftables => apply_nft_rules(cfg).await,
NetfilterBackend::Iptables => apply_iptables_rules(cfg).await,
};
if let Err(error) = apply_result {
warn!(error = %error, "Failed to reconcile conntrack/notrack rules");
stats.set_conntrack_rule_apply_ok(false);
} else {
stats.set_conntrack_rule_apply_ok(true);
}
}
fn pick_backend(configured: ConntrackBackend) -> Option<NetfilterBackend> {
match configured {
ConntrackBackend::Auto => {
if command_exists("nft") {
Some(NetfilterBackend::Nftables)
} else if command_exists("iptables") {
Some(NetfilterBackend::Iptables)
} else {
None
}
}
ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables),
ConntrackBackend::Iptables => command_exists("iptables").then_some(NetfilterBackend::Iptables),
}
}
fn command_exists(binary: &str) -> bool {
let Some(path_var) = std::env::var_os("PATH") else {
return false;
};
std::env::split_paths(&path_var).any(|dir| {
let candidate: PathBuf = dir.join(binary);
candidate.exists() && candidate.is_file()
})
}
fn notrack_targets(cfg: &ProxyConfig) -> (Vec<Option<IpAddr>>, Vec<Option<IpAddr>>) {
let mode = cfg.server.conntrack_control.mode;
let mut v4_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
let mut v6_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
match mode {
ConntrackMode::Tracked => {}
ConntrackMode::Notrack => {
if cfg.server.listeners.is_empty() {
if let Some(ipv4) = cfg
.server
.listen_addr_ipv4
.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok())
{
if ipv4.is_unspecified() {
v4_targets.insert(None);
} else {
v4_targets.insert(Some(ipv4));
}
}
if let Some(ipv6) = cfg
.server
.listen_addr_ipv6
.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok())
{
if ipv6.is_unspecified() {
v6_targets.insert(None);
} else {
v6_targets.insert(Some(ipv6));
}
}
} else {
for listener in &cfg.server.listeners {
if listener.ip.is_ipv4() {
if listener.ip.is_unspecified() {
v4_targets.insert(None);
} else {
v4_targets.insert(Some(listener.ip));
}
} else if listener.ip.is_unspecified() {
v6_targets.insert(None);
} else {
v6_targets.insert(Some(listener.ip));
}
}
}
}
ConntrackMode::Hybrid => {
for ip in &cfg.server.conntrack_control.hybrid_listener_ips {
if ip.is_ipv4() {
v4_targets.insert(Some(*ip));
} else {
v6_targets.insert(Some(*ip));
}
}
}
}
(
v4_targets.into_iter().collect(),
v6_targets.into_iter().collect(),
)
}
async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> {
let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
return Ok(());
}
let (v4_targets, v6_targets) = notrack_targets(cfg);
let mut rules = Vec::new();
for ip in v4_targets {
let rule = if let Some(ip) = ip {
format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip)
} else {
format!("tcp dport {} notrack", cfg.server.port)
};
rules.push(rule);
}
for ip in v6_targets {
let rule = if let Some(ip) = ip {
format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip)
} else {
format!("tcp dport {} notrack", cfg.server.port)
};
rules.push(rule);
}
let rule_blob = if rules.is_empty() {
String::new()
} else {
format!(" {}\n", rules.join("\n "))
};
let script = format!(
"table inet telemt_conntrack {{\n chain preraw {{\n type filter hook prerouting priority raw; policy accept;\n{rule_blob} }}\n}}\n"
);
run_command("nft", &["-f", "-"], Some(script)).await
}
async fn apply_iptables_rules(cfg: &ProxyConfig) -> Result<(), String> {
apply_iptables_rules_for_binary("iptables", cfg, true).await?;
apply_iptables_rules_for_binary("ip6tables", cfg, false).await?;
Ok(())
}
async fn apply_iptables_rules_for_binary(
binary: &str,
cfg: &ProxyConfig,
ipv4: bool,
) -> Result<(), String> {
if !command_exists(binary) {
return Ok(());
}
let chain = "TELEMT_NOTRACK";
let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await;
let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await;
let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await;
if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
return Ok(());
}
run_command(binary, &["-t", "raw", "-N", chain], None).await?;
run_command(binary, &["-t", "raw", "-F", chain], None).await?;
if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None).await.is_err() {
run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?;
}
let (v4_targets, v6_targets) = notrack_targets(cfg);
let selected = if ipv4 { v4_targets } else { v6_targets };
for ip in selected {
let mut args = vec![
"-t".to_string(),
"raw".to_string(),
"-A".to_string(),
chain.to_string(),
"-p".to_string(),
"tcp".to_string(),
"--dport".to_string(),
cfg.server.port.to_string(),
];
if let Some(ip) = ip {
args.push("-d".to_string());
args.push(ip.to_string());
}
args.push("-j".to_string());
args.push("CT".to_string());
args.push("--notrack".to_string());
let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
run_command(binary, &arg_refs, None).await?;
}
Ok(())
}
async fn clear_notrack_rules_all_backends() {
let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
}
enum DeleteOutcome {
Deleted,
NotFound,
Error,
}
async fn delete_conntrack_entry(event: ConntrackCloseEvent) -> DeleteOutcome {
if !command_exists("conntrack") {
return DeleteOutcome::Error;
}
let args = vec![
"-D".to_string(),
"-p".to_string(),
"tcp".to_string(),
"-s".to_string(),
event.src.ip().to_string(),
"--sport".to_string(),
event.src.port().to_string(),
"-d".to_string(),
event.dst.ip().to_string(),
"--dport".to_string(),
event.dst.port().to_string(),
];
let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
match run_command("conntrack", &arg_refs, None).await {
Ok(()) => DeleteOutcome::Deleted,
Err(error) => {
if error.contains("0 flow entries have been deleted") {
DeleteOutcome::NotFound
} else {
debug!(error = %error, "conntrack delete failed");
DeleteOutcome::Error
}
}
}
}
async fn run_command(binary: &str, args: &[&str], stdin: Option<String>) -> Result<(), String> {
if !command_exists(binary) {
return Err(format!("{binary} is not available"));
}
let mut command = Command::new(binary);
command.args(args);
if stdin.is_some() {
command.stdin(std::process::Stdio::piped());
}
command.stdout(std::process::Stdio::null());
command.stderr(std::process::Stdio::piped());
let mut child = command
.spawn()
.map_err(|e| format!("spawn {binary} failed: {e}"))?;
if let Some(blob) = stdin
&& let Some(mut writer) = child.stdin.take()
{
writer
.write_all(blob.as_bytes())
.await
.map_err(|e| format!("stdin write {binary} failed: {e}"))?;
}
let output = child
.wait_with_output()
.await
.map_err(|e| format!("wait {binary} failed: {e}"))?;
if output.status.success() {
return Ok(());
}
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
Err(if stderr.is_empty() {
format!("{binary} exited with status {}", output.status)
} else {
stderr
})
}
fn fd_usage_pct() -> Option<u8> {
let soft_limit = nofile_soft_limit()?;
if soft_limit == 0 {
return None;
}
let fd_count = std::fs::read_dir("/proc/self/fd").ok()?.count() as u64;
Some(((fd_count.saturating_mul(100)) / soft_limit).min(100) as u8)
}
fn nofile_soft_limit() -> Option<u64> {
#[cfg(target_os = "linux")]
{
let mut lim = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
let rc = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut lim) };
if rc != 0 {
return None;
}
return Some(lim.rlim_cur);
}
#[cfg(not(target_os = "linux"))]
{
None
}
}
fn has_cap_net_admin() -> bool {
#[cfg(target_os = "linux")]
{
let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
return false;
};
for line in status.lines() {
if let Some(raw) = line.strip_prefix("CapEff:") {
let caps = raw.trim();
if let Ok(bits) = u64::from_str_radix(caps, 16) {
const CAP_NET_ADMIN_BIT: u64 = 12;
return (bits & (1u64 << CAP_NET_ADMIN_BIT)) != 0;
}
}
}
false
}
#[cfg(not(target_os = "linux"))]
{
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ProxyConfig;
#[test]
fn pressure_activates_on_accept_timeout_spike() {
let stats = Stats::new();
let shared = ProxySharedState::new();
let mut cfg = ProxyConfig::default();
cfg.server.conntrack_control.inline_conntrack_control = true;
let mut state = PressureState::new(&stats);
let sample = PressureSample {
conn_pct: Some(10),
fd_pct: Some(10),
accept_timeout_delta: 1,
me_queue_pressure_delta: 0,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state);
assert!(state.active);
assert!(shared.conntrack_pressure_active());
assert!(stats.get_conntrack_pressure_active());
}
#[test]
fn pressure_releases_after_hysteresis_window() {
let stats = Stats::new();
let shared = ProxySharedState::new();
let mut cfg = ProxyConfig::default();
cfg.server.conntrack_control.inline_conntrack_control = true;
let mut state = PressureState::new(&stats);
let high_sample = PressureSample {
conn_pct: Some(95),
fd_pct: Some(95),
accept_timeout_delta: 0,
me_queue_pressure_delta: 0,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &high_sample, &mut state);
assert!(state.active);
let low_sample = PressureSample {
conn_pct: Some(10),
fd_pct: Some(10),
accept_timeout_delta: 0,
me_queue_pressure_delta: 0,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
assert!(state.active);
update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
assert!(state.active);
update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
assert!(!state.active);
assert!(!shared.conntrack_pressure_active());
assert!(!stats.get_conntrack_pressure_active());
}
#[test]
fn pressure_does_not_activate_when_disabled() {
let stats = Stats::new();
let shared = ProxySharedState::new();
let mut cfg = ProxyConfig::default();
cfg.server.conntrack_control.inline_conntrack_control = false;
let mut state = PressureState::new(&stats);
let sample = PressureSample {
conn_pct: Some(100),
fd_pct: Some(100),
accept_timeout_delta: 10,
me_queue_pressure_delta: 10,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state);
assert!(!state.active);
assert!(!shared.conntrack_pressure_active());
assert!(!stats.get_conntrack_pressure_active());
}
}

View File

@ -262,6 +262,7 @@ pub(crate) async fn bind_listeners(
break;
}
Err(_) => {
stats.increment_accept_permit_timeout_total();
debug!(
timeout_ms = accept_permit_timeout_ms,
"Dropping accepted unix connection: permit wait timeout"
@ -407,6 +408,7 @@ pub(crate) fn spawn_tcp_accept_loops(
break;
}
Err(_) => {
stats.increment_accept_permit_timeout_total();
debug!(
peer = %peer_addr,
timeout_ms = accept_permit_timeout_ms,

View File

@ -28,6 +28,7 @@ use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
use crate::api;
use crate::conntrack_control;
use crate::config::{LogLevel, ProxyConfig};
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
@ -633,6 +634,11 @@ async fn run_inner(
.await;
let _admission_tx_hold = admission_tx;
let shared_state = ProxySharedState::new();
conntrack_control::spawn_conntrack_controller(
config_rx.clone(),
stats.clone(),
shared_state.clone(),
);
let bound = listeners::bind_listeners(
&config,

View File

@ -2,6 +2,7 @@
mod api;
mod cli;
mod conntrack_control;
mod config;
mod crypto;
#[cfg(unix)]

View File

@ -359,6 +359,134 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout"
);
let _ = writeln!(out, "# TYPE telemt_accept_permit_timeout_total counter");
let _ = writeln!(
out,
"telemt_accept_permit_timeout_total {}",
if core_enabled {
stats.get_accept_permit_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_control_state Runtime conntrack control state flags"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_control_state gauge");
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"enabled\"}} {}",
if stats.get_conntrack_control_enabled() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"available\"}} {}",
if stats.get_conntrack_control_available() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"pressure_active\"}} {}",
if stats.get_conntrack_pressure_active() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"rule_apply_ok\"}} {}",
if stats.get_conntrack_rule_apply_ok() {
1
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_event_queue_depth Pending close events in conntrack control queue"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_event_queue_depth gauge");
let _ = writeln!(
out,
"telemt_conntrack_event_queue_depth {}",
stats.get_conntrack_event_queue_depth()
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_delete_total Conntrack delete attempts by outcome"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_delete_total counter");
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"attempt\"}} {}",
if core_enabled {
stats.get_conntrack_delete_attempt_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"success\"}} {}",
if core_enabled {
stats.get_conntrack_delete_success_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"not_found\"}} {}",
if core_enabled {
stats.get_conntrack_delete_not_found_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"error\"}} {}",
if core_enabled {
stats.get_conntrack_delete_error_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_close_event_drop_total Dropped conntrack close events due to queue pressure or unavailable sender"
);
let _ = writeln!(
out,
"# TYPE telemt_conntrack_close_event_drop_total counter"
);
let _ = writeln!(
out,
"telemt_conntrack_close_event_drop_total {}",
if core_enabled {
stats.get_conntrack_close_event_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"

View File

@ -80,7 +80,7 @@ use crate::transport::middle_proxy::MePool;
use crate::transport::socket::normalize_ip;
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};
use crate::proxy::direct_relay::handle_via_direct;
use crate::proxy::direct_relay::handle_via_direct_with_shared;
use crate::proxy::handshake::{
HandshakeSuccess, handle_mtproto_handshake_with_shared, handle_tls_handshake_with_shared,
};
@ -191,6 +191,24 @@ fn handshake_timeout_with_mask_grace(config: &ProxyConfig) -> Duration {
}
}
fn effective_client_first_byte_idle_secs(config: &ProxyConfig, shared: &ProxySharedState) -> u64 {
let idle_secs = config.timeouts.client_first_byte_idle_secs;
if idle_secs == 0 {
return 0;
}
if shared.conntrack_pressure_active() {
idle_secs.min(
config
.server
.conntrack_control
.profile
.client_first_byte_idle_cap_secs(),
)
} else {
idle_secs
}
}
const MASK_CLASSIFIER_PREFETCH_WINDOW: usize = 16;
#[cfg(test)]
const MASK_CLASSIFIER_PREFETCH_TIMEOUT: Duration = Duration::from_millis(5);
@ -463,10 +481,11 @@ where
debug!(peer = %real_peer, "New connection (generic stream)");
let first_byte = if config.timeouts.client_first_byte_idle_secs == 0 {
let first_byte_idle_secs = effective_client_first_byte_idle_secs(&config, shared.as_ref());
let first_byte = if first_byte_idle_secs == 0 {
None
} else {
let idle_timeout = Duration::from_secs(config.timeouts.client_first_byte_idle_secs);
let idle_timeout = Duration::from_secs(first_byte_idle_secs);
let mut first_byte = [0u8; 1];
match timeout(idle_timeout, stream.read(&mut first_byte)).await {
Ok(Ok(0)) => {
@ -499,15 +518,15 @@ where
);
return Err(ProxyError::Io(e));
}
Err(_) => {
debug!(
peer = %real_peer,
idle_secs = config.timeouts.client_first_byte_idle_secs,
"Closing idle pooled connection before first client byte"
);
return Ok(());
Err(_) => {
debug!(
peer = %real_peer,
idle_secs = first_byte_idle_secs,
"Closing idle pooled connection before first client byte"
);
return Ok(());
}
}
}
};
let handshake_timeout = handshake_timeout_with_mask_grace(&config);
@ -968,11 +987,12 @@ impl RunningClientHandler {
}
}
let first_byte = if self.config.timeouts.client_first_byte_idle_secs == 0 {
let first_byte_idle_secs =
effective_client_first_byte_idle_secs(&self.config, self.shared.as_ref());
let first_byte = if first_byte_idle_secs == 0 {
None
} else {
let idle_timeout =
Duration::from_secs(self.config.timeouts.client_first_byte_idle_secs);
let idle_timeout = Duration::from_secs(first_byte_idle_secs);
let mut first_byte = [0u8; 1];
match timeout(idle_timeout, self.stream.read(&mut first_byte)).await {
Ok(Ok(0)) => {
@ -1008,7 +1028,7 @@ impl RunningClientHandler {
Err(_) => {
debug!(
peer = %self.peer,
idle_secs = self.config.timeouts.client_first_byte_idle_secs,
idle_secs = first_byte_idle_secs,
"Closing idle pooled connection before first client byte"
);
return Ok(None);
@ -1395,7 +1415,7 @@ impl RunningClientHandler {
local_addr: SocketAddr,
peer_addr: SocketAddr,
ip_tracker: Arc<UserIpTracker>,
_shared: Arc<ProxySharedState>,
shared: Arc<ProxySharedState>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@ -1438,12 +1458,12 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
_shared,
shared.clone(),
)
.await
} else {
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
handle_via_direct(
handle_via_direct_with_shared(
client_reader,
client_writer,
success,
@ -1455,12 +1475,14 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
local_addr,
shared.clone(),
)
.await
}
} else {
// Direct mode (original behavior)
handle_via_direct(
handle_via_direct_with_shared(
client_reader,
client_writer,
success,
@ -1472,6 +1494,8 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
local_addr,
shared.clone(),
)
.await
};

View File

@ -6,6 +6,7 @@ use std::net::SocketAddr;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use std::sync::{Mutex, OnceLock};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, split};
use tokio::sync::watch;
@ -16,7 +17,9 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::relay::relay_bidirectional;
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
@ -225,7 +228,43 @@ fn unknown_dc_test_lock() -> &'static Mutex<()> {
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
#[allow(dead_code)]
pub(crate) async fn handle_via_direct<R, W>(
client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>,
success: HandshakeSuccess,
upstream_manager: Arc<UpstreamManager>,
stats: Arc<Stats>,
config: Arc<ProxyConfig>,
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
handle_via_direct_with_shared(
client_reader,
client_writer,
success,
upstream_manager,
stats,
config.clone(),
buffer_pool,
rng,
route_rx,
route_snapshot,
session_id,
SocketAddr::from(([0, 0, 0, 0], config.server.port)),
ProxySharedState::new(),
)
.await
}
pub(crate) async fn handle_via_direct_with_shared<R, W>(
client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>,
success: HandshakeSuccess,
@ -237,6 +276,8 @@ pub(crate) async fn handle_via_direct<R, W>(
mut route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
local_addr: SocketAddr,
shared: Arc<ProxySharedState>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@ -277,7 +318,18 @@ where
let _direct_connection_lease = stats.acquire_direct_connection_lease();
let buffer_pool_trim = Arc::clone(&buffer_pool);
let relay_result = relay_bidirectional(
let relay_activity_timeout = if shared.conntrack_pressure_active() {
Duration::from_secs(
config
.server
.conntrack_control
.profile
.direct_activity_timeout_secs(),
)
} else {
Duration::from_secs(1800)
};
let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout(
client_reader,
client_writer,
tg_reader,
@ -288,6 +340,7 @@ where
Arc::clone(&stats),
config.access.user_data_quota.get(user).copied(),
buffer_pool,
relay_activity_timeout,
);
tokio::pin!(relay_result);
let relay_result = loop {
@ -329,9 +382,52 @@ where
pool_snapshot.allocated,
pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
);
let close_reason = classify_conntrack_close_reason(&relay_result);
let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent {
src: success.peer,
dst: local_addr,
reason: close_reason,
});
if !matches!(
publish_result,
ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled
) {
stats.increment_conntrack_close_event_drop_total();
}
relay_result
}
fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason {
match result {
Ok(()) => ConntrackCloseReason::NormalEof,
Err(crate::error::ProxyError::Io(error))
if matches!(error.kind(), std::io::ErrorKind::TimedOut) =>
{
ConntrackCloseReason::Timeout
}
Err(crate::error::ProxyError::Io(error))
if matches!(
error.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
| std::io::ErrorKind::UnexpectedEof
) =>
{
ConntrackCloseReason::Reset
}
Err(crate::error::ProxyError::Proxy(message))
if message.contains("pressure") || message.contains("evicted") =>
{
ConntrackCloseReason::Pressure
}
Err(_) => ConntrackCloseReason::Other,
}
}
fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
let prefer_v6 = config.network.prefer == 6 && config.network.ipv6.unwrap_or(true);
let datacenters = if prefer_v6 {

View File

@ -16,12 +16,14 @@ use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::timeout;
use tracing::{debug, info, trace, warn};
use crate::config::ProxyConfig;
use crate::config::{ConntrackPressureProfile, ProxyConfig};
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::shared_state::ProxySharedState;
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
@ -135,6 +137,10 @@ fn note_relay_pressure_event_in(shared: &ProxySharedState) {
guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1);
}
pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
note_relay_pressure_event_in(shared);
}
fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
let guard = relay_idle_candidate_registry_lock_in(shared);
guard.pressure_event_seq
@ -241,6 +247,23 @@ impl RelayClientIdlePolicy {
legacy_frame_read_timeout: frame_read_timeout,
}
}
fn apply_pressure_caps(&mut self, profile: ConntrackPressureProfile) {
let pressure_soft_idle_cap = Duration::from_secs(profile.middle_soft_idle_cap_secs());
let pressure_hard_idle_cap = Duration::from_secs(profile.middle_hard_idle_cap_secs());
self.soft_idle = self.soft_idle.min(pressure_soft_idle_cap);
self.hard_idle = self.hard_idle.min(pressure_hard_idle_cap);
if self.soft_idle > self.hard_idle {
self.soft_idle = self.hard_idle;
}
self.legacy_frame_read_timeout = self
.legacy_frame_read_timeout
.min(pressure_hard_idle_cap);
if self.grace_after_downstream_activity > self.hard_idle {
self.grace_after_downstream_activity = self.hard_idle;
}
}
}
#[derive(Clone, Copy)]
@ -1027,7 +1050,12 @@ where
let translated_local_addr = me_pool.translate_our_addr(local_addr);
let frame_limit = config.general.max_client_frame;
let relay_idle_policy = RelayClientIdlePolicy::from_config(&config);
let mut relay_idle_policy = RelayClientIdlePolicy::from_config(&config);
let mut pressure_caps_applied = false;
if shared.conntrack_pressure_active() {
relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile);
pressure_caps_applied = true;
}
let session_started_at = forensics.started_at;
let mut relay_idle_state = RelayClientIdleState::new(session_started_at);
let last_downstream_activity_ms = Arc::new(AtomicU64::new(0));
@ -1421,6 +1449,11 @@ where
let mut route_watch_open = true;
let mut seen_pressure_seq = relay_pressure_event_seq_in(shared.as_ref());
loop {
if shared.conntrack_pressure_active() && !pressure_caps_applied {
relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile);
pressure_caps_applied = true;
}
if relay_idle_policy.enabled
&& maybe_evict_idle_candidate_on_pressure_in(
shared.as_ref(),
@ -1600,6 +1633,20 @@ where
frames_ok = frame_counter,
"ME relay cleanup"
);
let close_reason = classify_conntrack_close_reason(&result);
let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent {
src: peer,
dst: local_addr,
reason: close_reason,
});
if !matches!(
publish_result,
ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled
) {
stats.increment_conntrack_close_event_drop_total();
}
clear_relay_idle_candidate_in(shared.as_ref(), conn_id);
me_pool.registry().unregister(conn_id).await;
buffer_pool.trim_to(buffer_pool.max_buffers().min(64));
@ -1612,6 +1659,33 @@ where
result
}
fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason {
match result {
Ok(()) => ConntrackCloseReason::NormalEof,
Err(ProxyError::Io(error)) if matches!(error.kind(), std::io::ErrorKind::TimedOut) => {
ConntrackCloseReason::Timeout
}
Err(ProxyError::Io(error))
if matches!(
error.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
| std::io::ErrorKind::UnexpectedEof
) =>
{
ConntrackCloseReason::Reset
}
Err(ProxyError::Proxy(message))
if message.contains("pressure") || message.contains("evicted") =>
{
ConntrackCloseReason::Pressure
}
Err(_) => ConntrackCloseReason::Other,
}
}
async fn read_client_payload_with_idle_policy_in<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,

View File

@ -70,6 +70,7 @@ use tracing::{debug, trace, warn};
///
/// iOS keeps Telegram connections alive in background for up to 30 minutes.
/// Closing earlier causes unnecessary reconnects and handshake overhead.
#[allow(dead_code)]
const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(1800);
/// Watchdog check interval — also used for periodic rate logging.
@ -453,6 +454,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
/// - Clean shutdown: both write sides are shut down on exit
/// - Error propagation: quota exits return `ProxyError::DataQuotaExceeded`,
/// other I/O failures are returned as `ProxyError::Io`
#[allow(dead_code)]
pub async fn relay_bidirectional<CR, CW, SR, SW>(
client_reader: CR,
client_writer: CW,
@ -471,6 +473,42 @@ where
SR: AsyncRead + Unpin + Send + 'static,
SW: AsyncWrite + Unpin + Send + 'static,
{
relay_bidirectional_with_activity_timeout(
client_reader,
client_writer,
server_reader,
server_writer,
c2s_buf_size,
s2c_buf_size,
user,
stats,
quota_limit,
_buffer_pool,
ACTIVITY_TIMEOUT,
)
.await
}
pub async fn relay_bidirectional_with_activity_timeout<CR, CW, SR, SW>(
client_reader: CR,
client_writer: CW,
server_reader: SR,
server_writer: SW,
c2s_buf_size: usize,
s2c_buf_size: usize,
user: &str,
stats: Arc<Stats>,
quota_limit: Option<u64>,
_buffer_pool: Arc<BufferPool>,
activity_timeout: Duration,
) -> Result<()>
where
CR: AsyncRead + Unpin + Send + 'static,
CW: AsyncWrite + Unpin + Send + 'static,
SR: AsyncRead + Unpin + Send + 'static,
SW: AsyncWrite + Unpin + Send + 'static,
{
let activity_timeout = activity_timeout.max(Duration::from_secs(1));
let epoch = Instant::now();
let counters = Arc::new(SharedCounters::new());
let quota_exceeded = Arc::new(AtomicBool::new(false));
@ -512,7 +550,7 @@ where
}
// ── Activity timeout ────────────────────────────────────
if idle >= ACTIVITY_TIMEOUT {
if idle >= activity_timeout {
let c2s = wd_counters.c2s_bytes.load(Ordering::Relaxed);
let s2c = wd_counters.s2c_bytes.load(Ordering::Relaxed);
warn!(

View File

@ -1,15 +1,40 @@
use std::collections::HashSet;
use std::collections::hash_map::RandomState;
use std::net::IpAddr;
use std::sync::atomic::AtomicU64;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use dashmap::DashMap;
use tokio::sync::mpsc;
use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState};
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConntrackCloseReason {
NormalEof,
Timeout,
Pressure,
Reset,
Other,
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct ConntrackCloseEvent {
pub(crate) src: SocketAddr,
pub(crate) dst: SocketAddr,
pub(crate) reason: ConntrackCloseReason,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConntrackClosePublishResult {
Sent,
Disabled,
QueueFull,
QueueClosed,
}
pub(crate) struct HandshakeSharedState {
pub(crate) auth_probe: DashMap<IpAddr, AuthProbeState>,
pub(crate) auth_probe_saturation: Mutex<Option<AuthProbeSaturationState>>,
@ -31,6 +56,8 @@ pub(crate) struct MiddleRelaySharedState {
pub(crate) struct ProxySharedState {
pub(crate) handshake: HandshakeSharedState,
pub(crate) middle_relay: MiddleRelaySharedState,
pub(crate) conntrack_pressure_active: AtomicBool,
pub(crate) conntrack_close_tx: Mutex<Option<mpsc::Sender<ConntrackCloseEvent>>>,
}
impl ProxySharedState {
@ -52,6 +79,67 @@ impl ProxySharedState {
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
relay_idle_mark_seq: AtomicU64::new(0),
},
conntrack_pressure_active: AtomicBool::new(false),
conntrack_close_tx: Mutex::new(None),
})
}
pub(crate) fn set_conntrack_close_sender(&self, tx: mpsc::Sender<ConntrackCloseEvent>) {
match self.conntrack_close_tx.lock() {
Ok(mut guard) => {
*guard = Some(tx);
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = Some(tx);
self.conntrack_close_tx.clear_poison();
}
}
}
pub(crate) fn disable_conntrack_close_sender(&self) {
match self.conntrack_close_tx.lock() {
Ok(mut guard) => {
*guard = None;
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = None;
self.conntrack_close_tx.clear_poison();
}
}
}
pub(crate) fn publish_conntrack_close_event(
&self,
event: ConntrackCloseEvent,
) -> ConntrackClosePublishResult {
let tx = match self.conntrack_close_tx.lock() {
Ok(guard) => guard.clone(),
Err(poisoned) => {
let guard = poisoned.into_inner();
let cloned = guard.clone();
self.conntrack_close_tx.clear_poison();
cloned
}
};
let Some(tx) = tx else {
return ConntrackClosePublishResult::Disabled;
};
match tx.try_send(event) {
Ok(()) => ConntrackClosePublishResult::Sent,
Err(mpsc::error::TrySendError::Full(_)) => ConntrackClosePublishResult::QueueFull,
Err(mpsc::error::TrySendError::Closed(_)) => ConntrackClosePublishResult::QueueClosed,
}
}
pub(crate) fn set_conntrack_pressure_active(&self, active: bool) {
self.conntrack_pressure_active.store(active, Ordering::Relaxed);
}
pub(crate) fn conntrack_pressure_active(&self) -> bool {
self.conntrack_pressure_active.load(Ordering::Relaxed)
}
}

View File

@ -159,8 +159,8 @@ MemoryDenyWriteExecute=true
LockPersonality=true
# Allow binding to privileged ports and writing to specific paths
AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
ReadWritePaths=/etc/telemt /var/run /var/lib/telemt
[Install]

View File

@ -91,6 +91,17 @@ pub struct Stats {
current_connections_direct: AtomicU64,
current_connections_me: AtomicU64,
handshake_timeouts: AtomicU64,
accept_permit_timeout_total: AtomicU64,
conntrack_control_enabled_gauge: AtomicBool,
conntrack_control_available_gauge: AtomicBool,
conntrack_pressure_active_gauge: AtomicBool,
conntrack_event_queue_depth_gauge: AtomicU64,
conntrack_rule_apply_ok_gauge: AtomicBool,
conntrack_delete_attempt_total: AtomicU64,
conntrack_delete_success_total: AtomicU64,
conntrack_delete_not_found_total: AtomicU64,
conntrack_delete_error_total: AtomicU64,
conntrack_close_event_drop_total: AtomicU64,
upstream_connect_attempt_total: AtomicU64,
upstream_connect_success_total: AtomicU64,
upstream_connect_fail_total: AtomicU64,
@ -528,6 +539,74 @@ impl Stats {
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_accept_permit_timeout_total(&self) {
if self.telemetry_core_enabled() {
self.accept_permit_timeout_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn set_conntrack_control_enabled(&self, enabled: bool) {
self.conntrack_control_enabled_gauge
.store(enabled, Ordering::Relaxed);
}
pub fn set_conntrack_control_available(&self, available: bool) {
self.conntrack_control_available_gauge
.store(available, Ordering::Relaxed);
}
pub fn set_conntrack_pressure_active(&self, active: bool) {
self.conntrack_pressure_active_gauge
.store(active, Ordering::Relaxed);
}
pub fn set_conntrack_event_queue_depth(&self, depth: u64) {
self.conntrack_event_queue_depth_gauge
.store(depth, Ordering::Relaxed);
}
pub fn set_conntrack_rule_apply_ok(&self, ok: bool) {
self.conntrack_rule_apply_ok_gauge
.store(ok, Ordering::Relaxed);
}
pub fn increment_conntrack_delete_attempt_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_attempt_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_delete_success_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_success_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_delete_not_found_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_not_found_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_delete_error_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_error_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_close_event_drop_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_close_event_drop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_upstream_connect_attempt_total(&self) {
if self.telemetry_core_enabled() {
self.upstream_connect_attempt_total
@ -1477,6 +1556,9 @@ impl Stats {
pub fn get_connects_bad(&self) -> u64 {
self.connects_bad.load(Ordering::Relaxed)
}
pub fn get_accept_permit_timeout_total(&self) -> u64 {
self.accept_permit_timeout_total.load(Ordering::Relaxed)
}
pub fn get_current_connections_direct(&self) -> u64 {
self.current_connections_direct.load(Ordering::Relaxed)
}
@ -1487,6 +1569,38 @@ impl Stats {
self.get_current_connections_direct()
.saturating_add(self.get_current_connections_me())
}
pub fn get_conntrack_control_enabled(&self) -> bool {
self.conntrack_control_enabled_gauge.load(Ordering::Relaxed)
}
pub fn get_conntrack_control_available(&self) -> bool {
self.conntrack_control_available_gauge
.load(Ordering::Relaxed)
}
pub fn get_conntrack_pressure_active(&self) -> bool {
self.conntrack_pressure_active_gauge.load(Ordering::Relaxed)
}
pub fn get_conntrack_event_queue_depth(&self) -> u64 {
self.conntrack_event_queue_depth_gauge
.load(Ordering::Relaxed)
}
pub fn get_conntrack_rule_apply_ok(&self) -> bool {
self.conntrack_rule_apply_ok_gauge.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_attempt_total(&self) -> u64 {
self.conntrack_delete_attempt_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_success_total(&self) -> u64 {
self.conntrack_delete_success_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_not_found_total(&self) -> u64 {
self.conntrack_delete_not_found_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_error_total(&self) -> u64 {
self.conntrack_delete_error_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_close_event_drop_total(&self) -> u64 {
self.conntrack_close_event_drop_total.load(Ordering::Relaxed)
}
pub fn get_me_keepalive_sent(&self) -> u64 {
self.me_keepalive_sent.load(Ordering::Relaxed)
}