mirror of https://github.com/telemt/telemt.git

Compare commits: 7b25f62438 ... aee5e24dbd (3 commits)

| Author | SHA1 | Date |
|---|---|---|
| | aee5e24dbd | |
| | 8e7b27a16d | |
| | 7f0057acd7 | |
@ -1,58 +0,0 @@
# Architect Mode Rules for Telemt

## Architecture Overview

```mermaid
graph TB
    subgraph Entry
        Client[Clients] --> Listener[TCP/Unix Listener]
    end

    subgraph ProxyLayer["Proxy Layer"]
        Listener --> ClientHandler[ClientHandler]
        ClientHandler --> Handshake[Handshake Validator]
        Handshake -->|Valid| Relay[Relay Layer]
        Handshake -->|Invalid| Masking[Masking/TLS Fronting]
    end

    subgraph Transport
        Relay --> MiddleProxy[Middle-End Proxy Pool]
        Relay --> DirectRelay[Direct DC Relay]
        MiddleProxy --> TelegramDC[Telegram DCs]
        DirectRelay --> TelegramDC
    end
```
## Module Dependencies
- [`src/main.rs`](src/main.rs) - Entry point, spawns all async tasks
- [`src/config/`](src/config/) - Configuration loading with auto-migration
- [`src/error.rs`](src/error.rs) - Error types, must be used by all modules
- [`src/crypto/`](src/crypto/) - AES, SHA, random number generation
- [`src/protocol/`](src/protocol/) - MTProto constants, frame encoding, obfuscation
- [`src/stream/`](src/stream/) - Stream wrappers, buffer pool, frame codecs
- [`src/proxy/`](src/proxy/) - Client handling, handshake, relay logic
- [`src/transport/`](src/transport/) - Upstream management, middle-proxy, SOCKS support
- [`src/stats/`](src/stats/) - Statistics and replay protection
- [`src/ip_tracker.rs`](src/ip_tracker.rs) - Per-user IP tracking
## Key Architectural Constraints

### Middle-End Proxy Mode
- Requires a public IP on the interface OR 1:1 NAT with STUN probing
- Uses a separate `proxy-secret` from Telegram (NOT user secrets)
- Falls back to direct mode automatically on STUN mismatch

### TLS Fronting
- Invalid handshakes are transparently proxied to `mask_host`
- This is critical for DPI evasion - do not change this behavior
- `mask_unix_sock` and `mask_host` are mutually exclusive
### Stream Architecture
- The buffer pool is shared globally via `Arc` - this prevents allocation storms
- Frame codecs implement tokio-util's Encoder/Decoder traits
- The state machine in [`src/stream/state.rs`](src/stream/state.rs) manages stream transitions
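The framing idea behind the codecs can be sketched without tokio-util. This is a stdlib-only illustration of length-prefixed framing, not the real codec from `src/stream/frame_codec.rs`; the function names and the little-endian `u32` prefix are assumptions for the sketch.

```rust
// Illustrative only: the real codecs implement tokio-util's
// Encoder/Decoder. This sketch shows the same length-prefixed idea.
fn encode_frame(payload: &[u8], out: &mut Vec<u8>) {
    // 4-byte little-endian length prefix, then the payload.
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(payload);
}

/// Returns (payload, bytes consumed), or None if the buffer is incomplete.
fn decode_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    let len_bytes: [u8; 4] = buf.get(..4)?.try_into().ok()?;
    let len = u32::from_le_bytes(len_bytes) as usize;
    let payload = buf.get(4..4 + len)?;
    Some((payload, 4 + len))
}

fn main() {
    let mut wire = Vec::new();
    encode_frame(b"hello", &mut wire);
    let (payload, consumed) = decode_frame(&wire).expect("complete frame");
    assert_eq!(payload, b"hello");
    assert_eq!(consumed, 9); // 4-byte prefix + 5-byte payload
    // A partial buffer decodes to None instead of panicking.
    assert!(decode_frame(&wire[..3]).is_none());
}
```

Returning `None` on a short buffer is the same contract as a `Decoder` returning `Ok(None)` until more bytes arrive.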
### Configuration Migration
- [`ProxyConfig::load()`](src/config/mod.rs:641) mutates the config in-place
- New fields must have sensible defaults
- The DC203 override is auto-injected for CDN/media support
@ -1,23 +0,0 @@
# Code Mode Rules for Telemt

## Error Handling
- Always use [`ProxyError`](src/error.rs:168) from [`src/error.rs`](src/error.rs) for proxy operations
- [`HandshakeResult<T,R,W>`](src/error.rs:292) returns the streams on a bad client - these MUST be returned for masking, never dropped
- Use the [`Recoverable`](src/error.rs:110) trait to check if errors are retryable
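The "never drop the streams" rule can be modeled in miniature. The types below are illustrative stand-ins, not the real `HandshakeResult<T, R, W>` from `src/error.rs`; the point is only the shape: the failure arm carries the client's stream so it can be handed to the masking layer.

```rust
// Simplified model of the rule: on a failed handshake the client's
// stream comes back in the error arm and must be forwarded to
// masking, never dropped. These types are stand-ins.
enum HandshakeOutcome<S> {
    Valid { session: u64, stream: S },
    Invalid { stream: S }, // stream survives so it can be masked
}

fn handle<S>(outcome: HandshakeOutcome<S>, mask: impl FnOnce(S)) -> Option<u64> {
    match outcome {
        HandshakeOutcome::Valid { session, stream: _ } => Some(session),
        // Wrong: matching `Invalid { .. } => None` would drop the stream
        // and break TLS fronting. Right: hand the stream to masking.
        HandshakeOutcome::Invalid { stream } => {
            mask(stream);
            None
        }
    }
}

fn main() {
    let mut masked = Vec::new();
    let r = handle(HandshakeOutcome::Invalid { stream: "bad-client" }, |s| {
        masked.push(s) // the stream reaches the masking layer
    });
    assert_eq!(r, None);
    assert_eq!(masked, vec!["bad-client"]);
}
```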
## Configuration Changes
- [`ProxyConfig::load()`](src/config/mod.rs:641) auto-mutates the config - new fields should have defaults
- The DC203 override is auto-injected if missing - do not remove this behavior
- When adding config fields, add migration logic in [`ProxyConfig::load()`](src/config/mod.rs:641)
## Crypto Code
- [`SecureRandom`](src/crypto/random.rs) from [`src/crypto/random.rs`](src/crypto/random.rs) must be used for all crypto operations
- Never use `rand::thread_rng()` directly - use the shared `Arc<SecureRandom>`
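The sharing pattern looks roughly like this. `SecureRandomStub` is a deliberately non-secure stand-in (a counter) for the real `SecureRandom` in `src/crypto/random.rs`, whose API is not shown in this diff; what matters is one instance behind `Arc`, cloned into each task, instead of per-call `rand::thread_rng()`.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for the real SecureRandom. NOT cryptographically secure;
// it only demonstrates the Arc-sharing pattern.
struct SecureRandomStub {
    counter: AtomicU64,
}

impl SecureRandomStub {
    fn fill_bytes(&self, buf: &mut [u8]) {
        for b in buf.iter_mut() {
            *b = (self.counter.fetch_add(1, Ordering::Relaxed) & 0xff) as u8;
        }
    }
}

fn main() {
    let rng = Arc::new(SecureRandomStub { counter: AtomicU64::new(0) });
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let rng = Arc::clone(&rng); // share the one instance, don't re-create
            std::thread::spawn(move || {
                let mut nonce = [0u8; 16];
                rng.fill_bytes(&mut nonce);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // 4 tasks * 16 bytes: every byte came from the shared generator.
    assert_eq!(rng.counter.load(Ordering::Relaxed), 64);
}
```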
## Stream Handling
- The buffer pool [`BufferPool`](src/stream/buffer_pool.rs) is shared via `Arc` - always use it instead of allocating
- Frame codecs in [`src/stream/frame_codec.rs`](src/stream/frame_codec.rs) implement tokio-util's Encoder/Decoder traits
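A minimal sketch of the shared-pool idea, assuming nothing about the real `BufferPool` beyond what the rules state (Arc-shared, reuses buffers instead of allocating per connection). The `Pool` type, its `get`/`put` names, and the `Mutex<Vec<Vec<u8>>>` free list are all illustrative.

```rust
use std::sync::{Arc, Mutex};

// Illustrative pool: a free list of fixed-size buffers behind a Mutex,
// shared via Arc so every connection reuses the same allocations.
struct Pool {
    free: Mutex<Vec<Vec<u8>>>,
    buf_size: usize,
}

impl Pool {
    fn get(self: &Arc<Self>) -> Vec<u8> {
        // Reuse a returned buffer when available; allocate only on a miss.
        self.free
            .lock()
            .unwrap()
            .pop()
            .unwrap_or_else(|| vec![0u8; self.buf_size])
    }

    fn put(self: &Arc<Self>, mut buf: Vec<u8>) {
        // Scrub and restore the fixed size before returning to the pool.
        buf.clear();
        buf.resize(self.buf_size, 0);
        self.free.lock().unwrap().push(buf);
    }
}

fn main() {
    let pool = Arc::new(Pool { free: Mutex::new(Vec::new()), buf_size: 4096 });
    let b = pool.get(); // first get: a fresh allocation
    let ptr = b.as_ptr();
    pool.put(b);
    let b2 = pool.get(); // second get: the same allocation, reused
    assert_eq!(b2.as_ptr(), ptr);
    assert_eq!(b2.len(), 4096);
}
```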
## Testing
- Tests are inline in modules using `#[cfg(test)]`
- Use `cargo test --lib <module_name>` to run tests for specific modules
@ -1,27 +0,0 @@
# Debug Mode Rules for Telemt

## Logging
- The `RUST_LOG` environment variable takes absolute priority over all config log levels
- Log levels: `trace`, `debug`, `info`, `warn`, `error`
- Use `RUST_LOG=debug cargo run` for detailed operational logs
- Use `RUST_LOG=trace cargo run` for full protocol-level debugging

## Middle-End Proxy Debugging
- Set the `ME_DIAG=1` environment variable for high-precision cryptography diagnostics
- STUN probe results are logged at startup - check for a mismatch between the local and reflected IP
- If Middle-End fails, check that `proxy_secret_path` points to a valid file from https://core.telegram.org/getProxySecret

## Connection Issues
- DC connectivity is logged at startup with RTT measurements
- If a DC ping fails, check `dc_overrides` for custom addresses
- Use `prefer_ipv6=false` in the config if IPv6 is unreliable

## TLS Fronting Issues
- Invalid handshakes are proxied to `mask_host` - check that this host is reachable
- `mask_unix_sock` and `mask_host` are mutually exclusive - only one can be set
- If `mask_unix_sock` is set, the socket must exist before connections arrive

## Common Errors
- `ReplayAttack` - the client replayed a handshake nonce, a potential attack
- `TimeSkew` - the client clock is off; can be disabled with `ignore_time_skew=true`
- `TgHandshakeTimeout` - the upstream DC connection failed, check the network
@ -81,10 +81,21 @@ pub(super) struct ZeroCoreData {
    pub(super) connections_total: u64,
    pub(super) connections_bad_total: u64,
    pub(super) handshake_timeouts_total: u64,
    pub(super) accept_permit_timeout_total: u64,
    pub(super) configured_users: usize,
    pub(super) telemetry_core_enabled: bool,
    pub(super) telemetry_user_enabled: bool,
    pub(super) telemetry_me_level: String,
    pub(super) conntrack_control_enabled: bool,
    pub(super) conntrack_control_available: bool,
    pub(super) conntrack_pressure_active: bool,
    pub(super) conntrack_event_queue_depth: u64,
    pub(super) conntrack_rule_apply_ok: bool,
    pub(super) conntrack_delete_attempt_total: u64,
    pub(super) conntrack_delete_success_total: u64,
    pub(super) conntrack_delete_not_found_total: u64,
    pub(super) conntrack_delete_error_total: u64,
    pub(super) conntrack_close_event_drop_total: u64,
}

#[derive(Serialize, Clone)]
@ -39,10 +39,21 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
            connections_total: stats.get_connects_all(),
            connections_bad_total: stats.get_connects_bad(),
            handshake_timeouts_total: stats.get_handshake_timeouts(),
            accept_permit_timeout_total: stats.get_accept_permit_timeout_total(),
            configured_users,
            telemetry_core_enabled: telemetry.core_enabled,
            telemetry_user_enabled: telemetry.user_enabled,
            telemetry_me_level: telemetry.me_level.to_string(),
            conntrack_control_enabled: stats.get_conntrack_control_enabled(),
            conntrack_control_available: stats.get_conntrack_control_available(),
            conntrack_pressure_active: stats.get_conntrack_pressure_active(),
            conntrack_event_queue_depth: stats.get_conntrack_event_queue_depth(),
            conntrack_rule_apply_ok: stats.get_conntrack_rule_apply_ok(),
            conntrack_delete_attempt_total: stats.get_conntrack_delete_attempt_total(),
            conntrack_delete_success_total: stats.get_conntrack_delete_success_total(),
            conntrack_delete_not_found_total: stats.get_conntrack_delete_not_found_total(),
            conntrack_delete_error_total: stats.get_conntrack_delete_error_total(),
            conntrack_close_event_drop_total: stats.get_conntrack_close_event_drop_total(),
        },
        upstream: build_zero_upstream_data(stats),
        middle_proxy: ZeroMiddleProxyData {
@ -48,6 +48,10 @@ const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_CONNTRACK_CONTROL_ENABLED: bool = true;
const DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT: u8 = 85;
const DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT: u8 = 70;
const DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC: u64 = 4096;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
@ -221,6 +225,22 @@ pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
    DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
}

pub(crate) fn default_conntrack_control_enabled() -> bool {
    DEFAULT_CONNTRACK_CONTROL_ENABLED
}

pub(crate) fn default_conntrack_pressure_high_watermark_pct() -> u8 {
    DEFAULT_CONNTRACK_PRESSURE_HIGH_WATERMARK_PCT
}

pub(crate) fn default_conntrack_pressure_low_watermark_pct() -> u8 {
    DEFAULT_CONNTRACK_PRESSURE_LOW_WATERMARK_PCT
}

pub(crate) fn default_conntrack_delete_budget_per_sec() -> u64 {
    DEFAULT_CONNTRACK_DELETE_BUDGET_PER_SEC
}

pub(crate) fn default_prefer_4() -> u8 {
    4
}
@ -922,6 +922,39 @@ impl ProxyConfig {
            ));
        }

        if config.server.conntrack_control.pressure_high_watermark_pct == 0
            || config.server.conntrack_control.pressure_high_watermark_pct > 100
        {
            return Err(ProxyError::Config(
                "server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]"
                    .to_string(),
            ));
        }

        if config.server.conntrack_control.pressure_low_watermark_pct
            >= config.server.conntrack_control.pressure_high_watermark_pct
        {
            return Err(ProxyError::Config(
                "server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct"
                    .to_string(),
            ));
        }

        if config.server.conntrack_control.delete_budget_per_sec == 0 {
            return Err(ProxyError::Config(
                "server.conntrack_control.delete_budget_per_sec must be > 0".to_string(),
            ));
        }

        if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid)
            && config.server.conntrack_control.hybrid_listener_ips.is_empty()
        {
            return Err(ProxyError::Config(
                "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
                    .to_string(),
            ));
        }

        if config.general.effective_me_pool_force_close_secs() > 0
            && config.general.effective_me_pool_force_close_secs()
                < config.general.me_pool_drain_ttl_secs
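The watermark checks above reduce to two rules: the high watermark must lie in [1, 100], and the low watermark must be strictly below it. A standalone sketch of just that logic (the function name and `String` error are illustrative; the real code returns `ProxyError::Config`):

```rust
// Mirrors the config validation: high in [1, 100], low strictly below high.
fn validate_watermarks(low: u8, high: u8) -> Result<(), String> {
    if high == 0 || high > 100 {
        return Err("pressure_high_watermark_pct must be within [1, 100]".into());
    }
    if low >= high {
        return Err(
            "pressure_low_watermark_pct must be < pressure_high_watermark_pct".into(),
        );
    }
    Ok(())
}

fn main() {
    assert!(validate_watermarks(70, 85).is_ok()); // the defaults
    assert!(validate_watermarks(50, 50).is_err()); // low must be strictly below
    assert!(validate_watermarks(0, 0).is_err()); // high out of range
    assert!(validate_watermarks(10, 101).is_err());
}
```

Requiring `low < high` (not `<=`) gives the pressure state machine a real hysteresis band, so the flag cannot oscillate on a single boundary value.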
@ -1327,6 +1360,31 @@ mod tests {
            cfg.server.api.runtime_edge_events_capacity,
            default_api_runtime_edge_events_capacity()
        );
        assert_eq!(
            cfg.server.conntrack_control.inline_conntrack_control,
            default_conntrack_control_enabled()
        );
        assert_eq!(cfg.server.conntrack_control.mode, ConntrackMode::default());
        assert_eq!(
            cfg.server.conntrack_control.backend,
            ConntrackBackend::default()
        );
        assert_eq!(
            cfg.server.conntrack_control.profile,
            ConntrackPressureProfile::default()
        );
        assert_eq!(
            cfg.server.conntrack_control.pressure_high_watermark_pct,
            default_conntrack_pressure_high_watermark_pct()
        );
        assert_eq!(
            cfg.server.conntrack_control.pressure_low_watermark_pct,
            default_conntrack_pressure_low_watermark_pct()
        );
        assert_eq!(
            cfg.server.conntrack_control.delete_budget_per_sec,
            default_conntrack_delete_budget_per_sec()
        );
        assert_eq!(cfg.access.users, default_access_users());
        assert_eq!(
            cfg.access.user_max_tcp_conns_global_each,
@ -1472,6 +1530,31 @@ mod tests {
            server.api.runtime_edge_events_capacity,
            default_api_runtime_edge_events_capacity()
        );
        assert_eq!(
            server.conntrack_control.inline_conntrack_control,
            default_conntrack_control_enabled()
        );
        assert_eq!(server.conntrack_control.mode, ConntrackMode::default());
        assert_eq!(
            server.conntrack_control.backend,
            ConntrackBackend::default()
        );
        assert_eq!(
            server.conntrack_control.profile,
            ConntrackPressureProfile::default()
        );
        assert_eq!(
            server.conntrack_control.pressure_high_watermark_pct,
            default_conntrack_pressure_high_watermark_pct()
        );
        assert_eq!(
            server.conntrack_control.pressure_low_watermark_pct,
            default_conntrack_pressure_low_watermark_pct()
        );
        assert_eq!(
            server.conntrack_control.delete_budget_per_sec,
            default_conntrack_delete_budget_per_sec()
        );

        let access = AccessConfig::default();
        assert_eq!(access.users, default_access_users());
@ -2404,6 +2487,118 @@ mod tests {
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn conntrack_pressure_high_watermark_out_of_range_is_rejected() {
        let toml = r#"
[server.conntrack_control]
pressure_high_watermark_pct = 0

[censorship]
tls_domain = "example.com"

[access.users]
user = "00000000000000000000000000000000"
"#;
        let dir = std::env::temp_dir();
        let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml");
        std::fs::write(&path, toml).unwrap();
        let err = ProxyConfig::load(&path).unwrap_err().to_string();
        assert!(err.contains(
            "server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]"
        ));
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn conntrack_pressure_low_watermark_must_be_below_high() {
        let toml = r#"
[server.conntrack_control]
pressure_high_watermark_pct = 50
pressure_low_watermark_pct = 50

[censorship]
tls_domain = "example.com"

[access.users]
user = "00000000000000000000000000000000"
"#;
        let dir = std::env::temp_dir();
        let path = dir.join("telemt_conntrack_low_watermark_invalid_test.toml");
        std::fs::write(&path, toml).unwrap();
        let err = ProxyConfig::load(&path).unwrap_err().to_string();
        assert!(err.contains(
            "server.conntrack_control.pressure_low_watermark_pct must be < pressure_high_watermark_pct"
        ));
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn conntrack_delete_budget_zero_is_rejected() {
        let toml = r#"
[server.conntrack_control]
delete_budget_per_sec = 0

[censorship]
tls_domain = "example.com"

[access.users]
user = "00000000000000000000000000000000"
"#;
        let dir = std::env::temp_dir();
        let path = dir.join("telemt_conntrack_delete_budget_invalid_test.toml");
        std::fs::write(&path, toml).unwrap();
        let err = ProxyConfig::load(&path).unwrap_err().to_string();
        assert!(err.contains("server.conntrack_control.delete_budget_per_sec must be > 0"));
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn conntrack_hybrid_mode_requires_listener_allow_list() {
        let toml = r#"
[server.conntrack_control]
mode = "hybrid"

[censorship]
tls_domain = "example.com"

[access.users]
user = "00000000000000000000000000000000"
"#;
        let dir = std::env::temp_dir();
        let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml");
        std::fs::write(&path, toml).unwrap();
        let err = ProxyConfig::load(&path).unwrap_err().to_string();
        assert!(err.contains(
            "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
        ));
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn conntrack_profile_is_loaded_from_config() {
        let toml = r#"
[server.conntrack_control]
profile = "aggressive"

[censorship]
tls_domain = "example.com"

[access.users]
user = "00000000000000000000000000000000"
"#;
        let dir = std::env::temp_dir();
        let path = dir.join("telemt_conntrack_profile_parse_test.toml");
        std::fs::write(&path, toml).unwrap();
        let cfg = ProxyConfig::load(&path).unwrap();
        assert_eq!(
            cfg.server.conntrack_control.profile,
            ConntrackPressureProfile::Aggressive
        );
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn force_close_default_matches_drain_ttl() {
        let toml = r#"
@ -1216,6 +1216,118 @@ impl Default for ApiConfig {
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackMode {
    #[default]
    Tracked,
    Notrack,
    Hybrid,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackBackend {
    #[default]
    Auto,
    Nftables,
    Iptables,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackPressureProfile {
    Conservative,
    #[default]
    Balanced,
    Aggressive,
}

impl ConntrackPressureProfile {
    pub fn client_first_byte_idle_cap_secs(self) -> u64 {
        match self {
            Self::Conservative => 30,
            Self::Balanced => 20,
            Self::Aggressive => 10,
        }
    }

    pub fn direct_activity_timeout_secs(self) -> u64 {
        match self {
            Self::Conservative => 180,
            Self::Balanced => 120,
            Self::Aggressive => 60,
        }
    }

    pub fn middle_soft_idle_cap_secs(self) -> u64 {
        match self {
            Self::Conservative => 60,
            Self::Balanced => 30,
            Self::Aggressive => 20,
        }
    }

    pub fn middle_hard_idle_cap_secs(self) -> u64 {
        match self {
            Self::Conservative => 180,
            Self::Balanced => 90,
            Self::Aggressive => 60,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConntrackControlConfig {
    /// Enables runtime conntrack-control worker for pressure mitigation.
    #[serde(default = "default_conntrack_control_enabled")]
    pub inline_conntrack_control: bool,

    /// Conntrack mode for listener ingress traffic.
    #[serde(default)]
    pub mode: ConntrackMode,

    /// Netfilter backend used to reconcile notrack rules.
    #[serde(default)]
    pub backend: ConntrackBackend,

    /// Pressure profile for timeout caps under resource saturation.
    #[serde(default)]
    pub profile: ConntrackPressureProfile,

    /// Listener IP allow-list for hybrid mode.
    /// Ignored in tracked/notrack mode.
    #[serde(default)]
    pub hybrid_listener_ips: Vec<IpAddr>,

    /// Pressure high watermark as a percentage.
    #[serde(default = "default_conntrack_pressure_high_watermark_pct")]
    pub pressure_high_watermark_pct: u8,

    /// Pressure low watermark as a percentage.
    #[serde(default = "default_conntrack_pressure_low_watermark_pct")]
    pub pressure_low_watermark_pct: u8,

    /// Maximum conntrack delete operations per second.
    #[serde(default = "default_conntrack_delete_budget_per_sec")]
    pub delete_budget_per_sec: u64,
}

impl Default for ConntrackControlConfig {
    fn default() -> Self {
        Self {
            inline_conntrack_control: default_conntrack_control_enabled(),
            mode: ConntrackMode::default(),
            backend: ConntrackBackend::default(),
            profile: ConntrackPressureProfile::default(),
            hybrid_listener_ips: Vec::new(),
            pressure_high_watermark_pct: default_conntrack_pressure_high_watermark_pct(),
            pressure_low_watermark_pct: default_conntrack_pressure_low_watermark_pct(),
            delete_budget_per_sec: default_conntrack_delete_budget_per_sec(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    #[serde(default = "default_port")]
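Spelled out with its defaults, the `[server.conntrack_control]` table that `ConntrackControlConfig` deserializes would look like the fragment below (every key and default value is taken from the struct and constants in this diff; the fragment itself is illustrative).

```toml
[server.conntrack_control]
inline_conntrack_control = true   # default_conntrack_control_enabled
mode = "tracked"                  # tracked | notrack | hybrid
backend = "auto"                  # auto | nftables | iptables
profile = "balanced"              # conservative | balanced | aggressive
hybrid_listener_ips = []          # must be non-empty when mode = "hybrid"
pressure_high_watermark_pct = 85
pressure_low_watermark_pct = 70
delete_budget_per_sec = 4096
```

Because every field carries a `#[serde(default)]` or `default = "…"`, an empty table (or omitting the table entirely) yields the same configuration.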
@ -1291,6 +1403,10 @@ pub struct ServerConfig {
    /// `0` keeps legacy unbounded wait behavior.
    #[serde(default = "default_accept_permit_timeout_ms")]
    pub accept_permit_timeout_ms: u64,

    /// Runtime conntrack control and pressure policy.
    #[serde(default)]
    pub conntrack_control: ConntrackControlConfig,
}

impl Default for ServerConfig {
@ -1313,6 +1429,7 @@ impl Default for ServerConfig {
            listen_backlog: default_listen_backlog(),
            max_connections: default_server_max_connections(),
            accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
            conntrack_control: ConntrackControlConfig::default(),
        }
    }
}
@ -0,0 +1,704 @@
use std::collections::BTreeSet;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn};

use crate::config::{ConntrackBackend, ConntrackMode, ProxyConfig};
use crate::proxy::middle_relay::note_global_relay_pressure;
use crate::proxy::shared_state::{ConntrackCloseEvent, ConntrackCloseReason, ProxySharedState};
use crate::stats::Stats;

const CONNTRACK_EVENT_QUEUE_CAPACITY: usize = 32_768;
const PRESSURE_RELEASE_TICKS: u8 = 3;
const PRESSURE_SAMPLE_INTERVAL: Duration = Duration::from_secs(1);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NetfilterBackend {
    Nftables,
    Iptables,
}

#[derive(Clone, Copy)]
struct PressureSample {
    conn_pct: Option<u8>,
    fd_pct: Option<u8>,
    accept_timeout_delta: u64,
    me_queue_pressure_delta: u64,
}

struct PressureState {
    active: bool,
    low_streak: u8,
    prev_accept_timeout_total: u64,
    prev_me_queue_pressure_total: u64,
}

impl PressureState {
    fn new(stats: &Stats) -> Self {
        Self {
            active: false,
            low_streak: 0,
            prev_accept_timeout_total: stats.get_accept_permit_timeout_total(),
            prev_me_queue_pressure_total: stats.get_me_c2me_send_full_total(),
        }
    }
}

pub(crate) fn spawn_conntrack_controller(
    config_rx: watch::Receiver<Arc<ProxyConfig>>,
    stats: Arc<Stats>,
    shared: Arc<ProxySharedState>,
) {
    if !cfg!(target_os = "linux") {
        let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control;
        stats.set_conntrack_control_enabled(enabled);
        stats.set_conntrack_control_available(false);
        stats.set_conntrack_pressure_active(false);
        stats.set_conntrack_event_queue_depth(0);
        stats.set_conntrack_rule_apply_ok(false);
        shared.disable_conntrack_close_sender();
        shared.set_conntrack_pressure_active(false);
        if enabled {
            warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker");
        }
        return;
    }

    let (tx, rx) = mpsc::channel(CONNTRACK_EVENT_QUEUE_CAPACITY);
    shared.set_conntrack_close_sender(tx);
    tokio::spawn(async move {
        run_conntrack_controller(config_rx, stats, shared, rx).await;
    });
}
async fn run_conntrack_controller(
    mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
    stats: Arc<Stats>,
    shared: Arc<ProxySharedState>,
    mut close_rx: mpsc::Receiver<ConntrackCloseEvent>,
) {
    let mut cfg = config_rx.borrow().clone();
    let mut pressure_state = PressureState::new(stats.as_ref());
    let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
    let mut backend = pick_backend(cfg.server.conntrack_control.backend);

    apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false);
    reconcile_rules(&cfg, backend, stats.as_ref()).await;

    loop {
        tokio::select! {
            changed = config_rx.changed() => {
                if changed.is_err() {
                    break;
                }
                cfg = config_rx.borrow_and_update().clone();
                backend = pick_backend(cfg.server.conntrack_control.backend);
                delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
                apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), pressure_state.active);
                reconcile_rules(&cfg, backend, stats.as_ref()).await;
            }
            event = close_rx.recv() => {
                let Some(event) = event else {
                    break;
                };
                stats.set_conntrack_event_queue_depth(close_rx.len() as u64);
                if !cfg.server.conntrack_control.inline_conntrack_control {
                    continue;
                }
                if !pressure_state.active {
                    continue;
                }
                if !matches!(event.reason, ConntrackCloseReason::Timeout | ConntrackCloseReason::Pressure | ConntrackCloseReason::Reset) {
                    continue;
                }
                if delete_budget_tokens == 0 {
                    continue;
                }
                stats.increment_conntrack_delete_attempt_total();
                match delete_conntrack_entry(event).await {
                    DeleteOutcome::Deleted => {
                        delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
                        stats.increment_conntrack_delete_success_total();
                    }
                    DeleteOutcome::NotFound => {
                        delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
                        stats.increment_conntrack_delete_not_found_total();
                    }
                    DeleteOutcome::Error => {
                        delete_budget_tokens = delete_budget_tokens.saturating_sub(1);
                        stats.increment_conntrack_delete_error_total();
                    }
                }
            }
            _ = tokio::time::sleep(PRESSURE_SAMPLE_INTERVAL) => {
                delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
                stats.set_conntrack_event_queue_depth(close_rx.len() as u64);
                let sample = collect_pressure_sample(stats.as_ref(), &cfg, &mut pressure_state);
                update_pressure_state(
                    stats.as_ref(),
                    shared.as_ref(),
                    &cfg,
                    &sample,
                    &mut pressure_state,
                );
                if pressure_state.active {
                    note_global_relay_pressure(shared.as_ref());
                }
            }
        }
    }

    shared.disable_conntrack_close_sender();
    shared.set_conntrack_pressure_active(false);
    stats.set_conntrack_pressure_active(false);
}
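The `delete_budget_tokens` handling in the loop above is a per-second token bucket: refill to the configured budget on every sample tick, spend one token per conntrack delete attempt, and silently skip deletes once the bucket is empty. Isolated (the type and method names here are illustrative):

```rust
// Distilled from the controller loop: a refill-per-tick token bucket
// that rate-limits conntrack delete operations.
struct DeleteBudget {
    budget_per_sec: u64,
    tokens: u64,
}

impl DeleteBudget {
    fn new(budget_per_sec: u64) -> Self {
        Self { budget_per_sec, tokens: budget_per_sec }
    }

    /// Called on every 1-second sample tick.
    fn on_tick(&mut self) {
        self.tokens = self.budget_per_sec; // refill, don't accumulate
    }

    /// One token per delete attempt; false means "drop it, don't queue it".
    fn try_spend(&mut self) -> bool {
        if self.tokens == 0 {
            return false;
        }
        self.tokens -= 1;
        true
    }
}

fn main() {
    let mut b = DeleteBudget::new(2);
    assert!(b.try_spend());
    assert!(b.try_spend());
    assert!(!b.try_spend()); // budget exhausted this second
    b.on_tick();
    assert!(b.try_spend()); // refilled on the next tick
}
```

Dropping over-budget deletes instead of queueing them keeps the worker bounded: under sustained pressure the queue depth stays flat and stale conntrack entries simply expire on their own.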
fn apply_runtime_state(
    stats: &Stats,
    shared: &ProxySharedState,
    cfg: &ProxyConfig,
    backend_available: bool,
    pressure_active: bool,
) {
    let enabled = cfg.server.conntrack_control.inline_conntrack_control;
    let available = enabled && backend_available && has_cap_net_admin();
    if enabled && !available {
        warn!(
            "conntrack control enabled but unavailable (missing CAP_NET_ADMIN or backend binaries)"
        );
    }
    stats.set_conntrack_control_enabled(enabled);
    stats.set_conntrack_control_available(available);
    shared.set_conntrack_pressure_active(enabled && pressure_active);
    stats.set_conntrack_pressure_active(enabled && pressure_active);
}
fn collect_pressure_sample(
    stats: &Stats,
    cfg: &ProxyConfig,
    state: &mut PressureState,
) -> PressureSample {
    let current_connections = stats.get_current_connections_total();
    let conn_pct = if cfg.server.max_connections == 0 {
        None
    } else {
        Some(
            ((current_connections.saturating_mul(100)) / u64::from(cfg.server.max_connections))
                .min(100) as u8,
        )
    };

    let fd_pct = fd_usage_pct();

    let accept_total = stats.get_accept_permit_timeout_total();
    let accept_delta = accept_total.saturating_sub(state.prev_accept_timeout_total);
    state.prev_accept_timeout_total = accept_total;

    let me_total = stats.get_me_c2me_send_full_total();
    let me_delta = me_total.saturating_sub(state.prev_me_queue_pressure_total);
    state.prev_me_queue_pressure_total = me_total;

    PressureSample {
        conn_pct,
        fd_pct,
        accept_timeout_delta: accept_delta,
        me_queue_pressure_delta: me_delta,
    }
}
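The `conn_pct` expression above has three deliberate properties: a limit of 0 means "unbounded" and yields no percentage at all, the multiplication saturates rather than overflowing, and the result clamps to 100. The same shape as a standalone helper (the function name is illustrative):

```rust
// Same shape as the conn_pct computation: an optional usage percentage.
fn usage_pct(current: u64, limit: u64) -> Option<u8> {
    if limit == 0 {
        return None; // 0 means "no limit", so no pressure percentage exists
    }
    // saturating_mul avoids overflow for huge counters; min(100) clamps
    // transient over-limit readings.
    Some((current.saturating_mul(100) / limit).min(100) as u8)
}

fn main() {
    assert_eq!(usage_pct(850, 1000), Some(85));
    assert_eq!(usage_pct(2000, 1000), Some(100)); // clamped
    assert_eq!(usage_pct(123, 0), None); // unbounded limit
    assert_eq!(usage_pct(u64::MAX, 1000), Some(100)); // saturates, no panic
}
```

Returning `Option<u8>` rather than defaulting to 0 matters downstream: `is_some_and`/`is_none_or` let the pressure logic treat "unknown" differently from "0% used".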
fn update_pressure_state(
    stats: &Stats,
    shared: &ProxySharedState,
    cfg: &ProxyConfig,
    sample: &PressureSample,
    state: &mut PressureState,
) {
    if !cfg.server.conntrack_control.inline_conntrack_control {
        if state.active {
            state.active = false;
            state.low_streak = 0;
            shared.set_conntrack_pressure_active(false);
            stats.set_conntrack_pressure_active(false);
            info!("Conntrack pressure mode deactivated (feature disabled)");
        }
        return;
    }

    let high = cfg.server.conntrack_control.pressure_high_watermark_pct;
    let low = cfg.server.conntrack_control.pressure_low_watermark_pct;

    let high_hit = sample.conn_pct.is_some_and(|v| v >= high)
        || sample.fd_pct.is_some_and(|v| v >= high)
        || sample.accept_timeout_delta > 0
        || sample.me_queue_pressure_delta > 0;

    let low_clear = sample.conn_pct.is_none_or(|v| v <= low)
        && sample.fd_pct.is_none_or(|v| v <= low)
        && sample.accept_timeout_delta == 0
        && sample.me_queue_pressure_delta == 0;

    if !state.active && high_hit {
        state.active = true;
        state.low_streak = 0;
        shared.set_conntrack_pressure_active(true);
        stats.set_conntrack_pressure_active(true);
        info!(
            conn_pct = ?sample.conn_pct,
            fd_pct = ?sample.fd_pct,
            accept_timeout_delta = sample.accept_timeout_delta,
            me_queue_pressure_delta = sample.me_queue_pressure_delta,
            "Conntrack pressure mode activated"
        );
        return;
    }

    if state.active && low_clear {
        state.low_streak = state.low_streak.saturating_add(1);
        if state.low_streak >= PRESSURE_RELEASE_TICKS {
            state.active = false;
            state.low_streak = 0;
            shared.set_conntrack_pressure_active(false);
            stats.set_conntrack_pressure_active(false);
            info!("Conntrack pressure mode deactivated");
        }
        return;
    }

    state.low_streak = 0;
}
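Stripped of the stats and config plumbing, `update_pressure_state` is an asymmetric hysteresis: any single high-watermark hit activates pressure mode immediately, while deactivation needs `PRESSURE_RELEASE_TICKS` consecutive all-clear samples (any non-clear sample resets the streak). A distilled sketch of just that state machine:

```rust
// Asymmetric hysteresis: activate on one hit, release only after
// RELEASE_TICKS consecutive all-clear samples, so the flag can't flap.
const RELEASE_TICKS: u8 = 3;

struct Hysteresis {
    active: bool,
    low_streak: u8,
}

impl Hysteresis {
    fn sample(&mut self, high_hit: bool, low_clear: bool) {
        if !self.active && high_hit {
            self.active = true;
            self.low_streak = 0;
        } else if self.active && low_clear {
            self.low_streak += 1;
            if self.low_streak >= RELEASE_TICKS {
                self.active = false;
                self.low_streak = 0;
            }
        } else {
            // Either idle with no hit, or active with a non-clear sample:
            // the release streak starts over.
            self.low_streak = 0;
        }
    }
}

fn main() {
    let mut h = Hysteresis { active: false, low_streak: 0 };
    h.sample(true, false); // spike: activate at once
    assert!(h.active);
    h.sample(false, true);
    h.sample(false, true);
    assert!(h.active); // two quiet ticks: still active
    h.sample(false, true);
    assert!(!h.active); // third consecutive quiet tick: released
}
```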
async fn reconcile_rules(cfg: &ProxyConfig, backend: Option<NetfilterBackend>, stats: &Stats) {
    if !cfg.server.conntrack_control.inline_conntrack_control {
        clear_notrack_rules_all_backends().await;
        stats.set_conntrack_rule_apply_ok(true);
        return;
    }

    if !has_cap_net_admin() {
        stats.set_conntrack_rule_apply_ok(false);
        return;
    }

    let Some(backend) = backend else {
        stats.set_conntrack_rule_apply_ok(false);
        return;
    };

    let apply_result = match backend {
        NetfilterBackend::Nftables => apply_nft_rules(cfg).await,
        NetfilterBackend::Iptables => apply_iptables_rules(cfg).await,
    };

    if let Err(error) = apply_result {
        warn!(error = %error, "Failed to reconcile conntrack/notrack rules");
        stats.set_conntrack_rule_apply_ok(false);
    } else {
        stats.set_conntrack_rule_apply_ok(true);
    }
}
fn pick_backend(configured: ConntrackBackend) -> Option<NetfilterBackend> {
    match configured {
        ConntrackBackend::Auto => {
            if command_exists("nft") {
                Some(NetfilterBackend::Nftables)
            } else if command_exists("iptables") {
                Some(NetfilterBackend::Iptables)
            } else {
                None
            }
        }
        ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables),
        ConntrackBackend::Iptables => {
            command_exists("iptables").then_some(NetfilterBackend::Iptables)
        }
    }
}

fn command_exists(binary: &str) -> bool {
    let Some(path_var) = std::env::var_os("PATH") else {
        return false;
    };
    std::env::split_paths(&path_var).any(|dir| {
        let candidate: PathBuf = dir.join(binary);
        candidate.exists() && candidate.is_file()
    })
}

fn notrack_targets(cfg: &ProxyConfig) -> (Vec<Option<IpAddr>>, Vec<Option<IpAddr>>) {
    let mode = cfg.server.conntrack_control.mode;
    let mut v4_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
    let mut v6_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();

    match mode {
        ConntrackMode::Tracked => {}
        ConntrackMode::Notrack => {
            if cfg.server.listeners.is_empty() {
                if let Some(ipv4) = cfg
                    .server
                    .listen_addr_ipv4
                    .as_ref()
                    .and_then(|s| s.parse::<IpAddr>().ok())
                {
                    if ipv4.is_unspecified() {
                        v4_targets.insert(None);
                    } else {
                        v4_targets.insert(Some(ipv4));
                    }
                }
                if let Some(ipv6) = cfg
                    .server
                    .listen_addr_ipv6
                    .as_ref()
                    .and_then(|s| s.parse::<IpAddr>().ok())
                {
                    if ipv6.is_unspecified() {
                        v6_targets.insert(None);
                    } else {
                        v6_targets.insert(Some(ipv6));
                    }
                }
            } else {
                for listener in &cfg.server.listeners {
                    if listener.ip.is_ipv4() {
                        if listener.ip.is_unspecified() {
                            v4_targets.insert(None);
                        } else {
                            v4_targets.insert(Some(listener.ip));
                        }
                    } else if listener.ip.is_unspecified() {
                        v6_targets.insert(None);
                    } else {
                        v6_targets.insert(Some(listener.ip));
                    }
                }
            }
        }
        ConntrackMode::Hybrid => {
            for ip in &cfg.server.conntrack_control.hybrid_listener_ips {
                if ip.is_ipv4() {
                    v4_targets.insert(Some(*ip));
                } else {
                    v6_targets.insert(Some(*ip));
                }
            }
        }
    }

    (
        v4_targets.into_iter().collect(),
        v6_targets.into_iter().collect(),
    )
}

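`notrack_targets` leans on `BTreeSet<Option<IpAddr>>` so that `None` can stand in for a wildcard listener (`0.0.0.0` / `::`) while duplicate listener addresses collapse and ordering stays deterministic. A standalone sketch of the same classification (helper name is hypothetical, not the project's code):

```rust
// Illustrative sketch: None = wildcard listener, Some(ip) = concrete address.
use std::collections::BTreeSet;
use std::net::IpAddr;

fn classify(addrs: &[IpAddr]) -> (BTreeSet<Option<IpAddr>>, BTreeSet<Option<IpAddr>>) {
    let mut v4 = BTreeSet::new();
    let mut v6 = BTreeSet::new();
    for ip in addrs {
        // Unspecified addresses become the wildcard slot.
        let slot = if ip.is_unspecified() { None } else { Some(*ip) };
        if ip.is_ipv4() {
            v4.insert(slot);
        } else {
            v6.insert(slot);
        }
    }
    (v4, v6)
}

fn main() {
    let addrs: Vec<IpAddr> = vec![
        "0.0.0.0".parse().unwrap(),
        "192.0.2.1".parse().unwrap(),
        "192.0.2.1".parse().unwrap(), // duplicate collapses in the set
        "::1".parse().unwrap(),
    ];
    let (v4, v6) = classify(&addrs);
    assert_eq!(v4.len(), 2); // wildcard (None) plus one concrete address
    assert!(v4.contains(&None));
    assert_eq!(v6.len(), 1);
    println!("targets ok");
}
```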
async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> {
    let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
    if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
        return Ok(());
    }

    let (v4_targets, v6_targets) = notrack_targets(cfg);
    let mut rules = Vec::new();
    for ip in v4_targets {
        let rule = if let Some(ip) = ip {
            format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip)
        } else {
            format!("tcp dport {} notrack", cfg.server.port)
        };
        rules.push(rule);
    }
    for ip in v6_targets {
        let rule = if let Some(ip) = ip {
            format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip)
        } else {
            format!("tcp dport {} notrack", cfg.server.port)
        };
        rules.push(rule);
    }

    let rule_blob = if rules.is_empty() {
        String::new()
    } else {
        format!(" {}\n", rules.join("\n "))
    };
    let script = format!(
        "table inet telemt_conntrack {{\n chain preraw {{\n type filter hook prerouting priority raw; policy accept;\n{rule_blob} }}\n}}\n"
    );
    run_command("nft", &["-f", "-"], Some(script)).await
}

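The rule strings built in `apply_nft_rules` follow a simple shape: a `None` (wildcard) target drops the `daddr` match and keeps only the port. A hedged sketch of that formatting outside the project's types:

```rust
// Sketch of the v4 rule-string shape; names and signature are illustrative.
use std::net::IpAddr;

fn v4_rule(port: u16, ip: Option<IpAddr>) -> String {
    match ip {
        Some(ip) => format!("tcp dport {} ip daddr {} notrack", port, ip),
        None => format!("tcp dport {} notrack", port),
    }
}

fn main() {
    assert_eq!(v4_rule(443, None), "tcp dport 443 notrack");
    let ip: IpAddr = "192.0.2.1".parse().unwrap();
    assert_eq!(
        v4_rule(443, Some(ip)),
        "tcp dport 443 ip daddr 192.0.2.1 notrack"
    );
    println!("rules ok");
}
```

The rules land in a `prerouting` chain at raw priority, which is where `notrack` must fire, before conntrack sees the packet.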
async fn apply_iptables_rules(cfg: &ProxyConfig) -> Result<(), String> {
    apply_iptables_rules_for_binary("iptables", cfg, true).await?;
    apply_iptables_rules_for_binary("ip6tables", cfg, false).await?;
    Ok(())
}

async fn apply_iptables_rules_for_binary(
    binary: &str,
    cfg: &ProxyConfig,
    ipv4: bool,
) -> Result<(), String> {
    if !command_exists(binary) {
        return Ok(());
    }
    let chain = "TELEMT_NOTRACK";
    let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await;
    let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await;
    let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await;

    if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
        return Ok(());
    }

    run_command(binary, &["-t", "raw", "-N", chain], None).await?;
    run_command(binary, &["-t", "raw", "-F", chain], None).await?;
    if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None)
        .await
        .is_err()
    {
        run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?;
    }

    let (v4_targets, v6_targets) = notrack_targets(cfg);
    let selected = if ipv4 { v4_targets } else { v6_targets };
    for ip in selected {
        let mut args = vec![
            "-t".to_string(),
            "raw".to_string(),
            "-A".to_string(),
            chain.to_string(),
            "-p".to_string(),
            "tcp".to_string(),
            "--dport".to_string(),
            cfg.server.port.to_string(),
        ];
        if let Some(ip) = ip {
            args.push("-d".to_string());
            args.push(ip.to_string());
        }
        args.push("-j".to_string());
        args.push("CT".to_string());
        args.push("--notrack".to_string());
        let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
        run_command(binary, &arg_refs, None).await?;
    }
    Ok(())
}

async fn clear_notrack_rules_all_backends() {
    let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
    let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
    let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
    let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
    let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
    let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
    let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
}

enum DeleteOutcome {
    Deleted,
    NotFound,
    Error,
}

async fn delete_conntrack_entry(event: ConntrackCloseEvent) -> DeleteOutcome {
    if !command_exists("conntrack") {
        return DeleteOutcome::Error;
    }
    let args = vec![
        "-D".to_string(),
        "-p".to_string(),
        "tcp".to_string(),
        "-s".to_string(),
        event.src.ip().to_string(),
        "--sport".to_string(),
        event.src.port().to_string(),
        "-d".to_string(),
        event.dst.ip().to_string(),
        "--dport".to_string(),
        event.dst.port().to_string(),
    ];
    let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
    match run_command("conntrack", &arg_refs, None).await {
        Ok(()) => DeleteOutcome::Deleted,
        Err(error) => {
            if error.contains("0 flow entries have been deleted") {
                DeleteOutcome::NotFound
            } else {
                debug!(error = %error, "conntrack delete failed");
                DeleteOutcome::Error
            }
        }
    }
}

async fn run_command(binary: &str, args: &[&str], stdin: Option<String>) -> Result<(), String> {
    if !command_exists(binary) {
        return Err(format!("{binary} is not available"));
    }
    let mut command = Command::new(binary);
    command.args(args);
    if stdin.is_some() {
        command.stdin(std::process::Stdio::piped());
    }
    command.stdout(std::process::Stdio::null());
    command.stderr(std::process::Stdio::piped());
    let mut child = command
        .spawn()
        .map_err(|e| format!("spawn {binary} failed: {e}"))?;
    if let Some(blob) = stdin
        && let Some(mut writer) = child.stdin.take()
    {
        writer
            .write_all(blob.as_bytes())
            .await
            .map_err(|e| format!("stdin write {binary} failed: {e}"))?;
    }
    let output = child
        .wait_with_output()
        .await
        .map_err(|e| format!("wait {binary} failed: {e}"))?;
    if output.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    Err(if stderr.is_empty() {
        format!("{binary} exited with status {}", output.status)
    } else {
        stderr
    })
}

fn fd_usage_pct() -> Option<u8> {
    let soft_limit = nofile_soft_limit()?;
    if soft_limit == 0 {
        return None;
    }
    let fd_count = std::fs::read_dir("/proc/self/fd").ok()?.count() as u64;
    Some(((fd_count.saturating_mul(100)) / soft_limit).min(100) as u8)
}

fn nofile_soft_limit() -> Option<u64> {
    #[cfg(target_os = "linux")]
    {
        let mut lim = libc::rlimit {
            rlim_cur: 0,
            rlim_max: 0,
        };
        let rc = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut lim) };
        if rc != 0 {
            return None;
        }
        return Some(lim.rlim_cur);
    }
    #[cfg(not(target_os = "linux"))]
    {
        None
    }
}

fn has_cap_net_admin() -> bool {
    #[cfg(target_os = "linux")]
    {
        let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
            return false;
        };
        for line in status.lines() {
            if let Some(raw) = line.strip_prefix("CapEff:") {
                let caps = raw.trim();
                if let Ok(bits) = u64::from_str_radix(caps, 16) {
                    const CAP_NET_ADMIN_BIT: u64 = 12;
                    return (bits & (1u64 << CAP_NET_ADMIN_BIT)) != 0;
                }
            }
        }
        false
    }
    #[cfg(not(target_os = "linux"))]
    {
        false
    }
}

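`has_cap_net_admin` parses the effective capability set that the kernel exposes as a hex bitmask on the `CapEff:` line of `/proc/self/status`; `CAP_NET_ADMIN` is bit 12 (per `linux/capability.h`). A self-contained sketch of the same parse against a synthetic status blob:

```rust
// Standalone sketch of the CapEff bit check; function name is illustrative.
fn cap_net_admin_in(status: &str) -> bool {
    const CAP_NET_ADMIN_BIT: u64 = 12;
    status
        .lines()
        .find_map(|line| line.strip_prefix("CapEff:"))
        .and_then(|raw| u64::from_str_radix(raw.trim(), 16).ok())
        .map(|bits| bits & (1u64 << CAP_NET_ADMIN_BIT) != 0)
        .unwrap_or(false)
}

fn main() {
    // 0x1000 sets exactly bit 12.
    assert!(cap_net_admin_in("CapEff:\t0000000000001000"));
    assert!(!cap_net_admin_in("CapEff:\t0000000000000000"));
    assert!(!cap_net_admin_in("NoCaps: here"));
    println!("capeff ok");
}
```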
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ProxyConfig;

    #[test]
    fn pressure_activates_on_accept_timeout_spike() {
        let stats = Stats::new();
        let shared = ProxySharedState::new();
        let mut cfg = ProxyConfig::default();
        cfg.server.conntrack_control.inline_conntrack_control = true;
        let mut state = PressureState::new(&stats);
        let sample = PressureSample {
            conn_pct: Some(10),
            fd_pct: Some(10),
            accept_timeout_delta: 1,
            me_queue_pressure_delta: 0,
        };

        update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state);

        assert!(state.active);
        assert!(shared.conntrack_pressure_active());
        assert!(stats.get_conntrack_pressure_active());
    }

    #[test]
    fn pressure_releases_after_hysteresis_window() {
        let stats = Stats::new();
        let shared = ProxySharedState::new();
        let mut cfg = ProxyConfig::default();
        cfg.server.conntrack_control.inline_conntrack_control = true;
        let mut state = PressureState::new(&stats);

        let high_sample = PressureSample {
            conn_pct: Some(95),
            fd_pct: Some(95),
            accept_timeout_delta: 0,
            me_queue_pressure_delta: 0,
        };
        update_pressure_state(&stats, shared.as_ref(), &cfg, &high_sample, &mut state);
        assert!(state.active);

        let low_sample = PressureSample {
            conn_pct: Some(10),
            fd_pct: Some(10),
            accept_timeout_delta: 0,
            me_queue_pressure_delta: 0,
        };
        update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
        assert!(state.active);
        update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);
        assert!(state.active);
        update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state);

        assert!(!state.active);
        assert!(!shared.conntrack_pressure_active());
        assert!(!stats.get_conntrack_pressure_active());
    }

    #[test]
    fn pressure_does_not_activate_when_disabled() {
        let stats = Stats::new();
        let shared = ProxySharedState::new();
        let mut cfg = ProxyConfig::default();
        cfg.server.conntrack_control.inline_conntrack_control = false;
        let mut state = PressureState::new(&stats);
        let sample = PressureSample {
            conn_pct: Some(100),
            fd_pct: Some(100),
            accept_timeout_delta: 10,
            me_queue_pressure_delta: 10,
        };

        update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state);

        assert!(!state.active);
        assert!(!shared.conntrack_pressure_active());
        assert!(!stats.get_conntrack_pressure_active());
    }
}
@@ -262,6 +262,7 @@ pub(crate) async fn bind_listeners(
                    break;
                }
                Err(_) => {
                    stats.increment_accept_permit_timeout_total();
                    debug!(
                        timeout_ms = accept_permit_timeout_ms,
                        "Dropping accepted unix connection: permit wait timeout"
@@ -407,6 +408,7 @@ pub(crate) fn spawn_tcp_accept_loops(
                    break;
                }
                Err(_) => {
                    stats.increment_accept_permit_timeout_total();
                    debug!(
                        peer = %peer_addr,
                        timeout_ms = accept_permit_timeout_ms,
@@ -28,6 +28,7 @@ use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};

use crate::api;
use crate::conntrack_control;
use crate::config::{LogLevel, ProxyConfig};
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
@@ -633,6 +634,11 @@ async fn run_inner(
    .await;
    let _admission_tx_hold = admission_tx;
    let shared_state = ProxySharedState::new();
    conntrack_control::spawn_conntrack_controller(
        config_rx.clone(),
        stats.clone(),
        shared_state.clone(),
    );

    let bound = listeners::bind_listeners(
        &config,
@@ -2,6 +2,7 @@

mod api;
mod cli;
mod conntrack_control;
mod config;
mod crypto;
#[cfg(unix)]
128	src/metrics.rs
@@ -359,6 +359,134 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
        }
    );

    let _ = writeln!(
        out,
        "# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout"
    );
    let _ = writeln!(out, "# TYPE telemt_accept_permit_timeout_total counter");
    let _ = writeln!(
        out,
        "telemt_accept_permit_timeout_total {}",
        if core_enabled {
            stats.get_accept_permit_timeout_total()
        } else {
            0
        }
    );

    let _ = writeln!(
        out,
        "# HELP telemt_conntrack_control_state Runtime conntrack control state flags"
    );
    let _ = writeln!(out, "# TYPE telemt_conntrack_control_state gauge");
    let _ = writeln!(
        out,
        "telemt_conntrack_control_state{{flag=\"enabled\"}} {}",
        if stats.get_conntrack_control_enabled() {
            1
        } else {
            0
        }
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_control_state{{flag=\"available\"}} {}",
        if stats.get_conntrack_control_available() {
            1
        } else {
            0
        }
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_control_state{{flag=\"pressure_active\"}} {}",
        if stats.get_conntrack_pressure_active() {
            1
        } else {
            0
        }
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_control_state{{flag=\"rule_apply_ok\"}} {}",
        if stats.get_conntrack_rule_apply_ok() {
            1
        } else {
            0
        }
    );

    let _ = writeln!(
        out,
        "# HELP telemt_conntrack_event_queue_depth Pending close events in conntrack control queue"
    );
    let _ = writeln!(out, "# TYPE telemt_conntrack_event_queue_depth gauge");
    let _ = writeln!(
        out,
        "telemt_conntrack_event_queue_depth {}",
        stats.get_conntrack_event_queue_depth()
    );

    let _ = writeln!(
        out,
        "# HELP telemt_conntrack_delete_total Conntrack delete attempts by outcome"
    );
    let _ = writeln!(out, "# TYPE telemt_conntrack_delete_total counter");
    let _ = writeln!(
        out,
        "telemt_conntrack_delete_total{{result=\"attempt\"}} {}",
        if core_enabled {
            stats.get_conntrack_delete_attempt_total()
        } else {
            0
        }
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_delete_total{{result=\"success\"}} {}",
        if core_enabled {
            stats.get_conntrack_delete_success_total()
        } else {
            0
        }
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_delete_total{{result=\"not_found\"}} {}",
        if core_enabled {
            stats.get_conntrack_delete_not_found_total()
        } else {
            0
        }
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_delete_total{{result=\"error\"}} {}",
        if core_enabled {
            stats.get_conntrack_delete_error_total()
        } else {
            0
        }
    );

    let _ = writeln!(
        out,
        "# HELP telemt_conntrack_close_event_drop_total Dropped conntrack close events due to queue pressure or unavailable sender"
    );
    let _ = writeln!(
        out,
        "# TYPE telemt_conntrack_close_event_drop_total counter"
    );
    let _ = writeln!(
        out,
        "telemt_conntrack_close_event_drop_total {}",
        if core_enabled {
            stats.get_conntrack_close_event_drop_total()
        } else {
            0
        }
    );

    let _ = writeln!(
        out,
        "# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"
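The new `telemt_conntrack_control_state` gauge writes one Prometheus exposition-format line per flag label, rendering each boolean as 0/1. A minimal sketch of that line shape (the helper name is hypothetical):

```rust
// Illustrative helper showing the exposition-format line shape of the gauge.
fn flag_line(flag: &str, on: bool) -> String {
    format!(
        "telemt_conntrack_control_state{{flag=\"{}\"}} {}",
        flag,
        if on { 1 } else { 0 }
    )
}

fn main() {
    assert_eq!(
        flag_line("pressure_active", true),
        "telemt_conntrack_control_state{flag=\"pressure_active\"} 1"
    );
    assert_eq!(
        flag_line("rule_apply_ok", false),
        "telemt_conntrack_control_state{flag=\"rule_apply_ok\"} 0"
    );
    println!("metrics ok");
}
```

Note that in `format!` the literal braces around the label set must be doubled (`{{`/`}}`), which is why the source strings above look brace-heavy.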
@@ -80,7 +80,7 @@ use crate::transport::middle_proxy::MePool;
use crate::transport::socket::normalize_ip;
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};

use crate::proxy::direct_relay::handle_via_direct;
use crate::proxy::direct_relay::handle_via_direct_with_shared;
use crate::proxy::handshake::{
    HandshakeSuccess, handle_mtproto_handshake_with_shared, handle_tls_handshake_with_shared,
};
@@ -191,6 +191,24 @@ fn handshake_timeout_with_mask_grace(config: &ProxyConfig) -> Duration {
    }
}

fn effective_client_first_byte_idle_secs(config: &ProxyConfig, shared: &ProxySharedState) -> u64 {
    let idle_secs = config.timeouts.client_first_byte_idle_secs;
    if idle_secs == 0 {
        return 0;
    }
    if shared.conntrack_pressure_active() {
        idle_secs.min(
            config
                .server
                .conntrack_control
                .profile
                .client_first_byte_idle_cap_secs(),
        )
    } else {
        idle_secs
    }
}

const MASK_CLASSIFIER_PREFETCH_WINDOW: usize = 16;
#[cfg(test)]
const MASK_CLASSIFIER_PREFETCH_TIMEOUT: Duration = Duration::from_millis(5);
@@ -463,10 +481,11 @@ where

    debug!(peer = %real_peer, "New connection (generic stream)");

    let first_byte = if config.timeouts.client_first_byte_idle_secs == 0 {
    let first_byte_idle_secs = effective_client_first_byte_idle_secs(&config, shared.as_ref());
    let first_byte = if first_byte_idle_secs == 0 {
        None
    } else {
        let idle_timeout = Duration::from_secs(config.timeouts.client_first_byte_idle_secs);
        let idle_timeout = Duration::from_secs(first_byte_idle_secs);
        let mut first_byte = [0u8; 1];
        match timeout(idle_timeout, stream.read(&mut first_byte)).await {
            Ok(Ok(0)) => {
@@ -502,7 +521,7 @@ where
            Err(_) => {
                debug!(
                    peer = %real_peer,
                    idle_secs = config.timeouts.client_first_byte_idle_secs,
                    idle_secs = first_byte_idle_secs,
                    "Closing idle pooled connection before first client byte"
                );
                return Ok(());
@@ -968,11 +987,12 @@ impl RunningClientHandler {
            }
        }

        let first_byte = if self.config.timeouts.client_first_byte_idle_secs == 0 {
        let first_byte_idle_secs =
            effective_client_first_byte_idle_secs(&self.config, self.shared.as_ref());
        let first_byte = if first_byte_idle_secs == 0 {
            None
        } else {
            let idle_timeout =
                Duration::from_secs(self.config.timeouts.client_first_byte_idle_secs);
            let idle_timeout = Duration::from_secs(first_byte_idle_secs);
            let mut first_byte = [0u8; 1];
            match timeout(idle_timeout, self.stream.read(&mut first_byte)).await {
                Ok(Ok(0)) => {
@@ -1008,7 +1028,7 @@ impl RunningClientHandler {
                Err(_) => {
                    debug!(
                        peer = %self.peer,
                        idle_secs = self.config.timeouts.client_first_byte_idle_secs,
                        idle_secs = first_byte_idle_secs,
                        "Closing idle pooled connection before first client byte"
                    );
                    return Ok(None);
@@ -1395,7 +1415,7 @@ impl RunningClientHandler {
        local_addr: SocketAddr,
        peer_addr: SocketAddr,
        ip_tracker: Arc<UserIpTracker>,
        _shared: Arc<ProxySharedState>,
        shared: Arc<ProxySharedState>,
    ) -> Result<()>
    where
        R: AsyncRead + Unpin + Send + 'static,
@@ -1438,12 +1458,12 @@ impl RunningClientHandler {
                route_runtime.subscribe(),
                route_snapshot,
                session_id,
                _shared,
                shared.clone(),
            )
            .await
        } else {
            warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
            handle_via_direct(
            handle_via_direct_with_shared(
                client_reader,
                client_writer,
                success,
@@ -1455,12 +1475,14 @@ impl RunningClientHandler {
                route_runtime.subscribe(),
                route_snapshot,
                session_id,
                local_addr,
                shared.clone(),
            )
            .await
            }
        } else {
            // Direct mode (original behavior)
            handle_via_direct(
            handle_via_direct_with_shared(
                client_reader,
                client_writer,
                success,
@@ -1472,6 +1494,8 @@ impl RunningClientHandler {
                route_runtime.subscribe(),
                route_snapshot,
                session_id,
                local_addr,
                shared.clone(),
            )
            .await
        };
@@ -6,6 +6,7 @@ use std::net::SocketAddr;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use std::sync::{Mutex, OnceLock};
use std::time::Duration;

use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, split};
use tokio::sync::watch;
@@ -16,7 +17,9 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::relay::relay_bidirectional;
use crate::proxy::shared_state::{
    ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{
    ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
    cutover_stagger_delay,
@@ -225,7 +228,43 @@ fn unknown_dc_test_lock() -> &'static Mutex<()> {
    TEST_LOCK.get_or_init(|| Mutex::new(()))
}

#[allow(dead_code)]
pub(crate) async fn handle_via_direct<R, W>(
    client_reader: CryptoReader<R>,
    client_writer: CryptoWriter<W>,
    success: HandshakeSuccess,
    upstream_manager: Arc<UpstreamManager>,
    stats: Arc<Stats>,
    config: Arc<ProxyConfig>,
    buffer_pool: Arc<BufferPool>,
    rng: Arc<SecureRandom>,
    route_rx: watch::Receiver<RouteCutoverState>,
    route_snapshot: RouteCutoverState,
    session_id: u64,
) -> Result<()>
where
    R: AsyncRead + Unpin + Send + 'static,
    W: AsyncWrite + Unpin + Send + 'static,
{
    handle_via_direct_with_shared(
        client_reader,
        client_writer,
        success,
        upstream_manager,
        stats,
        config.clone(),
        buffer_pool,
        rng,
        route_rx,
        route_snapshot,
        session_id,
        SocketAddr::from(([0, 0, 0, 0], config.server.port)),
        ProxySharedState::new(),
    )
    .await
}

pub(crate) async fn handle_via_direct_with_shared<R, W>(
    client_reader: CryptoReader<R>,
    client_writer: CryptoWriter<W>,
    success: HandshakeSuccess,
@@ -237,6 +276,8 @@ pub(crate) async fn handle_via_direct<R, W>(
    mut route_rx: watch::Receiver<RouteCutoverState>,
    route_snapshot: RouteCutoverState,
    session_id: u64,
    local_addr: SocketAddr,
    shared: Arc<ProxySharedState>,
) -> Result<()>
where
    R: AsyncRead + Unpin + Send + 'static,
@@ -277,7 +318,18 @@ where
    let _direct_connection_lease = stats.acquire_direct_connection_lease();

    let buffer_pool_trim = Arc::clone(&buffer_pool);
    let relay_result = relay_bidirectional(
    let relay_activity_timeout = if shared.conntrack_pressure_active() {
        Duration::from_secs(
            config
                .server
                .conntrack_control
                .profile
                .direct_activity_timeout_secs(),
        )
    } else {
        Duration::from_secs(1800)
    };
    let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout(
        client_reader,
        client_writer,
        tg_reader,
@@ -288,6 +340,7 @@ where
        Arc::clone(&stats),
        config.access.user_data_quota.get(user).copied(),
        buffer_pool,
        relay_activity_timeout,
    );
    tokio::pin!(relay_result);
    let relay_result = loop {
@@ -329,9 +382,52 @@ where
        pool_snapshot.allocated,
        pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
    );

    let close_reason = classify_conntrack_close_reason(&relay_result);
    let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent {
        src: success.peer,
        dst: local_addr,
        reason: close_reason,
    });
    if !matches!(
        publish_result,
        ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled
    ) {
        stats.increment_conntrack_close_event_drop_total();
    }

    relay_result
}

fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason {
    match result {
        Ok(()) => ConntrackCloseReason::NormalEof,
        Err(crate::error::ProxyError::Io(error))
            if matches!(error.kind(), std::io::ErrorKind::TimedOut) =>
        {
            ConntrackCloseReason::Timeout
        }
        Err(crate::error::ProxyError::Io(error))
            if matches!(
                error.kind(),
                std::io::ErrorKind::ConnectionReset
                    | std::io::ErrorKind::ConnectionAborted
                    | std::io::ErrorKind::BrokenPipe
                    | std::io::ErrorKind::NotConnected
                    | std::io::ErrorKind::UnexpectedEof
            ) =>
        {
            ConntrackCloseReason::Reset
        }
        Err(crate::error::ProxyError::Proxy(message))
            if message.contains("pressure") || message.contains("evicted") =>
        {
            ConntrackCloseReason::Pressure
        }
        Err(_) => ConntrackCloseReason::Other,
    }
}

fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
    let prefer_v6 = config.network.prefer == 6 && config.network.ipv6.unwrap_or(true);
    let datacenters = if prefer_v6 {
@@ -16,12 +16,14 @@ use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::timeout;
use tracing::{debug, info, trace, warn};

use crate::config::ProxyConfig;
use crate::config::{ConntrackPressureProfile, ProxyConfig};
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::shared_state::ProxySharedState;
use crate::proxy::shared_state::{
    ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{
    ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
    cutover_stagger_delay,
@@ -135,6 +137,10 @@ fn note_relay_pressure_event_in(shared: &ProxySharedState) {
    guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1);
}

pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
    note_relay_pressure_event_in(shared);
}

fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
    let guard = relay_idle_candidate_registry_lock_in(shared);
    guard.pressure_event_seq
@@ -241,6 +247,23 @@ impl RelayClientIdlePolicy {
            legacy_frame_read_timeout: frame_read_timeout,
        }
    }

    fn apply_pressure_caps(&mut self, profile: ConntrackPressureProfile) {
        let pressure_soft_idle_cap = Duration::from_secs(profile.middle_soft_idle_cap_secs());
        let pressure_hard_idle_cap = Duration::from_secs(profile.middle_hard_idle_cap_secs());

        self.soft_idle = self.soft_idle.min(pressure_soft_idle_cap);
        self.hard_idle = self.hard_idle.min(pressure_hard_idle_cap);
        if self.soft_idle > self.hard_idle {
            self.soft_idle = self.hard_idle;
        }
        self.legacy_frame_read_timeout = self
            .legacy_frame_read_timeout
            .min(pressure_hard_idle_cap);
        if self.grace_after_downstream_activity > self.hard_idle {
            self.grace_after_downstream_activity = self.hard_idle;
        }
    }
}

#[derive(Clone, Copy)]
@@ -1027,7 +1050,12 @@ where
    let translated_local_addr = me_pool.translate_our_addr(local_addr);

    let frame_limit = config.general.max_client_frame;
    let relay_idle_policy = RelayClientIdlePolicy::from_config(&config);
    let mut relay_idle_policy = RelayClientIdlePolicy::from_config(&config);
    let mut pressure_caps_applied = false;
    if shared.conntrack_pressure_active() {
        relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile);
        pressure_caps_applied = true;
    }
    let session_started_at = forensics.started_at;
    let mut relay_idle_state = RelayClientIdleState::new(session_started_at);
    let last_downstream_activity_ms = Arc::new(AtomicU64::new(0));
```diff
@@ -1421,6 +1449,11 @@ where
     let mut route_watch_open = true;
+    let mut seen_pressure_seq = relay_pressure_event_seq_in(shared.as_ref());
     loop {
+        if shared.conntrack_pressure_active() && !pressure_caps_applied {
+            relay_idle_policy.apply_pressure_caps(config.server.conntrack_control.profile);
+            pressure_caps_applied = true;
+        }

         if relay_idle_policy.enabled
             && maybe_evict_idle_candidate_on_pressure_in(
                 shared.as_ref(),
```
```diff
@@ -1600,6 +1633,20 @@ where
         frames_ok = frame_counter,
         "ME relay cleanup"
     );
+
+    let close_reason = classify_conntrack_close_reason(&result);
+    let publish_result = shared.publish_conntrack_close_event(ConntrackCloseEvent {
+        src: peer,
+        dst: local_addr,
+        reason: close_reason,
+    });
+    if !matches!(
+        publish_result,
+        ConntrackClosePublishResult::Sent | ConntrackClosePublishResult::Disabled
+    ) {
+        stats.increment_conntrack_close_event_drop_total();
+    }
+
     clear_relay_idle_candidate_in(shared.as_ref(), conn_id);
     me_pool.registry().unregister(conn_id).await;
     buffer_pool.trim_to(buffer_pool.max_buffers().min(64));
```
```diff
@@ -1612,6 +1659,33 @@ where
     result
 }
+
+fn classify_conntrack_close_reason(result: &Result<()>) -> ConntrackCloseReason {
+    match result {
+        Ok(()) => ConntrackCloseReason::NormalEof,
+        Err(ProxyError::Io(error)) if matches!(error.kind(), std::io::ErrorKind::TimedOut) => {
+            ConntrackCloseReason::Timeout
+        }
+        Err(ProxyError::Io(error))
+            if matches!(
+                error.kind(),
+                std::io::ErrorKind::ConnectionReset
+                    | std::io::ErrorKind::ConnectionAborted
+                    | std::io::ErrorKind::BrokenPipe
+                    | std::io::ErrorKind::NotConnected
+                    | std::io::ErrorKind::UnexpectedEof
+            ) =>
+        {
+            ConntrackCloseReason::Reset
+        }
+        Err(ProxyError::Proxy(message))
+            if message.contains("pressure") || message.contains("evicted") =>
+        {
+            ConntrackCloseReason::Pressure
+        }
+        Err(_) => ConntrackCloseReason::Other,
+    }
+}

 async fn read_client_payload_with_idle_policy_in<R>(
     client_reader: &mut CryptoReader<R>,
     proto_tag: ProtoTag,
```
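The close-reason classification above depends on the crate's `ProxyError`; a stripped-down sketch against plain `std::io::Error` (the `Pressure` arm is omitted since it needs that error type, and the enum here is re-declared for illustration) shows the same `ErrorKind` grouping:

```rust
use std::io;

// Simplified stand-alone version of classify_conntrack_close_reason.
#[derive(Debug, PartialEq)]
enum CloseReason {
    NormalEof,
    Timeout,
    Reset,
    Other,
}

fn classify(result: &Result<(), io::Error>) -> CloseReason {
    match result {
        Ok(()) => CloseReason::NormalEof,
        Err(e) if e.kind() == io::ErrorKind::TimedOut => CloseReason::Timeout,
        // Peer-initiated teardowns are grouped into a single Reset bucket.
        Err(e) if matches!(
            e.kind(),
            io::ErrorKind::ConnectionReset
                | io::ErrorKind::ConnectionAborted
                | io::ErrorKind::BrokenPipe
                | io::ErrorKind::NotConnected
                | io::ErrorKind::UnexpectedEof
        ) => CloseReason::Reset,
        Err(_) => CloseReason::Other,
    }
}

fn main() {
    assert_eq!(classify(&Ok(())), CloseReason::NormalEof);
    let timeout = io::Error::new(io::ErrorKind::TimedOut, "idle");
    assert_eq!(classify(&Err(timeout)), CloseReason::Timeout);
    let pipe = io::Error::new(io::ErrorKind::BrokenPipe, "peer gone");
    assert_eq!(classify(&Err(pipe)), CloseReason::Reset);
    let misc = io::Error::new(io::ErrorKind::Other, "misc");
    assert_eq!(classify(&Err(misc)), CloseReason::Other);
}
```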
```diff
@@ -70,6 +70,7 @@ use tracing::{debug, trace, warn};
 ///
 /// iOS keeps Telegram connections alive in background for up to 30 minutes.
 /// Closing earlier causes unnecessary reconnects and handshake overhead.
+#[allow(dead_code)]
 const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(1800);

 /// Watchdog check interval — also used for periodic rate logging.
```
```diff
@@ -453,6 +454,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
 /// - Clean shutdown: both write sides are shut down on exit
 /// - Error propagation: quota exits return `ProxyError::DataQuotaExceeded`,
 ///   other I/O failures are returned as `ProxyError::Io`
+#[allow(dead_code)]
 pub async fn relay_bidirectional<CR, CW, SR, SW>(
     client_reader: CR,
     client_writer: CW,
```
```diff
@@ -471,6 +473,42 @@ where
     SR: AsyncRead + Unpin + Send + 'static,
     SW: AsyncWrite + Unpin + Send + 'static,
 {
+    relay_bidirectional_with_activity_timeout(
+        client_reader,
+        client_writer,
+        server_reader,
+        server_writer,
+        c2s_buf_size,
+        s2c_buf_size,
+        user,
+        stats,
+        quota_limit,
+        _buffer_pool,
+        ACTIVITY_TIMEOUT,
+    )
+    .await
+}
+
+pub async fn relay_bidirectional_with_activity_timeout<CR, CW, SR, SW>(
+    client_reader: CR,
+    client_writer: CW,
+    server_reader: SR,
+    server_writer: SW,
+    c2s_buf_size: usize,
+    s2c_buf_size: usize,
+    user: &str,
+    stats: Arc<Stats>,
+    quota_limit: Option<u64>,
+    _buffer_pool: Arc<BufferPool>,
+    activity_timeout: Duration,
+) -> Result<()>
+where
+    CR: AsyncRead + Unpin + Send + 'static,
+    CW: AsyncWrite + Unpin + Send + 'static,
+    SR: AsyncRead + Unpin + Send + 'static,
+    SW: AsyncWrite + Unpin + Send + 'static,
+{
+    let activity_timeout = activity_timeout.max(Duration::from_secs(1));
     let epoch = Instant::now();
     let counters = Arc::new(SharedCounters::new());
     let quota_exceeded = Arc::new(AtomicBool::new(false));
```
```diff
@@ -512,7 +550,7 @@ where
             }

             // ── Activity timeout ────────────────────────────────────
-            if idle >= ACTIVITY_TIMEOUT {
+            if idle >= activity_timeout {
                 let c2s = wd_counters.c2s_bytes.load(Ordering::Relaxed);
                 let s2c = wd_counters.s2c_bytes.load(Ordering::Relaxed);
                 warn!(
```
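The watchdog change above replaces the hard-coded `ACTIVITY_TIMEOUT` constant with a caller-supplied parameter that is clamped to at least one second. A minimal model of that decision (the helper name here is illustrative, not from the diff):

```rust
use std::time::Duration;

// Models the watchdog's idle check with a parameterized timeout,
// clamped to >= 1s exactly as in the diff.
fn should_close(idle: Duration, activity_timeout: Duration) -> bool {
    let activity_timeout = activity_timeout.max(Duration::from_secs(1));
    idle >= activity_timeout
}

fn main() {
    // The default 30-minute behaviour is preserved when 1800s is passed through.
    assert!(!should_close(Duration::from_secs(1799), Duration::from_secs(1800)));
    assert!(should_close(Duration::from_secs(1800), Duration::from_secs(1800)));
    // A zero timeout is clamped to 1s rather than closing connections instantly.
    assert!(!should_close(Duration::from_millis(500), Duration::ZERO));
}
```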
```diff
@@ -1,15 +1,40 @@
 use std::collections::HashSet;
 use std::collections::hash_map::RandomState;
-use std::net::IpAddr;
-use std::sync::atomic::AtomicU64;
+use std::net::{IpAddr, SocketAddr};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Instant;

 use dashmap::DashMap;
+use tokio::sync::mpsc;

 use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState};
 use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};

+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum ConntrackCloseReason {
+    NormalEof,
+    Timeout,
+    Pressure,
+    Reset,
+    Other,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct ConntrackCloseEvent {
+    pub(crate) src: SocketAddr,
+    pub(crate) dst: SocketAddr,
+    pub(crate) reason: ConntrackCloseReason,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum ConntrackClosePublishResult {
+    Sent,
+    Disabled,
+    QueueFull,
+    QueueClosed,
+}
+
 pub(crate) struct HandshakeSharedState {
     pub(crate) auth_probe: DashMap<IpAddr, AuthProbeState>,
     pub(crate) auth_probe_saturation: Mutex<Option<AuthProbeSaturationState>>,
```
```diff
@@ -31,6 +56,8 @@ pub(crate) struct MiddleRelaySharedState {
 pub(crate) struct ProxySharedState {
     pub(crate) handshake: HandshakeSharedState,
     pub(crate) middle_relay: MiddleRelaySharedState,
+    pub(crate) conntrack_pressure_active: AtomicBool,
+    pub(crate) conntrack_close_tx: Mutex<Option<mpsc::Sender<ConntrackCloseEvent>>>,
 }

 impl ProxySharedState {
```
```diff
@@ -52,6 +79,67 @@ impl ProxySharedState {
                 relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
                 relay_idle_mark_seq: AtomicU64::new(0),
             },
+            conntrack_pressure_active: AtomicBool::new(false),
+            conntrack_close_tx: Mutex::new(None),
         })
     }
+
+    pub(crate) fn set_conntrack_close_sender(&self, tx: mpsc::Sender<ConntrackCloseEvent>) {
+        match self.conntrack_close_tx.lock() {
+            Ok(mut guard) => {
+                *guard = Some(tx);
+            }
+            Err(poisoned) => {
+                let mut guard = poisoned.into_inner();
+                *guard = Some(tx);
+                self.conntrack_close_tx.clear_poison();
+            }
+        }
+    }
+
+    pub(crate) fn disable_conntrack_close_sender(&self) {
+        match self.conntrack_close_tx.lock() {
+            Ok(mut guard) => {
+                *guard = None;
+            }
+            Err(poisoned) => {
+                let mut guard = poisoned.into_inner();
+                *guard = None;
+                self.conntrack_close_tx.clear_poison();
+            }
+        }
+    }
+
+    pub(crate) fn publish_conntrack_close_event(
+        &self,
+        event: ConntrackCloseEvent,
+    ) -> ConntrackClosePublishResult {
+        let tx = match self.conntrack_close_tx.lock() {
+            Ok(guard) => guard.clone(),
+            Err(poisoned) => {
+                let guard = poisoned.into_inner();
+                let cloned = guard.clone();
+                self.conntrack_close_tx.clear_poison();
+                cloned
+            }
+        };
+
+        let Some(tx) = tx else {
+            return ConntrackClosePublishResult::Disabled;
+        };
+
+        match tx.try_send(event) {
+            Ok(()) => ConntrackClosePublishResult::Sent,
+            Err(mpsc::error::TrySendError::Full(_)) => ConntrackClosePublishResult::QueueFull,
+            Err(mpsc::error::TrySendError::Closed(_)) => ConntrackClosePublishResult::QueueClosed,
+        }
+    }
+
+    pub(crate) fn set_conntrack_pressure_active(&self, active: bool) {
+        self.conntrack_pressure_active.store(active, Ordering::Relaxed);
+    }
+
+    pub(crate) fn conntrack_pressure_active(&self) -> bool {
+        self.conntrack_pressure_active.load(Ordering::Relaxed)
+    }
 }
```
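`publish_conntrack_close_event` maps `try_send` outcomes onto an enum instead of propagating errors, so a slow or absent consumer can never block the relay path. An analogous sketch using std's bounded channel (the diff itself uses `tokio::sync::mpsc`, whose `try_send` behaves the same way; the payload type here is a placeholder):

```rust
use std::sync::mpsc;

// Non-blocking publish result, mirroring ConntrackClosePublishResult.
#[derive(Debug, PartialEq)]
enum PublishResult {
    Sent,
    QueueFull,
    QueueClosed,
}

fn publish(tx: &mpsc::SyncSender<u32>, event: u32) -> PublishResult {
    // try_send never blocks: a full or closed queue is reported, not awaited.
    match tx.try_send(event) {
        Ok(()) => PublishResult::Sent,
        Err(mpsc::TrySendError::Full(_)) => PublishResult::QueueFull,
        Err(mpsc::TrySendError::Disconnected(_)) => PublishResult::QueueClosed,
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(1);
    assert_eq!(publish(&tx, 1), PublishResult::Sent);
    // Capacity 1 and nothing drained yet: the event is dropped, not queued.
    assert_eq!(publish(&tx, 2), PublishResult::QueueFull);
    drop(rx);
    assert_eq!(publish(&tx, 3), PublishResult::QueueClosed);
}
```

Dropped events are observable: the relay side counts them via the `conntrack_close_event_drop_total` counter rather than retrying.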
```diff
@@ -159,8 +159,8 @@ MemoryDenyWriteExecute=true
 LockPersonality=true

 # Allow binding to privileged ports and writing to specific paths
-AmbientCapabilities=CAP_NET_BIND_SERVICE
-CapabilityBoundingSet=CAP_NET_BIND_SERVICE
+AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
+CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
 ReadWritePaths=/etc/telemt /var/run /var/lib/telemt

 [Install]
```
114 src/stats/mod.rs
```diff
@@ -91,6 +91,17 @@ pub struct Stats {
     current_connections_direct: AtomicU64,
     current_connections_me: AtomicU64,
     handshake_timeouts: AtomicU64,
+    accept_permit_timeout_total: AtomicU64,
+    conntrack_control_enabled_gauge: AtomicBool,
+    conntrack_control_available_gauge: AtomicBool,
+    conntrack_pressure_active_gauge: AtomicBool,
+    conntrack_event_queue_depth_gauge: AtomicU64,
+    conntrack_rule_apply_ok_gauge: AtomicBool,
+    conntrack_delete_attempt_total: AtomicU64,
+    conntrack_delete_success_total: AtomicU64,
+    conntrack_delete_not_found_total: AtomicU64,
+    conntrack_delete_error_total: AtomicU64,
+    conntrack_close_event_drop_total: AtomicU64,
     upstream_connect_attempt_total: AtomicU64,
     upstream_connect_success_total: AtomicU64,
     upstream_connect_fail_total: AtomicU64,
```
```diff
@@ -528,6 +539,74 @@ impl Stats {
             self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
         }
     }
+
+    pub fn increment_accept_permit_timeout_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.accept_permit_timeout_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn set_conntrack_control_enabled(&self, enabled: bool) {
+        self.conntrack_control_enabled_gauge
+            .store(enabled, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_control_available(&self, available: bool) {
+        self.conntrack_control_available_gauge
+            .store(available, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_pressure_active(&self, active: bool) {
+        self.conntrack_pressure_active_gauge
+            .store(active, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_event_queue_depth(&self, depth: u64) {
+        self.conntrack_event_queue_depth_gauge
+            .store(depth, Ordering::Relaxed);
+    }
+
+    pub fn set_conntrack_rule_apply_ok(&self, ok: bool) {
+        self.conntrack_rule_apply_ok_gauge
+            .store(ok, Ordering::Relaxed);
+    }
+
+    pub fn increment_conntrack_delete_attempt_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_attempt_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_delete_success_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_success_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_delete_not_found_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_not_found_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_delete_error_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_delete_error_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn increment_conntrack_close_event_drop_total(&self) {
+        if self.telemetry_core_enabled() {
+            self.conntrack_close_event_drop_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
     pub fn increment_upstream_connect_attempt_total(&self) {
         if self.telemetry_core_enabled() {
             self.upstream_connect_attempt_total
```
```diff
@@ -1477,6 +1556,9 @@ impl Stats {
     pub fn get_connects_bad(&self) -> u64 {
         self.connects_bad.load(Ordering::Relaxed)
     }
+    pub fn get_accept_permit_timeout_total(&self) -> u64 {
+        self.accept_permit_timeout_total.load(Ordering::Relaxed)
+    }
     pub fn get_current_connections_direct(&self) -> u64 {
         self.current_connections_direct.load(Ordering::Relaxed)
     }
```
```diff
@@ -1487,6 +1569,38 @@ impl Stats {
         self.get_current_connections_direct()
             .saturating_add(self.get_current_connections_me())
     }
+    pub fn get_conntrack_control_enabled(&self) -> bool {
+        self.conntrack_control_enabled_gauge.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_control_available(&self) -> bool {
+        self.conntrack_control_available_gauge
+            .load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_pressure_active(&self) -> bool {
+        self.conntrack_pressure_active_gauge.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_event_queue_depth(&self) -> u64 {
+        self.conntrack_event_queue_depth_gauge
+            .load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_rule_apply_ok(&self) -> bool {
+        self.conntrack_rule_apply_ok_gauge.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_attempt_total(&self) -> u64 {
+        self.conntrack_delete_attempt_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_success_total(&self) -> u64 {
+        self.conntrack_delete_success_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_not_found_total(&self) -> u64 {
+        self.conntrack_delete_not_found_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_delete_error_total(&self) -> u64 {
+        self.conntrack_delete_error_total.load(Ordering::Relaxed)
+    }
+    pub fn get_conntrack_close_event_drop_total(&self) -> u64 {
+        self.conntrack_close_event_drop_total.load(Ordering::Relaxed)
+    }
     pub fn get_me_keepalive_sent(&self) -> u64 {
         self.me_keepalive_sent.load(Ordering::Relaxed)
     }
```
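The stats additions follow one pattern throughout: boolean and numeric gauges are overwritten with `store`, monotonic counters only grow with `fetch_add`, and all accesses use `Ordering::Relaxed` since readers only need eventually-consistent snapshots. A condensed illustration (field names mirror the diff, but this struct is a hypothetical excerpt, without the `telemetry_core_enabled` guard):

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

// Gauge/counter pair in the style of the Stats struct above.
struct ConntrackStats {
    pressure_active_gauge: AtomicBool, // gauge: can go up and down
    delete_attempt_total: AtomicU64,   // counter: monotonically increasing
}

impl ConntrackStats {
    fn new() -> Self {
        Self {
            pressure_active_gauge: AtomicBool::new(false),
            delete_attempt_total: AtomicU64::new(0),
        }
    }
    fn set_pressure_active(&self, active: bool) {
        self.pressure_active_gauge.store(active, Ordering::Relaxed);
    }
    fn increment_delete_attempt(&self) {
        self.delete_attempt_total.fetch_add(1, Ordering::Relaxed);
    }
    fn get_pressure_active(&self) -> bool {
        self.pressure_active_gauge.load(Ordering::Relaxed)
    }
    fn get_delete_attempt_total(&self) -> u64 {
        self.delete_attempt_total.load(Ordering::Relaxed)
    }
}

fn main() {
    let s = ConntrackStats::new();
    s.set_pressure_active(true);
    s.increment_delete_attempt();
    s.increment_delete_attempt();
    s.set_pressure_active(false); // gauge drops back; the counter never does
    assert!(!s.get_pressure_active());
    assert_eq!(s.get_delete_attempt_total(), 2);
}
```

Taking `&self` on every mutator is what lets a single `Arc<Stats>` be shared across all connection tasks without locks.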