Compare commits

...

13 Commits

Author SHA1 Message Date
mammuthus 24eb507e91
Merge 9b64d2ee17 into 977ee53b72 2026-04-05 16:00:34 +03:00
Alexey 977ee53b72
Config Fallback + Working Directory Setup
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-05 14:40:17 +03:00
Alexey 8fe6fcb7eb
ME2DC Fast for unstoppable init 2026-04-05 13:10:35 +03:00
Alexey 486e439ae6
Update Cargo.toml + Cargo.lock 2026-04-05 12:19:24 +03:00
Alexey 8e7b27a16d
Deleting Kilocode 2026-04-04 18:10:09 +03:00
Alexey 7f0057acd7
Conntrack Control Method
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-04 11:28:32 +03:00
mammuthus 9b64d2ee17
style(metrics): apply rustfmt for build_info additions 2026-04-03 07:49:37 +00:00
mammuthus 873618ce53
metrics: export telemt_build_info version metric 2026-04-02 18:14:50 +00:00
Alexey f6704d7d65
Update README.md 2026-04-02 10:59:19 +03:00
Alexey 3d20002e56
Update README.md 2026-04-02 10:58:50 +03:00
Alexey 8fcd0fa950
Merge pull request #618 from SysAdminKo/main
Переработка документации CONFIG_PARAMS
2026-04-01 17:24:22 +03:00
SysAdminKo 645e968778
Enhance CONFIG_PARAMS documentation with AI-assisted notes and detailed parameter descriptions. Update formatting for clarity and include examples for key configuration options. 2026-04-01 16:04:11 +03:00
Alexey b46216d357
Update README.md 2026-04-01 11:52:13 +03:00
25 changed files with 4925 additions and 481 deletions

View File

@ -1,58 +0,0 @@
# Architect Mode Rules for Telemt
## Architecture Overview
```mermaid
graph TB
subgraph Entry
Client[Clients] --> Listener[TCP/Unix Listener]
end
subgraph Proxy Layer
Listener --> ClientHandler[ClientHandler]
ClientHandler --> Handshake[Handshake Validator]
Handshake --> |Valid| Relay[Relay Layer]
Handshake --> |Invalid| Masking[Masking/TLS Fronting]
end
subgraph Transport
Relay --> MiddleProxy[Middle-End Proxy Pool]
Relay --> DirectRelay[Direct DC Relay]
MiddleProxy --> TelegramDC[Telegram DCs]
DirectRelay --> TelegramDC
end
```
## Module Dependencies
- [`src/main.rs`](src/main.rs) - Entry point, spawns all async tasks
- [`src/config/`](src/config/) - Configuration loading with auto-migration
- [`src/error.rs`](src/error.rs) - Error types, must be used by all modules
- [`src/crypto/`](src/crypto/) - AES, SHA, random number generation
- [`src/protocol/`](src/protocol/) - MTProto constants, frame encoding, obfuscation
- [`src/stream/`](src/stream/) - Stream wrappers, buffer pool, frame codecs
- [`src/proxy/`](src/proxy/) - Client handling, handshake, relay logic
- [`src/transport/`](src/transport/) - Upstream management, middle-proxy, SOCKS support
- [`src/stats/`](src/stats/) - Statistics and replay protection
- [`src/ip_tracker.rs`](src/ip_tracker.rs) - Per-user IP tracking
## Key Architectural Constraints
### Middle-End Proxy Mode
- Requires public IP on interface OR 1:1 NAT with STUN probing
- Uses separate `proxy-secret` from Telegram (NOT user secrets)
- Falls back to direct mode automatically on STUN mismatch
### TLS Fronting
- Invalid handshakes are transparently proxied to `mask_host`
- This is critical for DPI evasion - do not change this behavior
- `mask_unix_sock` and `mask_host` are mutually exclusive
### Stream Architecture
- Buffer pool is shared globally via Arc - prevents allocation storms
- Frame codecs implement tokio-util Encoder/Decoder traits
- State machine in [`src/stream/state.rs`](src/stream/state.rs) manages stream transitions
### Configuration Migration
- [`ProxyConfig::load()`](src/config/mod.rs:641) mutates config in-place
- New fields must have sensible defaults
- DC203 override is auto-injected for CDN/media support

View File

@ -1,23 +0,0 @@
# Code Mode Rules for Telemt
## Error Handling
- Always use [`ProxyError`](src/error.rs:168) from [`src/error.rs`](src/error.rs) for proxy operations
- [`HandshakeResult<T,R,W>`](src/error.rs:292) returns streams on bad client - these MUST be returned for masking, never dropped
- Use [`Recoverable`](src/error.rs:110) trait to check if errors are retryable
## Configuration Changes
- [`ProxyConfig::load()`](src/config/mod.rs:641) auto-mutates config - new fields should have defaults
- DC203 override is auto-injected if missing - do not remove this behavior
- When adding config fields, add migration logic in [`ProxyConfig::load()`](src/config/mod.rs:641)
## Crypto Code
- [`SecureRandom`](src/crypto/random.rs) from [`src/crypto/random.rs`](src/crypto/random.rs) must be used for all crypto operations
- Never use `rand::thread_rng()` directly - use the shared `Arc<SecureRandom>`
## Stream Handling
- Buffer pool [`BufferPool`](src/stream/buffer_pool.rs) is shared via Arc - always use it instead of allocating
- Frame codecs in [`src/stream/frame_codec.rs`](src/stream/frame_codec.rs) implement tokio-util's Encoder/Decoder traits
## Testing
- Tests are inline in modules using `#[cfg(test)]`
- Use `cargo test --lib <module_name>` to run tests for specific modules

View File

@ -1,27 +0,0 @@
# Debug Mode Rules for Telemt
## Logging
- `RUST_LOG` environment variable takes absolute priority over all config log levels
- Log levels: `trace`, `debug`, `info`, `warn`, `error`
- Use `RUST_LOG=debug cargo run` for detailed operational logs
- Use `RUST_LOG=trace cargo run` for full protocol-level debugging
## Middle-End Proxy Debugging
- Set `ME_DIAG=1` environment variable for high-precision cryptography diagnostics
- STUN probe results are logged at startup - check for mismatch between local and reflected IP
- If Middle-End fails, check `proxy_secret_path` points to valid file from https://core.telegram.org/getProxySecret
## Connection Issues
- DC connectivity is logged at startup with RTT measurements
- If DC ping fails, check `dc_overrides` for custom addresses
- Use `prefer_ipv6=false` in config if IPv6 is unreliable
## TLS Fronting Issues
- Invalid handshakes are proxied to `mask_host` - check this host is reachable
- `mask_unix_sock` and `mask_host` are mutually exclusive - only one can be set
- If `mask_unix_sock` is set, socket must exist before connections arrive
## Common Errors
- `ReplayAttack` - client replayed a handshake nonce, potential attack
- `TimeSkew` - client clock is off, can disable with `ignore_time_skew=true`
- `TgHandshakeTimeout` - upstream DC connection failed, check network

2
Cargo.lock generated
View File

@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "telemt"
version = "3.3.36"
version = "3.3.37"
dependencies = [
"aes",
"anyhow",

View File

@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.3.36"
version = "3.3.37"
edition = "2024"
[features]

View File

@ -2,7 +2,7 @@
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
[**Telemt Chat in Telegram**](https://t.me/telemtrs)
### [**Telemt Chat in Telegram**](https://t.me/telemtrs)
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)

File diff suppressed because it is too large Load Diff

View File

@ -81,10 +81,21 @@ pub(super) struct ZeroCoreData {
pub(super) connections_total: u64,
pub(super) connections_bad_total: u64,
pub(super) handshake_timeouts_total: u64,
pub(super) accept_permit_timeout_total: u64,
pub(super) configured_users: usize,
pub(super) telemetry_core_enabled: bool,
pub(super) telemetry_user_enabled: bool,
pub(super) telemetry_me_level: String,
pub(super) conntrack_control_enabled: bool,
pub(super) conntrack_control_available: bool,
pub(super) conntrack_pressure_active: bool,
pub(super) conntrack_event_queue_depth: u64,
pub(super) conntrack_rule_apply_ok: bool,
pub(super) conntrack_delete_attempt_total: u64,
pub(super) conntrack_delete_success_total: u64,
pub(super) conntrack_delete_not_found_total: u64,
pub(super) conntrack_delete_error_total: u64,
pub(super) conntrack_close_event_drop_total: u64,
}
#[derive(Serialize, Clone)]

View File

@ -39,10 +39,21 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
connections_total: stats.get_connects_all(),
connections_bad_total: stats.get_connects_bad(),
handshake_timeouts_total: stats.get_handshake_timeouts(),
accept_permit_timeout_total: stats.get_accept_permit_timeout_total(),
configured_users,
telemetry_core_enabled: telemetry.core_enabled,
telemetry_user_enabled: telemetry.user_enabled,
telemetry_me_level: telemetry.me_level.to_string(),
conntrack_control_enabled: stats.get_conntrack_control_enabled(),
conntrack_control_available: stats.get_conntrack_control_available(),
conntrack_pressure_active: stats.get_conntrack_pressure_active(),
conntrack_event_queue_depth: stats.get_conntrack_event_queue_depth(),
conntrack_rule_apply_ok: stats.get_conntrack_rule_apply_ok(),
conntrack_delete_attempt_total: stats.get_conntrack_delete_attempt_total(),
conntrack_delete_success_total: stats.get_conntrack_delete_success_total(),
conntrack_delete_not_found_total: stats.get_conntrack_delete_not_found_total(),
conntrack_delete_error_total: stats.get_conntrack_delete_error_total(),
conntrack_close_event_drop_total: stats.get_conntrack_close_event_drop_total(),
},
upstream: build_zero_upstream_data(stats),
middle_proxy: ZeroMiddleProxyData {

View File

@ -48,6 +48,10 @@ const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_CONNTRACK_CONTROL_ENABLED: bool = true;
const DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT: u8 = 85;
const DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT: u8 = 70;
const DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC: u64 = 4096;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
@ -96,7 +100,7 @@ pub(crate) fn default_fake_cert_len() -> usize {
}
pub(crate) fn default_tls_front_dir() -> String {
"tlsfront".to_string()
"/etc/telemt/tlsfront".to_string()
}
pub(crate) fn default_replay_check_len() -> usize {
@ -221,6 +225,22 @@ pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
}
pub(crate) fn default_conntrack_control_enabled() -> bool {
DEFAULT_CONNTRACK_CONTROL_ENABLED
}
pub(crate) fn default_conntrack_pressure_high_watermark_pct() -> u8 {
DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT
}
pub(crate) fn default_conntrack_pressure_low_watermark_pct() -> u8 {
DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT
}
pub(crate) fn default_conntrack_delete_budget_per_sec() -> u64 {
DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC
}
pub(crate) fn default_prefer_4() -> u8 {
4
}
@ -282,7 +302,7 @@ pub(crate) fn default_me2dc_fallback() -> bool {
}
pub(crate) fn default_me2dc_fast() -> bool {
false
true
}
pub(crate) fn default_keepalive_interval() -> u64 {
@ -538,7 +558,7 @@ pub(crate) fn default_beobachten_flush_secs() -> u64 {
}
pub(crate) fn default_beobachten_file() -> String {
"cache/beobachten.txt".to_string()
"/etc/telemt/beobachten.txt".to_string()
}
pub(crate) fn default_tls_new_session_tickets() -> u8 {

View File

@ -922,6 +922,39 @@ impl ProxyConfig {
));
}
if config.server.conntrack_control.pressure_high_watermark_pct == 0
|| config.server.conntrack_control.pressure_high_watermark_pct > 100
{
return Err(ProxyError::Config(
"server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]"
.to_string(),
));
}
if config.server.conntrack_control.pressure_low_watermark_pct
>= config.server.conntrack_control.pressure_high_watermark_pct
{
return Err(ProxyError::Config(
"server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct"
.to_string(),
));
}
if config.server.conntrack_control.delete_budget_per_sec == 0 {
return Err(ProxyError::Config(
"server.conntrack_control.delete_budget_per_sec must be > 0".to_string(),
));
}
if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid)
&& config.server.conntrack_control.hybrid_listener_ips.is_empty()
{
return Err(ProxyError::Config(
"server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
.to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs
@ -1327,6 +1360,31 @@ mod tests {
cfg.server.api.runtime_edge_events_capacity,
default_api_runtime_edge_events_capacity()
);
assert_eq!(
cfg.server.conntrack_control.inline_conntrack_control,
default_conntrack_control_enabled()
);
assert_eq!(cfg.server.conntrack_control.mode, ConntrackMode::default());
assert_eq!(
cfg.server.conntrack_control.backend,
ConntrackBackend::default()
);
assert_eq!(
cfg.server.conntrack_control.profile,
ConntrackPressureProfile::default()
);
assert_eq!(
cfg.server.conntrack_control.pressure_high_watermark_pct,
default_conntrack_pressure_high_watermark_pct()
);
assert_eq!(
cfg.server.conntrack_control.pressure_low_watermark_pct,
default_conntrack_pressure_low_watermark_pct()
);
assert_eq!(
cfg.server.conntrack_control.delete_budget_per_sec,
default_conntrack_delete_budget_per_sec()
);
assert_eq!(cfg.access.users, default_access_users());
assert_eq!(
cfg.access.user_max_tcp_conns_global_each,
@ -1472,6 +1530,31 @@ mod tests {
server.api.runtime_edge_events_capacity,
default_api_runtime_edge_events_capacity()
);
assert_eq!(
server.conntrack_control.inline_conntrack_control,
default_conntrack_control_enabled()
);
assert_eq!(server.conntrack_control.mode, ConntrackMode::default());
assert_eq!(
server.conntrack_control.backend,
ConntrackBackend::default()
);
assert_eq!(
server.conntrack_control.profile,
ConntrackPressureProfile::default()
);
assert_eq!(
server.conntrack_control.pressure_high_watermark_pct,
default_conntrack_pressure_high_watermark_pct()
);
assert_eq!(
server.conntrack_control.pressure_low_watermark_pct,
default_conntrack_pressure_low_watermark_pct()
);
assert_eq!(
server.conntrack_control.delete_budget_per_sec,
default_conntrack_delete_budget_per_sec()
);
let access = AccessConfig::default();
assert_eq!(access.users, default_access_users());
@ -2404,6 +2487,118 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_pressure_high_watermark_out_of_range_is_rejected() {
let toml = r#"
[server.conntrack_control]
pressure_high_watermark_pct = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(
err.contains("server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]")
);
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_pressure_low_watermark_must_be_below_high() {
let toml = r#"
[server.conntrack_control]
pressure_high_watermark_pct = 50
pressure_low_watermark_pct = 50
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_low_watermark_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(
err.contains(
"server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct"
)
);
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_delete_budget_zero_is_rejected() {
let toml = r#"
[server.conntrack_control]
delete_budget_per_sec = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_delete_budget_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("server.conntrack_control.delete_budget_per_sec must be > 0"));
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_hybrid_mode_requires_listener_allow_list() {
let toml = r#"
[server.conntrack_control]
mode = "hybrid"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(
err.contains("server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid")
);
let _ = std::fs::remove_file(path);
}
#[test]
fn conntrack_profile_is_loaded_from_config() {
let toml = r#"
[server.conntrack_control]
profile = "aggressive"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_conntrack_profile_parse_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(
cfg.server.conntrack_control.profile,
ConntrackPressureProfile::Aggressive
);
let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_default_matches_drain_ttl() {
let toml = r#"

View File

@ -1216,6 +1216,118 @@ impl Default for ApiConfig {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackMode {
#[default]
Tracked,
Notrack,
Hybrid,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackBackend {
#[default]
Auto,
Nftables,
Iptables,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackPressureProfile {
Conservative,
#[default]
Balanced,
Aggressive,
}
impl ConntrackPressureProfile {
pub fn client_first_byte_idle_cap_secs(self) -> u64 {
match self {
Self::Conservative => 30,
Self::Balanced => 20,
Self::Aggressive => 10,
}
}
pub fn direct_activity_timeout_secs(self) -> u64 {
match self {
Self::Conservative => 180,
Self::Balanced => 120,
Self::Aggressive => 60,
}
}
pub fn middle_soft_idle_cap_secs(self) -> u64 {
match self {
Self::Conservative => 60,
Self::Balanced => 30,
Self::Aggressive => 20,
}
}
pub fn middle_hard_idle_cap_secs(self) -> u64 {
match self {
Self::Conservative => 180,
Self::Balanced => 90,
Self::Aggressive => 60,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConntrackControlConfig {
/// Enables runtime conntrack-control worker for pressure mitigation.
#[serde(default = "default_conntrack_control_enabled")]
pub inline_conntrack_control: bool,
/// Conntrack mode for listener ingress traffic.
#[serde(default)]
pub mode: ConntrackMode,
/// Netfilter backend used to reconcile notrack rules.
#[serde(default)]
pub backend: ConntrackBackend,
/// Pressure profile for timeout caps under resource saturation.
#[serde(default)]
pub profile: ConntrackPressureProfile,
/// Listener IP allow-list for hybrid mode.
/// Ignored in tracked/notrack mode.
#[serde(default)]
pub hybrid_listener_ips: Vec<IpAddr>,
/// Pressure high watermark as percentage.
#[serde(default = "default_conntrack_pressure_high_watermark_pct")]
pub pressure_high_watermark_pct: u8,
/// Pressure low watermark as percentage.
#[serde(default = "default_conntrack_pressure_low_watermark_pct")]
pub pressure_low_watermark_pct: u8,
/// Maximum conntrack delete operations per second.
#[serde(default = "default_conntrack_delete_budget_per_sec")]
pub delete_budget_per_sec: u64,
}
impl Default for ConntrackControlConfig {
fn default() -> Self {
Self {
inline_conntrack_control: default_conntrack_control_enabled(),
mode: ConntrackMode::default(),
backend: ConntrackBackend::default(),
profile: ConntrackPressureProfile::default(),
hybrid_listener_ips: Vec::new(),
pressure_high_watermark_pct: default_conntrack_pressure_high_watermark_pct(),
pressure_low_watermark_pct: default_conntrack_pressure_low_watermark_pct(),
delete_budget_per_sec: default_conntrack_delete_budget_per_sec(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
#[serde(default = "default_port")]
@ -1291,6 +1403,10 @@ pub struct ServerConfig {
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_accept_permit_timeout_ms")]
pub accept_permit_timeout_ms: u64,
/// Runtime conntrack control and pressure policy.
#[serde(default)]
pub conntrack_control: ConntrackControlConfig,
}
impl Default for ServerConfig {
@ -1313,6 +1429,7 @@ impl Default for ServerConfig {
listen_backlog: default_listen_backlog(),
max_connections: default_server_max_connections(),
accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
conntrack_control: ConntrackControlConfig::default(),
}
}
}

704
src/conntrack_control.rs Normal file
View File

@ -0,0 +1,704 @@
use std::collections::BTreeSet;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn};
use crate::config::{ConntrackBackend, ConntrackMode, ProxyConfig};
use crate::proxy::middle_relay::note_global_relay_pressure;
use crate::proxy::shared_state::{ConntrackCloseEvent, ConntrackCloseReason, ProxySharedState};
use crate::stats::Stats;
const CONNTRACK_EVENT_QUEUE_CAPACITY: usize = 32_768;
const PRESSURE_RELEASE_TICKS: u8 = 3;
const PRESSURE_SAMPLE_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NetfilterBackend {
Nftables,
Iptables,
}
#[derive(Clone, Copy)]
struct PressureSample {
conn_pct: Option<u8>,
fd_pct: Option<u8>,
accept_timeout_delta: u64,
me_queue_pressure_delta: u64,
}
struct PressureState {
active: bool,
low_streak: u8,
prev_accept_timeout_total: u64,
prev_me_queue_pressure_total: u64,
}
impl PressureState {
fn new(stats: &Stats) -> Self {
Self {
active: false,
low_streak: 0,
prev_accept_timeout_total: stats.get_accept_permit_timeout_total(),
prev_me_queue_pressure_total: stats.get_me_c2me_send_full_total(),
}
}
}
pub(crate) fn spawn_conntrack_controller(
config_rx: watch::Receiver<Arc<ProxyConfig>>,
stats: Arc<Stats>,
shared: Arc<ProxySharedState>,
) {
if !cfg!(target_os = "linux") {
let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control;
stats.set_conntrack_control_enabled(enabled);
stats.set_conntrack_control_available(false);
stats.set_conntrack_pressure_active(false);
stats.set_conntrack_event_queue_depth(0);
stats.set_conntrack_rule_apply_ok(false);
shared.disable_conntrack_close_sender();
shared.set_conntrack_pressure_active(false);
if enabled {
warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker");
}
return;
}
let (tx, rx) = mpsc::channel(CONNTRACK_EVENT_QUEUE_CAPACITY);
shared.set_conntrack_close_sender(tx);
tokio::spawn(async move {
run_conntrack_controller(config_rx, stats, shared, rx).await;
});
}
async fn run_conntrack_controller(
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
stats: Arc<Stats>,
shared: Arc<ProxySharedState>,
mut close_rx: mpsc::Receiver<ConntrackCloseEvent>,
) {
let mut cfg = config_rx.borrow().clone();
let mut pressure_state = PressureState::new(stats.as_ref());
let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
let mut backend = pick_backend(cfg.server.conntrack_control.backend);
apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false);
reconcile_rules(&cfg, backend, stats.as_ref()).await;
loop {
tokio::select! {
changed = config_rx.changed() => {
if changed.is_err() {
break;
}
cfg = config_rx.borrow_and_update().clone();
backend = pick_backend(cfg.server.conntrack_control.backend);
delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), pressure_state.active);
reconcile_rules(&cfg, backend, stats.as_ref()).await;
}
event = close_rx.recv() => {
let Some(event) = event else {
break;
};
stats.set_conntrack_event_queue_depth(close_rx.len() as u64);
if !cfg.server.conntrack_control.inline_conntrack_control {
continue;
}
if !pressure_state.active {
continue;
}
if !matches!(event.reason, ConntrackCloseReason::Timeout | ConntrackCloseReason::Pressure | ConntrackCloseReason::Reset) {
continue;
}
if delete_budget_tokens == 0 {
continue;
}
stats.increment_conntrack_delete_attempt_total();
match delete_conntrack_entry(event).await {
DeleteOutcome::Deleted => {
delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
stats.increment_conntrack_delete_success_total();
}
DeleteOutcome::NotFound => {
delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
stats.increment_conntrack_delete_not_found_total();
}
DeleteOutcome::Error => {
delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
stats.increment_conntrack_delete_error_total();
}
}
}
_ = tokio::time::sleep(PRESSURE_SAMPLE_INTERVAL) => {
delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
stats.set_conntrack_event_queue_depth(close_rx.len() as u64);
let sample = collect_pressure_sample(stats.as_ref(), &cfg, &mut pressure_state);
update_pressure_state(
stats.as_ref(),
shared.as_ref(),
&cfg,
&sample,
&mut pressure_state,
);
if pressure_state.active {
note_global_relay_pressure(shared.as_ref());
}
}
}
}
shared.disable_conntrack_close_sender();
shared.set_conntrack_pressure_active(false);
stats.set_conntrack_pressure_active(false);
}
fn apply_runtime_state(
stats: &Stats,
shared: &ProxySharedState,
cfg: &ProxyConfig,
backend_available: bool,
pressure_active: bool,
) {
let enabled = cfg.server.conntrack_control.inline_conntrack_control;
let available = enabled && backend_available && has_cap_net_admin();
if enabled && !available {
warn!(
"conntrack control enabled but unavailable (missing CAP_NET_ADMIN or backend binaries)"
);
}
stats.set_conntrack_control_enabled(enabled);
stats.set_conntrack_control_available(available);
shared.set_conntrack_pressure_active(enabled && pressure_active);
stats.set_conntrack_pressure_active(enabled && pressure_active);
}
fn collect_pressure_sample(
stats: &Stats,
cfg: &ProxyConfig,
state: &mut PressureState,
) -> PressureSample {
let current_connections = stats.get_current_connections_total();
let conn_pct = if cfg.server.max_connections == 0 {
None
} else {
Some(
((current_connections.saturating_mul(100)) / u64::from(cfg.server.max_connections))
.min(100) as u8,
)
};
let fd_pct = fd_usage_pct();
let accept_total = stats.get_accept_permit_timeout_total();
let accept_delta = accept_total.saturating_sub(state.prev_accept_timeout_total);
state.prev_accept_timeout_total = accept_total;
let me_total = stats.get_me_c2me_send_full_total();
let me_delta = me_total.saturating_sub(state.prev_me_queue_pressure_total);
state.prev_me_queue_pressure_total = me_total;
PressureSample {
conn_pct,
fd_pct,
accept_timeout_delta: accept_delta,
me_queue_pressure_delta: me_delta,
}
}
fn update_pressure_state(
stats: &Stats,
shared: &ProxySharedState,
cfg: &ProxyConfig,
sample: &PressureSample,
state: &mut PressureState,
) {
if !cfg.server.conntrack_control.inline_conntrack_control {
if state.active {
state.active = false;
state.low_streak = 0;
shared.set_conntrack_pressure_active(false);
stats.set_conntrack_pressure_active(false);
info!("Conntrack pressure mode deactivated (feature disabled)");
}
return;
}
let high = cfg.server.conntrack_control.pressure_high_watermark_pct;
let low = cfg.server.conntrack_control.pressure_low_watermark_pct;
let high_hit = sample.conn_pct.is_some_and(|v| v >= high)
|| sample.fd_pct.is_some_and(|v| v >= high)
|| sample.accept_timeout_delta > 0
|| sample.me_queue_pressure_delta > 0;
let low_clear = sample.conn_pct.is_none_or(|v| v <= low)
&& sample.fd_pct.is_none_or(|v| v <= low)
&& sample.accept_timeout_delta == 0
&& sample.me_queue_pressure_delta == 0;
if !state.active && high_hit {
state.active = true;
state.low_streak = 0;
shared.set_conntrack_pressure_active(true);
stats.set_conntrack_pressure_active(true);
info!(
conn_pct = ?sample.conn_pct,
fd_pct = ?sample.fd_pct,
accept_timeout_delta = sample.accept_timeout_delta,
me_queue_pressure_delta = sample.me_queue_pressure_delta,
"Conntrack pressure mode activated"
);
return;
}
if state.active && low_clear {
state.low_streak = state.low_streak.saturating_add(1);
if state.low_streak >= PRESSURE_RELEASE_TICKS {
state.active = false;
state.low_streak = 0;
shared.set_conntrack_pressure_active(false);
stats.set_conntrack_pressure_active(false);
info!("Conntrack pressure mode deactivated");
}
return;
}
state.low_streak = 0;
}
async fn reconcile_rules(cfg: &ProxyConfig, backend: Option<NetfilterBackend>, stats: &Stats) {
if !cfg.server.conntrack_control.inline_conntrack_control {
clear_notrack_rules_all_backends().await;
stats.set_conntrack_rule_apply_ok(true);
return;
}
if !has_cap_net_admin() {
stats.set_conntrack_rule_apply_ok(false);
return;
}
let Some(backend) = backend else {
stats.set_conntrack_rule_apply_ok(false);
return;
};
let apply_result = match backend {
NetfilterBackend::Nftables => apply_nft_rules(cfg).await,
NetfilterBackend::Iptables => apply_iptables_rules(cfg).await,
};
if let Err(error) = apply_result {
warn!(error = %error, "Failed to reconcile conntrack/notrack rules");
stats.set_conntrack_rule_apply_ok(false);
} else {
stats.set_conntrack_rule_apply_ok(true);
}
}
fn pick_backend(configured: ConntrackBackend) -> Option<NetfilterBackend> {
match configured {
ConntrackBackend::Auto => {
if command_exists("nft") {
Some(NetfilterBackend::Nftables)
} else if command_exists("iptables") {
Some(NetfilterBackend::Iptables)
} else {
None
}
}
ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables),
ConntrackBackend::Iptables => command_exists("iptables").then_some(NetfilterBackend::Iptables),
}
}
fn command_exists(binary: &str) -> bool {
let Some(path_var) = std::env::var_os("PATH") else {
return false;
};
std::env::split_paths(&path_var).any(|dir| {
let candidate: PathBuf = dir.join(binary);
candidate.exists() && candidate.is_file()
})
}
fn notrack_targets(cfg: &ProxyConfig) -> (Vec<Option<IpAddr>>, Vec<Option<IpAddr>>) {
let mode = cfg.server.conntrack_control.mode;
let mut v4_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
let mut v6_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
match mode {
ConntrackMode::Tracked => {}
ConntrackMode::Notrack => {
if cfg.server.listeners.is_empty() {
if let Some(ipv4) = cfg
.server
.listen_addr_ipv4
.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok())
{
if ipv4.is_unspecified() {
v4_targets.insert(None);
} else {
v4_targets.insert(Some(ipv4));
}
}
if let Some(ipv6) = cfg
.server
.listen_addr_ipv6
.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok())
{
if ipv6.is_unspecified() {
v6_targets.insert(None);
} else {
v6_targets.insert(Some(ipv6));
}
}
} else {
for listener in &cfg.server.listeners {
if listener.ip.is_ipv4() {
if listener.ip.is_unspecified() {
v4_targets.insert(None);
} else {
v4_targets.insert(Some(listener.ip));
}
} else if listener.ip.is_unspecified() {
v6_targets.insert(None);
} else {
v6_targets.insert(Some(listener.ip));
}
}
}
}
ConntrackMode::Hybrid => {
for ip in &cfg.server.conntrack_control.hybrid_listener_ips {
if ip.is_ipv4() {
v4_targets.insert(Some(*ip));
} else {
v6_targets.insert(Some(*ip));
}
}
}
}
(
v4_targets.into_iter().collect(),
v6_targets.into_iter().collect(),
)
}
async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> {
let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
return Ok(());
}
let (v4_targets, v6_targets) = notrack_targets(cfg);
let mut rules = Vec::new();
for ip in v4_targets {
let rule = if let Some(ip) = ip {
format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip)
} else {
format!("tcp dport {} notrack", cfg.server.port)
};
rules.push(rule);
}
for ip in v6_targets {
let rule = if let Some(ip) = ip {
format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip)
} else {
format!("tcp dport {} notrack", cfg.server.port)
};
rules.push(rule);
}
let rule_blob = if rules.is_empty() {
String::new()
} else {
format!(" {}\n", rules.join("\n "))
};
let script = format!(
"table inet telemt_conntrack {{\n chain preraw {{\n type filter hook prerouting priority raw; policy accept;\n{rule_blob} }}\n}}\n"
);
run_command("nft", &["-f", "-"], Some(script)).await
}
async fn apply_iptables_rules(cfg: &ProxyConfig) -> Result<(), String> {
apply_iptables_rules_for_binary("iptables", cfg, true).await?;
apply_iptables_rules_for_binary("ip6tables", cfg, false).await?;
Ok(())
}
async fn apply_iptables_rules_for_binary(
binary: &str,
cfg: &ProxyConfig,
ipv4: bool,
) -> Result<(), String> {
if !command_exists(binary) {
return Ok(());
}
let chain = "TELEMT_NOTRACK";
let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await;
let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await;
let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await;
if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
return Ok(());
}
run_command(binary, &["-t", "raw", "-N", chain], None).await?;
run_command(binary, &["-t", "raw", "-F", chain], None).await?;
if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None).await.is_err() {
run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?;
}
let (v4_targets, v6_targets) = notrack_targets(cfg);
let selected = if ipv4 { v4_targets } else { v6_targets };
for ip in selected {
let mut args = vec![
"-t".to_string(),
"raw".to_string(),
"-A".to_string(),
chain.to_string(),
"-p".to_string(),
"tcp".to_string(),
"--dport".to_string(),
cfg.server.port.to_string(),
];
if let Some(ip) = ip {
args.push("-d".to_string());
args.push(ip.to_string());
}
args.push("-j".to_string());
args.push("CT".to_string());
args.push("--notrack".to_string());
let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
run_command(binary, &arg_refs, None).await?;
}
Ok(())
}
async fn clear_notrack_rules_all_backends() {
let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
}
enum DeleteOutcome {
Deleted,
NotFound,
Error,
}
async fn delete_conntrack_entry(event: ConntrackCloseEvent) -> DeleteOutcome {
if !command_exists("conntrack") {
return DeleteOutcome::Error;
}
let args = vec![
"-D".to_string(),
"-p".to_string(),
"tcp".to_string(),
"-s".to_string(),
event.src.ip().to_string(),
"--sport".to_string(),
event.src.port().to_string(),
"-d".to_string(),
event.dst.ip().to_string(),
"--dport".to_string(),
event.dst.port().to_string(),
];
let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
match run_command("conntrack", &arg_refs, None).await {
Ok(()) => DeleteOutcome::Deleted,
Err(error) => {
if error.contains("0 flow entries have been deleted") {
DeleteOutcome::NotFound
} else {
debug!(error = %error, "conntrack delete failed");
DeleteOutcome::Error
}
}
}
}
async fn run_command(binary: &str, args: &[&str], stdin: Option<String>) -> Result<(), String> {
if !command_exists(binary) {
return Err(format!("{binary} is not available"));
}
let mut command = Command::new(binary);
command.args(args);
if stdin.is_some() {
command.stdin(std::process::Stdio::piped());
}
command.stdout(std::process::Stdio::null());
command.stderr(std::process::Stdio::piped());
let mut child = command
.spawn()
.map_err(|e| format!("spawn {binary} failed: {e}"))?;
if let Some(blob) = stdin
&& let Some(mut writer) = child.stdin.take()
{
writer
.write_all(blob.as_bytes())
.await
.map_err(|e| format!("stdin write {binary} failed: {e}"))?;
}
let output = child
.wait_with_output()
.await
.map_err(|e| format!("wait {binary} failed: {e}"))?;
if output.status.success() {
return Ok(());
}
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
Err(if stderr.is_empty() {
format!("{binary} exited with status {}", output.status)
} else {
stderr
})
}
fn fd_usage_pct() -> Option<u8> {
let soft_limit = nofile_soft_limit()?;
if soft_limit == 0 {
return None;
}
let fd_count = std::fs::read_dir("/proc/self/fd").ok()?.count() as u64;
Some(((fd_count.saturating_mul(100)) / soft_limit).min(100) as u8)
}
fn nofile_soft_limit() -> Option<u64> {
#[cfg(target_os = "linux")]
{
let mut lim = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
let rc = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut lim) };
if rc != 0 {
return None;
}
return Some(lim.rlim_cur);
}
#[cfg(not(target_os = "linux"))]
{
None
}
}
fn has_cap_net_admin() -> bool {
#[cfg(target_os = "linux")]
{
let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
return false;
};
for line in status.lines() {
if let Some(raw) = line.strip_prefix("CapEff:") {
let caps = raw.trim();
if let Ok(bits) = u64::from_str_radix(caps, 16) {
const CAP_NET_ADMIN_BIT: u64 = 12;
return (bits & (1u64 << CAP_NET_ADMIN_BIT)) != 0;
}
}
}
false
}
#[cfg(not(target_os = "linux"))]
{
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ProxyConfig;
#[test]
fn pressure_activates_on_accept_timeout_spike() {
let stats = Stats::new();
let shared = ProxySharedState::new();
let mut cfg = ProxyConfig::default();
cfg.server.conntrack_control.inline_conntrack_control = true;
let mut state = PressureState::new(&stats);
let sample = PressureSample {
conn_pct: Some(10),
fd_pct: Some(10),
accept_timeout_delta: 1,
me_queue_pressure_delta: 0,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state);
assert!(state.active);
assert!(shared.conntrack_pressure_active());
assert!(stats.get_conntrack_pressure_active());
}
#[test]
fn pressure_releases_after_hysteresis_window() {
let stats = Stats::new();
let shared = ProxySharedState::new();
let mut cfg = ProxyConfig::default();
cfg.server.conntrack_control.inline_conntrack_control = true;
let mut state = PressureState::new(&stats);
let high_sample = PressureSample {
conn_pct: Some(95),
fd_pct: Some(95),
accept_timeout_delta: 0,
me_queue_pressure_delta: 0,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &high_sample, &mut state);
assert!(state.active);
let low_sample = PressureSample {
conn_pct: Some(10),
fd_pct: Some(10),
accept_timeout_delta: 0,
me_queue_pressure_delta: 0,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
assert!(state.active);
update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
assert!(state.active);
update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
assert!(!state.active);
assert!(!shared.conntrack_pressure_active());
assert!(!stats.get_conntrack_pressure_active());
}
#[test]
fn pressure_does_not_activate_when_disabled() {
let stats = Stats::new();
let shared = ProxySharedState::new();
let mut cfg = ProxyConfig::default();
cfg.server.conntrack_control.inline_conntrack_control = false;
let mut state = PressureState::new(&stats);
let sample = PressureSample {
conn_pct: Some(100),
fd_pct: Some(100),
accept_timeout_delta: 10,
me_queue_pressure_delta: 10,
};
update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state);
assert!(!state.active);
assert!(!shared.conntrack_pressure_active());
assert!(!stats.get_conntrack_pressure_active());
}
}

View File

@ -18,19 +18,38 @@ use crate::transport::middle_proxy::{
pub(crate) fn resolve_runtime_config_path(
config_path_cli: &str,
startup_cwd: &std::path::Path,
config_path_explicit: bool,
) -> PathBuf {
let raw = PathBuf::from(config_path_cli);
let absolute = if raw.is_absolute() {
raw
} else {
startup_cwd.join(raw)
};
absolute.canonicalize().unwrap_or(absolute)
if config_path_explicit {
let raw = PathBuf::from(config_path_cli);
let absolute = if raw.is_absolute() {
raw
} else {
startup_cwd.join(raw)
};
return absolute.canonicalize().unwrap_or(absolute);
}
let etc_telemt = std::path::Path::new("/etc/telemt");
let candidates = [
startup_cwd.join("config.toml"),
startup_cwd.join("telemt.toml"),
etc_telemt.join("telemt.toml"),
etc_telemt.join("config.toml"),
];
for candidate in candidates {
if candidate.is_file() {
return candidate.canonicalize().unwrap_or(candidate);
}
}
startup_cwd.join("config.toml")
}
/// Parsed CLI arguments.
pub(crate) struct CliArgs {
pub config_path: String,
pub config_path_explicit: bool,
pub data_path: Option<PathBuf>,
pub silent: bool,
pub log_level: Option<String>,
@ -39,6 +58,7 @@ pub(crate) struct CliArgs {
pub(crate) fn parse_cli() -> CliArgs {
let mut config_path = "config.toml".to_string();
let mut config_path_explicit = false;
let mut data_path: Option<PathBuf> = None;
let mut silent = false;
let mut log_level: Option<String> = None;
@ -74,6 +94,20 @@ pub(crate) fn parse_cli() -> CliArgs {
s.trim_start_matches("--data-path=").to_string(),
));
}
"--working-dir" => {
i += 1;
if i < args.len() {
data_path = Some(PathBuf::from(args[i].clone()));
} else {
eprintln!("Missing value for --working-dir");
std::process::exit(0);
}
}
s if s.starts_with("--working-dir=") => {
data_path = Some(PathBuf::from(
s.trim_start_matches("--working-dir=").to_string(),
));
}
"--silent" | "-s" => {
silent = true;
}
@ -111,13 +145,11 @@ pub(crate) fn parse_cli() -> CliArgs {
i += 1;
}
}
s if s.starts_with("--working-dir") => {
if !s.contains('=') {
i += 1;
}
}
s if !s.starts_with('-') => {
config_path = s.to_string();
if !matches!(s, "run" | "start" | "stop" | "reload" | "status") {
config_path = s.to_string();
config_path_explicit = true;
}
}
other => {
eprintln!("Unknown option: {}", other);
@ -128,6 +160,7 @@ pub(crate) fn parse_cli() -> CliArgs {
CliArgs {
config_path,
config_path_explicit,
data_path,
silent,
log_level,
@ -152,6 +185,7 @@ fn print_help() {
eprintln!(
" --data-path <DIR> Set data directory (absolute path; overrides config value)"
);
eprintln!(" --working-dir <DIR> Alias for --data-path");
eprintln!(" --silent, -s Suppress info logs");
eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent");
eprintln!(" --help, -h Show this help");
@ -210,7 +244,7 @@ mod tests {
let target = startup_cwd.join("config.toml");
std::fs::write(&target, " ").unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd);
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, true);
assert_eq!(resolved, target.canonicalize().unwrap());
let _ = std::fs::remove_file(&target);
@ -226,11 +260,44 @@ mod tests {
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_path_missing_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let resolved = resolve_runtime_config_path("missing.toml", &startup_cwd);
let resolved = resolve_runtime_config_path("missing.toml", &startup_cwd, true);
assert_eq!(resolved, startup_cwd.join("missing.toml"));
let _ = std::fs::remove_dir(&startup_cwd);
}
#[test]
fn resolve_runtime_config_path_uses_startup_candidates_when_not_explicit() {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_startup_candidates_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let telemt = startup_cwd.join("telemt.toml");
std::fs::write(&telemt, " ").unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, false);
assert_eq!(resolved, telemt.canonicalize().unwrap());
let _ = std::fs::remove_file(&telemt);
let _ = std::fs::remove_dir(&startup_cwd);
}
#[test]
fn resolve_runtime_config_path_defaults_to_startup_config_when_none_found() {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_startup_default_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, false);
assert_eq!(resolved, startup_cwd.join("config.toml"));
let _ = std::fs::remove_dir(&startup_cwd);
}
}
pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {

View File

@ -262,6 +262,7 @@ pub(crate) async fn bind_listeners(
break;
}
Err(_) => {
stats.increment_accept_permit_timeout_total();
debug!(
timeout_ms = accept_permit_timeout_ms,
"Dropping accepted unix connection: permit wait timeout"
@ -407,6 +408,7 @@ pub(crate) fn spawn_tcp_accept_loops(
break;
}
Err(_) => {
stats.increment_accept_permit_timeout_total();
debug!(
peer = %peer_addr,
timeout_ms = accept_permit_timeout_ms,

View File

@ -28,6 +28,7 @@ use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
use crate::api;
use crate::conntrack_control;
use crate::config::{LogLevel, ProxyConfig};
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
@ -111,6 +112,7 @@ async fn run_inner(
.await;
let cli_args = parse_cli();
let config_path_cli = cli_args.config_path;
let config_path_explicit = cli_args.config_path_explicit;
let data_path = cli_args.data_path;
let cli_silent = cli_args.silent;
let cli_log_level = cli_args.log_level;
@ -122,7 +124,8 @@ async fn run_inner(
std::process::exit(1);
}
};
let config_path = resolve_runtime_config_path(&config_path_cli, &startup_cwd);
let mut config_path =
resolve_runtime_config_path(&config_path_cli, &startup_cwd, config_path_explicit);
let mut config = match ProxyConfig::load(&config_path) {
Ok(c) => c,
@ -132,11 +135,100 @@ async fn run_inner(
std::process::exit(1);
} else {
let default = ProxyConfig::default();
std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap();
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
let serialized = match toml::to_string_pretty(&default)
.or_else(|_| toml::to_string(&default))
{
Ok(value) => Some(value),
Err(serialize_error) => {
eprintln!(
"[telemt] Warning: failed to serialize default config: {}",
serialize_error
);
None
}
};
if config_path_explicit {
if let Some(serialized) = serialized.as_ref() {
if let Err(write_error) = std::fs::write(&config_path, serialized) {
eprintln!(
"[telemt] Error: failed to create explicit config at {}: {}",
config_path.display(),
write_error
);
std::process::exit(1);
}
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
} else {
eprintln!(
"[telemt] Warning: running with in-memory default config without writing to disk"
);
}
} else {
let system_dir = std::path::Path::new("/etc/telemt");
let system_config_path = system_dir.join("telemt.toml");
let startup_config_path = startup_cwd.join("config.toml");
let mut persisted = false;
if let Some(serialized) = serialized.as_ref() {
match std::fs::create_dir_all(system_dir) {
Ok(()) => match std::fs::write(&system_config_path, serialized) {
Ok(()) => {
config_path = system_config_path;
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
persisted = true;
}
Err(write_error) => {
eprintln!(
"[telemt] Warning: failed to write default config at {}: {}",
system_config_path.display(),
write_error
);
}
},
Err(create_error) => {
eprintln!(
"[telemt] Warning: failed to create {}: {}",
system_dir.display(),
create_error
);
}
}
if !persisted {
match std::fs::write(&startup_config_path, serialized) {
Ok(()) => {
config_path = startup_config_path;
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
persisted = true;
}
Err(write_error) => {
eprintln!(
"[telemt] Warning: failed to write default config at {}: {}",
startup_config_path.display(),
write_error
);
}
}
}
}
if !persisted {
eprintln!(
"[telemt] Warning: running with in-memory default config without writing to disk"
);
}
}
default
}
}
@ -633,6 +725,11 @@ async fn run_inner(
.await;
let _admission_tx_hold = admission_tx;
let shared_state = ProxySharedState::new();
conntrack_control::spawn_conntrack_controller(
config_rx.clone(),
stats.clone(),
shared_state.clone(),
);
let bound = listeners::bind_listeners(
&config,

View File

@ -2,6 +2,7 @@
mod api;
mod cli;
mod conntrack_control;
mod config;
mod crypto;
#[cfg(unix)]

View File

@ -234,6 +234,17 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let me_allows_normal = telemetry.me_level.allows_normal();
let me_allows_debug = telemetry.me_level.allows_debug();
let _ = writeln!(
out,
"# HELP telemt_build_info Build information for the running telemt binary"
);
let _ = writeln!(out, "# TYPE telemt_build_info gauge");
let _ = writeln!(
out,
"telemt_build_info{{version=\"{}\"}} 1",
env!("CARGO_PKG_VERSION")
);
let _ = writeln!(out, "# HELP telemt_uptime_seconds Proxy uptime");
let _ = writeln!(out, "# TYPE telemt_uptime_seconds gauge");
let _ = writeln!(out, "telemt_uptime_seconds {:.1}", stats.uptime_secs());
@ -359,6 +370,134 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout"
);
let _ = writeln!(out, "# TYPE telemt_accept_permit_timeout_total counter");
let _ = writeln!(
out,
"telemt_accept_permit_timeout_total {}",
if core_enabled {
stats.get_accept_permit_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_control_state Runtime conntrack control state flags"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_control_state gauge");
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"enabled\"}} {}",
if stats.get_conntrack_control_enabled() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"available\"}} {}",
if stats.get_conntrack_control_available() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"pressure_active\"}} {}",
if stats.get_conntrack_pressure_active() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"rule_apply_ok\"}} {}",
if stats.get_conntrack_rule_apply_ok() {
1
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_event_queue_depth Pending close events in conntrack control queue"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_event_queue_depth gauge");
let _ = writeln!(
out,
"telemt_conntrack_event_queue_depth {}",
stats.get_conntrack_event_queue_depth()
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_delete_total Conntrack delete attempts by outcome"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_delete_total counter");
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"attempt\"}} {}",
if core_enabled {
stats.get_conntrack_delete_attempt_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"success\"}} {}",
if core_enabled {
stats.get_conntrack_delete_success_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"not_found\"}} {}",
if core_enabled {
stats.get_conntrack_delete_not_found_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"error\"}} {}",
if core_enabled {
stats.get_conntrack_delete_error_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_close_event_drop_total Dropped conntrack close events due to queue pressure or unavailable sender"
);
let _ = writeln!(
out,
"# TYPE telemt_conntrack_close_event_drop_total counter"
);
let _ = writeln!(
out,
"telemt_conntrack_close_event_drop_total {}",
if core_enabled {
stats.get_conntrack_close_event_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"
@ -2775,6 +2914,10 @@ mod tests {
let output = render_metrics(&stats, &config, &tracker).await;
assert!(output.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
env!("CARGO_PKG_VERSION")
)));
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
@ -2867,6 +3010,7 @@ mod tests {
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &config, &tracker).await;
assert!(output.contains("# TYPE telemt_build_info gauge"));
assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
@ -2924,6 +3068,14 @@ mod tests {
.unwrap()
.contains("telemt_connections_total 3")
);
assert!(
std::str::from_utf8(body.as_ref())
.unwrap()
.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
env!("CARGO_PKG_VERSION")
))
);
config.general.beobachten = true;
config.general.beobachten_minutes = 10;

View File

@ -80,7 +80,7 @@ use crate::transport::middle_proxy::MePool;
use crate::transport::socket::normalize_ip;
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};
use crate::proxy::direct_relay::handle_via_direct;
use crate::proxy::direct_relay::handle_via_direct_with_shared;
use crate::proxy::handshake::{
HandshakeSuccess, handle_mtproto_handshake_with_shared, handle_tls_handshake_with_shared,
};
@ -191,6 +191,24 @@ fn handshake_timeout_with_mask_grace(config: &ProxyConfig) -> Duration {
}
}
fn effective_client_first_byte_idle_secs(config: &ProxyConfig, shared: &ProxySharedState) -> u64 {
let idle_secs = config.timeouts.client_first_byte_idle_secs;
if idle_secs == 0 {
return 0;
}
if shared.conntrack_pressure_active() {
idle_secs.min(
config
.server
.conntrack_control
.profile
.client_first_byte_idle_cap_secs(),
)
} else {
idle_secs
}
}
const MASK_CLASSIFIER_PREFETCH_WINDOW: usize = 16;
#[cfg(test)]
const MASK_CLASSIFIER_PREFETCH_TIMEOUT: Duration = Duration::from_millis(5);
@ -463,10 +481,11 @@ where
debug!(peer = %real_peer, "New connection (generic stream)");
let first_byte = if config.timeouts.client_first_byte_idle_secs == 0 {
let first_byte_idle_secs = effective_client_first_byte_idle_secs(&config, shared.as_ref());
let first_byte = if first_byte_idle_secs == 0 {
None
} else {
let idle_timeout = Duration::from_secs(config.timeouts.client_first_byte_idle_secs);
let idle_timeout = Duration::from_secs(first_byte_idle_secs);
let mut first_byte = [0u8; 1];
match timeout(idle_timeout, stream.read(&mut first_byte)).await {
Ok(Ok(0)) => {
@ -499,15 +518,15 @@ where
);
return Err(ProxyError::Io(e));
}
Err(_) => {
debug!(
peer = %real_peer,
idle_secs = config.timeouts.client_first_byte_idle_secs,
"Closing idle pooled connection before first client byte"
);
return Ok(());
Err(_) => {
debug!(
peer = %real_peer,
idle_secs = first_byte_idle_secs,
"Closing idle pooled connection before first client byte"
);
return Ok(());
}
}
}
};
let handshake_timeout = handshake_timeout_with_mask_grace(&config);
@ -968,11 +987,12 @@ impl RunningClientHandler {
}
}
let first_byte = if self.config.timeouts.client_first_byte_idle_secs == 0 {
let first_byte_idle_secs =
effective_client_first_byte_idle_secs(&self.config, self.shared.as_ref());
let first_byte = if first_byte_idle_secs == 0 {
None
} else {
let idle_timeout =
Duration::from_secs(self.config.timeouts.client_first_byte_idle_secs);
let idle_timeout = Duration::from_secs(first_byte_idle_secs);
let mut first_byte = [0u8; 1];
match timeout(idle_timeout, self.stream.read(&mut first_byte)).await {
Ok(Ok(0)) => {
@ -1008,7 +1028,7 @@ impl RunningClientHandler {
Err(_) => {
debug!(
peer = %self.peer,
idle_secs = self.config.timeouts.client_first_byte_idle_secs,
idle_secs = first_byte_idle_secs,
"Closing idle pooled connection before first client byte"
);
return Ok(None);
@ -1395,7 +1415,7 @@ impl RunningClientHandler {
local_addr: SocketAddr,
peer_addr: SocketAddr,
ip_tracker: Arc<UserIpTracker>,
_shared: Arc<ProxySharedState>,
shared: Arc<ProxySharedState>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@ -1438,12 +1458,12 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
_shared,
shared.clone(),
)
.await
} else {
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
handle_via_direct(
handle_via_direct_with_shared(
client_reader,
client_writer,
success,
@ -1455,12 +1475,14 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
local_addr,
shared.clone(),
)
.await
}
} else {
// Direct mode (original behavior)
handle_via_direct(
handle_via_direct_with_shared(
client_reader,
client_writer,
success,
@ -1472,6 +1494,8 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
local_addr,
shared.clone(),
)
.await
};

View File

@ -6,6 +6,7 @@ use std::net::SocketAddr;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use std::sync::{Mutex, OnceLock};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, split};
use tokio::sync::watch;
@ -16,7 +17,9 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::relay::relay_bidirectional;
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
@ -225,7 +228,43 @@ fn unknown_dc_test_lock() -> &'static Mutex<()> {
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
#[allow(dead_code)]
pub(crate) async fn handle_via_direct<R, W>(
client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>,
success: HandshakeSuccess,
upstream_manager: Arc<UpstreamManager>,
stats: Arc<Stats>,
config: Arc<ProxyConfig>,
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
handle_via_direct_with_shared(
client_reader,
client_writer,
success,
upstream_manager,
stats,
config.clone(),
buffer_pool,
rng,
route_rx,
route_snapshot,
session_id,
SocketAddr::from(([0, 0, 0, 0], config.server.port)),
ProxySharedState::new(),
)
.await
}
pub(crate) async fn handle_via_direct_with_shared<R, W>(
client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>,
success: HandshakeSuccess,
@ -237,6 +276,8 @@ pub(crate) async fn handle_via_direct<R, W>(
mut route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
local_addr: SocketAddr,
shared: Arc<ProxySharedState>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@ -277,7 +318,18 @@ where
let _direct_connection_lease = stats.acquire_direct_connection_lease();
let buffer_pool_trim = Arc::clone(&buffer_pool);
let relay_result = relay_bidirectional(
let relay_activity_timeout = if shared.conntrack_pressure_active() {
Duration::from_secs(
config
.server
.conntrack_control
.profile
.direct_activity_timeout_secs(),
)
} else {
Duration::from_secs(1800)
};
let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout(
client_reader,
client_writer,
tg_reader,
@ -288,6 +340,7 @@ where
Arc::clone(&stats),
config.access.user_data_quota.get(user).copied(),
buffer_pool,
relay_activity_timeout,
);
tokio::pin!(relay_result);
let relay_result = loop {
@ -329,9 +382,52 @@ where
pool_snapshot.allocated,
pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
);
let close_reason = classify_conntrack_close_reason(&relay_result);
let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent {
src: success.peer,
dst: local_addr,
reason: close_reason,
});
if !matches!(
publish_result,
ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled
) {
stats.increment_conntrack_close_event_drop_total();
}
relay_result
}
fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason {
match result {
Ok(()) => ConntrackCloseReason::NormalEof,
Err(crate::error::ProxyError::Io(error))
if matches!(error.kind(), std::io::ErrorKind::TimedOut) =>
{
ConntrackCloseReason::Timeout
}
Err(crate::error::ProxyError::Io(error))
if matches!(
error.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
| std::io::ErrorKind::UnexpectedEof
) =>
{
ConntrackCloseReason::Reset
}
Err(crate::error::ProxyError::Proxy(message))
if message.contains("pressure") || message.contains("evicted") =>
{
ConntrackCloseReason::Pressure
}
Err(_) => ConntrackCloseReason::Other,
}
}
fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
let prefer_v6 = config.network.prefer == 6 && config.network.ipv6.unwrap_or(true);
let datacenters = if prefer_v6 {

View File

@ -16,12 +16,14 @@ use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::timeout;
use tracing::{debug, info, trace, warn};
use crate::config::ProxyConfig;
use crate::config::{ConntrackPressureProfile, ProxyConfig};
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::shared_state::ProxySharedState;
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
@ -135,6 +137,10 @@ fn note_relay_pressure_event_in(shared: &ProxySharedState) {
guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1);
}
pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
note_relay_pressure_event_in(shared);
}
fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
let guard = relay_idle_candidate_registry_lock_in(shared);
guard.pressure_event_seq
@ -241,6 +247,23 @@ impl RelayClientIdlePolicy {
legacy_frame_read_timeout: frame_read_timeout,
}
}
fn apply_pressure_caps(&mut self, profile: ConntrackPressureProfile) {
let pressure_soft_idle_cap = Duration::from_secs(profile.middle_soft_idle_cap_secs());
let pressure_hard_idle_cap = Duration::from_secs(profile.middle_hard_idle_cap_secs());
self.soft_idle = self.soft_idle.min(pressure_soft_idle_cap);
self.hard_idle = self.hard_idle.min(pressure_hard_idle_cap);
if self.soft_idle > self.hard_idle {
self.soft_idle = self.hard_idle;
}
self.legacy_frame_read_timeout = self
.legacy_frame_read_timeout
.min(pressure_hard_idle_cap);
if self.grace_after_downstream_activity > self.hard_idle {
self.grace_after_downstream_activity = self.hard_idle;
}
}
}
#[derive(Clone, Copy)]
@ -1027,7 +1050,12 @@ where
let translated_local_addr = me_pool.translate_our_addr(local_addr);
let frame_limit = config.general.max_client_frame;
let relay_idle_policy = RelayClientIdlePolicy::from_config(&config);
let mut relay_idle_policy = RelayClientIdlePolicy::from_config(&config);
let mut pressure_caps_applied = false;
if shared.conntrack_pressure_active() {
relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile);
pressure_caps_applied = true;
}
let session_started_at = forensics.started_at;
let mut relay_idle_state = RelayClientIdleState::new(session_started_at);
let last_downstream_activity_ms = Arc::new(AtomicU64::new(0));
@ -1421,6 +1449,11 @@ where
let mut route_watch_open = true;
let mut seen_pressure_seq = relay_pressure_event_seq_in(shared.as_ref());
loop {
if shared.conntrack_pressure_active() && !pressure_caps_applied {
relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile);
pressure_caps_applied = true;
}
if relay_idle_policy.enabled
&& maybe_evict_idle_candidate_on_pressure_in(
shared.as_ref(),
@ -1600,6 +1633,20 @@ where
frames_ok = frame_counter,
"ME relay cleanup"
);
let close_reason = classify_conntrack_close_reason(&result);
let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent {
src: peer,
dst: local_addr,
reason: close_reason,
});
if !matches!(
publish_result,
ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled
) {
stats.increment_conntrack_close_event_drop_total();
}
clear_relay_idle_candidate_in(shared.as_ref(), conn_id);
me_pool.registry().unregister(conn_id).await;
buffer_pool.trim_to(buffer_pool.max_buffers().min(64));
@ -1612,6 +1659,33 @@ where
result
}
fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason {
match result {
Ok(()) => ConntrackCloseReason::NormalEof,
Err(ProxyError::Io(error)) if matches!(error.kind(), std::io::ErrorKind::TimedOut) => {
ConntrackCloseReason::Timeout
}
Err(ProxyError::Io(error))
if matches!(
error.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
| std::io::ErrorKind::UnexpectedEof
) =>
{
ConntrackCloseReason::Reset
}
Err(ProxyError::Proxy(message))
if message.contains("pressure") || message.contains("evicted") =>
{
ConntrackCloseReason::Pressure
}
Err(_) => ConntrackCloseReason::Other,
}
}
async fn read_client_payload_with_idle_policy_in<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,

View File

@ -70,6 +70,7 @@ use tracing::{debug, trace, warn};
///
/// iOS keeps Telegram connections alive in background for up to 30 minutes.
/// Closing earlier causes unnecessary reconnects and handshake overhead.
#[allow(dead_code)]
const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(1800);
/// Watchdog check interval — also used for periodic rate logging.
@ -453,6 +454,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
/// - Clean shutdown: both write sides are shut down on exit
/// - Error propagation: quota exits return `ProxyError::DataQuotaExceeded`,
/// other I/O failures are returned as `ProxyError::Io`
#[allow(dead_code)]
pub async fn relay_bidirectional<CR, CW, SR, SW>(
client_reader: CR,
client_writer: CW,
@ -471,6 +473,42 @@ where
SR: AsyncRead + Unpin + Send + 'static,
SW: AsyncWrite + Unpin + Send + 'static,
{
relay_bidirectional_with_activity_timeout(
client_reader,
client_writer,
server_reader,
server_writer,
c2s_buf_size,
s2c_buf_size,
user,
stats,
quota_limit,
_buffer_pool,
ACTIVITY_TIMEOUT,
)
.await
}
pub async fn relay_bidirectional_with_activity_timeout<CR, CW, SR, SW>(
client_reader: CR,
client_writer: CW,
server_reader: SR,
server_writer: SW,
c2s_buf_size: usize,
s2c_buf_size: usize,
user: &str,
stats: Arc<Stats>,
quota_limit: Option<u64>,
_buffer_pool: Arc<BufferPool>,
activity_timeout: Duration,
) -> Result<()>
where
CR: AsyncRead + Unpin + Send + 'static,
CW: AsyncWrite + Unpin + Send + 'static,
SR: AsyncRead + Unpin + Send + 'static,
SW: AsyncWrite + Unpin + Send + 'static,
{
let activity_timeout = activity_timeout.max(Duration::from_secs(1));
let epoch = Instant::now();
let counters = Arc::new(SharedCounters::new());
let quota_exceeded = Arc::new(AtomicBool::new(false));
@ -512,7 +550,7 @@ where
}
// ── Activity timeout ────────────────────────────────────
if idle >= ACTIVITY_TIMEOUT {
if idle >= activity_timeout {
let c2s = wd_counters.c2s_bytes.load(Ordering::Relaxed);
let s2c = wd_counters.s2c_bytes.load(Ordering::Relaxed);
warn!(

View File

@ -1,15 +1,40 @@
use std::collections::HashSet;
use std::collections::hash_map::RandomState;
use std::net::IpAddr;
use std::sync::atomic::AtomicU64;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use dashmap::DashMap;
use tokio::sync::mpsc;
use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState};
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConntrackCloseReason {
NormalEof,
Timeout,
Pressure,
Reset,
Other,
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct ConntrackCloseEvent {
pub(crate) src: SocketAddr,
pub(crate) dst: SocketAddr,
pub(crate) reason: ConntrackCloseReason,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConntrackClosePublishResult {
Sent,
Disabled,
QueueFull,
QueueClosed,
}
pub(crate) struct HandshakeSharedState {
pub(crate) auth_probe: DashMap<IpAddr, AuthProbeState>,
pub(crate) auth_probe_saturation: Mutex<Option<AuthProbeSaturationState>>,
@ -31,6 +56,8 @@ pub(crate) struct MiddleRelaySharedState {
pub(crate) struct ProxySharedState {
pub(crate) handshake: HandshakeSharedState,
pub(crate) middle_relay: MiddleRelaySharedState,
pub(crate) conntrack_pressure_active: AtomicBool,
pub(crate) conntrack_close_tx: Mutex<Option<mpsc::Sender<ConntrackCloseEvent>>>,
}
impl ProxySharedState {
@ -52,6 +79,67 @@ impl ProxySharedState {
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
relay_idle_mark_seq: AtomicU64::new(0),
},
conntrack_pressure_active: AtomicBool::new(false),
conntrack_close_tx: Mutex::new(None),
})
}
pub(crate) fn set_conntrack_close_sender(&self, tx: mpsc::Sender<ConntrackCloseEvent>) {
match self.conntrack_close_tx.lock() {
Ok(mut guard) => {
*guard = Some(tx);
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = Some(tx);
self.conntrack_close_tx.clear_poison();
}
}
}
pub(crate) fn disable_conntrack_close_sender(&self) {
match self.conntrack_close_tx.lock() {
Ok(mut guard) => {
*guard = None;
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = None;
self.conntrack_close_tx.clear_poison();
}
}
}
pub(crate) fn publish_conntrack_close_event(
&self,
event: ConntrackCloseEvent,
) -> ConntrackClosePublishResult {
let tx = match self.conntrack_close_tx.lock() {
Ok(guard) => guard.clone(),
Err(poisoned) => {
let guard = poisoned.into_inner();
let cloned = guard.clone();
self.conntrack_close_tx.clear_poison();
cloned
}
};
let Some(tx) = tx else {
return ConntrackClosePublishResult::Disabled;
};
match tx.try_send(event) {
Ok(()) => ConntrackClosePublishResult::Sent,
Err(mpsc::error::TrySendError::Full(_)) => ConntrackClosePublishResult::QueueFull,
Err(mpsc::error::TrySendError::Closed(_)) => ConntrackClosePublishResult::QueueClosed,
}
}
pub(crate) fn set_conntrack_pressure_active(&self, active: bool) {
self.conntrack_pressure_active.store(active, Ordering::Relaxed);
}
pub(crate) fn conntrack_pressure_active(&self) -> bool {
self.conntrack_pressure_active.load(Ordering::Relaxed)
}
}

View File

@ -159,8 +159,8 @@ MemoryDenyWriteExecute=true
LockPersonality=true
# Allow binding to privileged ports and writing to specific paths
AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
ReadWritePaths=/etc/telemt /var/run /var/lib/telemt
[Install]

View File

@ -91,6 +91,17 @@ pub struct Stats {
current_connections_direct: AtomicU64,
current_connections_me: AtomicU64,
handshake_timeouts: AtomicU64,
accept_permit_timeout_total: AtomicU64,
conntrack_control_enabled_gauge: AtomicBool,
conntrack_control_available_gauge: AtomicBool,
conntrack_pressure_active_gauge: AtomicBool,
conntrack_event_queue_depth_gauge: AtomicU64,
conntrack_rule_apply_ok_gauge: AtomicBool,
conntrack_delete_attempt_total: AtomicU64,
conntrack_delete_success_total: AtomicU64,
conntrack_delete_not_found_total: AtomicU64,
conntrack_delete_error_total: AtomicU64,
conntrack_close_event_drop_total: AtomicU64,
upstream_connect_attempt_total: AtomicU64,
upstream_connect_success_total: AtomicU64,
upstream_connect_fail_total: AtomicU64,
@ -528,6 +539,74 @@ impl Stats {
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_accept_permit_timeout_total(&self) {
if self.telemetry_core_enabled() {
self.accept_permit_timeout_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn set_conntrack_control_enabled(&self, enabled: bool) {
self.conntrack_control_enabled_gauge
.store(enabled, Ordering::Relaxed);
}
pub fn set_conntrack_control_available(&self, available: bool) {
self.conntrack_control_available_gauge
.store(available, Ordering::Relaxed);
}
pub fn set_conntrack_pressure_active(&self, active: bool) {
self.conntrack_pressure_active_gauge
.store(active, Ordering::Relaxed);
}
pub fn set_conntrack_event_queue_depth(&self, depth: u64) {
self.conntrack_event_queue_depth_gauge
.store(depth, Ordering::Relaxed);
}
pub fn set_conntrack_rule_apply_ok(&self, ok: bool) {
self.conntrack_rule_apply_ok_gauge
.store(ok, Ordering::Relaxed);
}
pub fn increment_conntrack_delete_attempt_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_attempt_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_delete_success_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_success_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_delete_not_found_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_not_found_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_delete_error_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_delete_error_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_conntrack_close_event_drop_total(&self) {
if self.telemetry_core_enabled() {
self.conntrack_close_event_drop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_upstream_connect_attempt_total(&self) {
if self.telemetry_core_enabled() {
self.upstream_connect_attempt_total
@ -1477,6 +1556,9 @@ impl Stats {
pub fn get_connects_bad(&self) -> u64 {
self.connects_bad.load(Ordering::Relaxed)
}
pub fn get_accept_permit_timeout_total(&self) -> u64 {
self.accept_permit_timeout_total.load(Ordering::Relaxed)
}
pub fn get_current_connections_direct(&self) -> u64 {
self.current_connections_direct.load(Ordering::Relaxed)
}
@ -1487,6 +1569,38 @@ impl Stats {
self.get_current_connections_direct()
.saturating_add(self.get_current_connections_me())
}
pub fn get_conntrack_control_enabled(&self) -> bool {
self.conntrack_control_enabled_gauge.load(Ordering::Relaxed)
}
pub fn get_conntrack_control_available(&self) -> bool {
self.conntrack_control_available_gauge
.load(Ordering::Relaxed)
}
pub fn get_conntrack_pressure_active(&self) -> bool {
self.conntrack_pressure_active_gauge.load(Ordering::Relaxed)
}
pub fn get_conntrack_event_queue_depth(&self) -> u64 {
self.conntrack_event_queue_depth_gauge
.load(Ordering::Relaxed)
}
pub fn get_conntrack_rule_apply_ok(&self) -> bool {
self.conntrack_rule_apply_ok_gauge.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_attempt_total(&self) -> u64 {
self.conntrack_delete_attempt_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_success_total(&self) -> u64 {
self.conntrack_delete_success_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_not_found_total(&self) -> u64 {
self.conntrack_delete_not_found_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_delete_error_total(&self) -> u64 {
self.conntrack_delete_error_total.load(Ordering::Relaxed)
}
pub fn get_conntrack_close_event_drop_total(&self) -> u64 {
self.conntrack_close_event_drop_total.load(Ordering::Relaxed)
}
pub fn get_me_keepalive_sent(&self) -> u64 {
self.me_keepalive_sent.load(Ordering::Relaxed)
}