From 6e3b4a1ce5fb50444d1177d3399c74244d693ece Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:11:36 +0300 Subject: [PATCH 1/5] ServerHello fixes --- src/tls_front/emulator.rs | 64 +++++++++---------- ...mulator_profile_fidelity_security_tests.rs | 29 +++++++-- 2 files changed, 56 insertions(+), 37 deletions(-) diff --git a/src/tls_front/emulator.rs b/src/tls_front/emulator.rs index ba8453a..a7f2eb3 100644 --- a/src/tls_front/emulator.rs +++ b/src/tls_front/emulator.rs @@ -18,9 +18,6 @@ fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec { .iter() .map(|&size| { let base = size.clamp(MIN_APP_DATA, MAX_APP_DATA); - if base == MIN_APP_DATA || base == MAX_APP_DATA { - return base; - } let jitter_range = ((base as f64) * 0.03).round() as i64; if jitter_range == 0 { return base; @@ -69,9 +66,19 @@ fn ensure_payload_capacity(mut sizes: Vec, payload_len: usize) -> Vec Vec { match cached.behavior_profile.source { TlsProfileSource::Raw | TlsProfileSource::Merged => { - if !cached.behavior_profile.app_data_record_sizes.is_empty() { - return cached.behavior_profile.app_data_record_sizes.clone(); - } + return cached + .app_data_records_sizes + .first() + .copied() + .or_else(|| { + cached + .behavior_profile + .app_data_record_sizes + .first() + .copied() + }) + .map(|size| vec![size]) + .unwrap_or_else(|| vec![cached.total_app_data_len.max(1024)]); } TlsProfileSource::Default | TlsProfileSource::Rustls => {} } @@ -83,8 +90,8 @@ fn emulated_app_data_sizes(cached: &CachedTlsData) -> Vec { sizes } -fn emulated_change_cipher_spec_count(cached: &CachedTlsData) -> usize { - usize::from(cached.behavior_profile.change_cipher_spec_count.max(1)) +fn emulated_change_cipher_spec_count(_cached: &CachedTlsData) -> usize { + 1 } fn emulated_ticket_record_sizes( @@ -92,19 +99,20 @@ fn emulated_ticket_record_sizes( new_session_tickets: u8, rng: &SecureRandom, ) -> Vec { - let mut sizes = match cached.behavior_profile.source { + let target_count = usize::from(new_session_tickets.min(MAX_TICKET_RECORDS as u8)); + if target_count == 0 { + return Vec::new(); + } + + let profiled_sizes = match cached.behavior_profile.source { TlsProfileSource::Raw | TlsProfileSource::Merged => { - cached.behavior_profile.ticket_record_sizes.clone() + cached.behavior_profile.ticket_record_sizes.as_slice() } - TlsProfileSource::Default | TlsProfileSource::Rustls => Vec::new(), + TlsProfileSource::Default | TlsProfileSource::Rustls => &[], }; - let target_count = sizes - .len() - .max(usize::from( - new_session_tickets.min(MAX_TICKET_RECORDS as u8), - )) - .min(MAX_TICKET_RECORDS); + let mut sizes = Vec::with_capacity(target_count); + sizes.extend(profiled_sizes.iter().copied().take(target_count)); while sizes.len() < target_count { sizes.push(rng.range(48) + 48); @@ -511,7 +519,7 @@ mod tests { } #[test] - fn test_build_emulated_server_hello_replays_tail_records_for_profiled_tls() { + fn test_build_emulated_server_hello_ignores_tail_records_for_profiled_tls() { let mut cached = make_cached(None); cached.app_data_records_sizes = vec![27, 3905, 537, 69]; cached.total_app_data_len = 4538; @@ -533,19 +541,11 @@ mod tests { let hello_len = u16::from_be_bytes([response[3], response[4]]) as usize; let ccs_start = 5 + hello_len; - let mut pos = ccs_start + 6; - let mut app_lengths = Vec::new(); - while pos + 5 <= response.len() { - assert_eq!(response[pos], TLS_RECORD_APPLICATION); - let record_len = u16::from_be_bytes([response[pos + 3], response[pos + 4]]) as usize; - app_lengths.push(record_len); - pos += 5 + record_len; - } - - assert_eq!(app_lengths.len(), 4); - assert_eq!(app_lengths[0], 64); - assert_eq!(app_lengths[3], 69); - assert!(app_lengths[1] >= 64); - assert!(app_lengths[2] >= 64); + let app_start = ccs_start + 6; + let app_len = + u16::from_be_bytes([response[app_start + 3], response[app_start + 4]]) as usize; + assert_eq!(response[app_start], TLS_RECORD_APPLICATION); + assert_eq!(app_len, 64); + assert_eq!(app_start + 5 + app_len, response.len()); } } diff --git a/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs b/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs index 694fd76..1a40e9b 100644 --- a/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs +++ b/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs @@ -52,7 +52,7 @@ fn record_lengths_by_type(response: &[u8], wanted_type: u8) -> Vec { } #[test] -fn emulated_server_hello_replays_profile_change_cipher_spec_count() { +fn emulated_server_hello_keeps_single_change_cipher_spec_for_client_compatibility() { let cached = make_cached(); let rng = SecureRandom::new(); @@ -69,12 +69,12 @@ fn emulated_server_hello_replays_profile_change_cipher_spec_count() { assert_eq!(response[0], TLS_RECORD_HANDSHAKE); let ccs_records = record_lengths_by_type(&response, TLS_RECORD_CHANGE_CIPHER); - assert_eq!(ccs_records.len(), 2); + assert_eq!(ccs_records.len(), 1); assert!(ccs_records.iter().all(|len| *len == 1)); } #[test] -fn emulated_server_hello_replays_profile_ticket_tail_lengths() { +fn emulated_server_hello_does_not_emit_profile_ticket_tail_when_disabled() { let cached = make_cached(); let rng = SecureRandom::new(); @@ -90,6 +90,25 @@ fn emulated_server_hello_replays_profile_ticket_tail_lengths() { ); let app_records = record_lengths_by_type(&response, TLS_RECORD_APPLICATION); - assert!(app_records.len() >= 4); - assert_eq!(&app_records[app_records.len() - 2..], &[220, 180]); + assert_eq!(app_records, vec![1200]); +} + +#[test] +fn emulated_server_hello_uses_profile_ticket_lengths_when_enabled() { + let cached = make_cached(); + let rng = SecureRandom::new(); + + let response = build_emulated_server_hello( + b"secret", + &[0x91; 32], + &[0x92; 16], + &cached, + false, + &rng, + None, + 2, + ); + + let app_records = record_lengths_by_type(&response, TLS_RECORD_APPLICATION); + assert_eq!(app_records, vec![1200, 220, 180]); } From 3ca3e8ff0edc5ac1fb1a2bf93d5bc2f976cc864a Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 17 Apr 2026 16:36:15 +0300 Subject: [PATCH 2/5] Docker Health-Check --- Dockerfile | 4 + docker-compose.yml | 6 ++ src/api/mod.rs | 29 +++++- src/api/model.rs | 11 ++ src/cli.rs | 70 ++++++++++++- src/healthcheck.rs | 208 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/transport/upstream.rs | 20 ++++ 8 files changed, 347 insertions(+), 2 deletions(-) create mode 100644 src/healthcheck.rs diff --git a/Dockerfile b/Dockerfile index d138ce9..0955031 100644 --- a/Dockerfile +++ b/Dockerfile @@ -77,6 +77,8 @@ COPY config.toml /app/config.toml EXPOSE 443 9090 9091 +HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 CMD ["/app/telemt", "healthcheck", "/app/config.toml", "--mode", "liveness"] + ENTRYPOINT ["/app/telemt"] CMD ["config.toml"] @@ -94,5 +96,7 @@ USER nonroot:nonroot EXPOSE 443 9090 9091 +HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 CMD ["/app/telemt", "healthcheck", "/app/config.toml", "--mode", "liveness"] + ENTRYPOINT ["/app/telemt"] CMD ["config.toml"] diff --git a/docker-compose.yml b/docker-compose.yml index e1cd678..9fd0641 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,12 @@ services: - /etc/telemt:rw,mode=1777,size=4m environment: - RUST_LOG=info + healthcheck: + test: [ "CMD", "/app/telemt", "healthcheck", "/etc/telemt/config.toml", "--mode", "liveness" ] + interval: 30s + timeout: 5s + retries: 3 + start_period: 20s # Uncomment this line if you want to use host network for IPv6, but bridge is default and usually better # network_mode: host cap_drop: diff --git a/src/api/mod.rs b/src/api/mod.rs index 850fb0e..8ee46f7 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -41,7 +41,7 @@ use config_store::{current_revision, load_config_from_disk, parse_if_match}; use events::ApiEventStore; use http_utils::{error_response, read_json, read_optional_json, success_response}; use model::{ - ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, PatchUserRequest, + ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps, }; use runtime_edge::{ @@ -275,6 +275,33 @@ async fn handle( }; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/health/ready") => { + let revision = current_revision(&shared.config_path).await?; + let admission_open = shared.runtime_state.admission_open.load(Ordering::Relaxed); + let upstream_health = shared.upstream_manager.api_health_summary().await; + let ready = admission_open && upstream_health.healthy_total > 0; + let reason = if ready { + None + } else if !admission_open { + Some("admission_closed") + } else { + Some("no_healthy_upstreams") + }; + let data = HealthReadyData { + ready, + status: if ready { "ready" } else { "not_ready" }, + reason, + admission_open, + healthy_upstreams: upstream_health.healthy_total, + total_upstreams: upstream_health.configured_total, + }; + let status_code = if ready { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + }; + Ok(success_response(status_code, data, revision)) + } ("GET", "/v1/system/info") => { let revision = current_revision(&shared.config_path).await?; let data = build_system_info_data(shared.as_ref(), cfg.as_ref(), &revision); diff --git a/src/api/model.rs b/src/api/model.rs index 66de644..c6e24ea 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -60,6 +60,17 @@ pub(super) struct HealthData { pub(super) read_only: bool, } +#[derive(Serialize)] +pub(super) struct HealthReadyData { + pub(super) ready: bool, + pub(super) status: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) admission_open: bool, + pub(super) healthy_upstreams: usize, + pub(super) total_upstreams: usize, +} + #[derive(Serialize)] pub(super) struct SummaryData { pub(super) uptime_seconds: f64, diff --git a/src/cli.rs b/src/cli.rs index 47a10d5..2e24017 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -6,12 +6,15 @@ //! - `reload [--pid-file PATH]` - Reload configuration (SIGHUP) //! - `status [--pid-file PATH]` - Check daemon status //! - `run [OPTIONS] [config.toml]` - Run in foreground (default behavior) +//! - `healthcheck [OPTIONS] [config.toml]` - Run control-plane health probe use rand::RngExt; use std::fs; use std::path::{Path, PathBuf}; use std::process::Command; +use crate::healthcheck::{self, HealthcheckMode}; + #[cfg(unix)] use crate::daemon::{self, DEFAULT_PID_FILE, DaemonOptions}; @@ -28,6 +31,8 @@ pub enum Subcommand { Reload, /// Check daemon status (`status` subcommand). Status, + /// Run health probe and exit with status code. + Healthcheck, /// Fire-and-forget setup (`--init`). Init, } @@ -38,6 +43,8 @@ pub struct ParsedCommand { pub subcommand: Subcommand, pub pid_file: PathBuf, pub config_path: String, + pub healthcheck_mode: HealthcheckMode, + pub healthcheck_mode_invalid: Option, #[cfg(unix)] pub daemon_opts: DaemonOptions, pub init_opts: Option, @@ -52,6 +59,8 @@ impl Default for ParsedCommand { #[cfg(not(unix))] pid_file: PathBuf::from("/var/run/telemt.pid"), config_path: "config.toml".to_string(), + healthcheck_mode: HealthcheckMode::Liveness, + healthcheck_mode_invalid: None, #[cfg(unix)] daemon_opts: DaemonOptions::default(), init_opts: None, @@ -91,6 +100,9 @@ pub fn parse_command(args: &[String]) -> ParsedCommand { "status" => { cmd.subcommand = Subcommand::Status; } + "healthcheck" => { + cmd.subcommand = Subcommand::Healthcheck; + } "run" => { cmd.subcommand = Subcommand::Run; #[cfg(unix)] @@ -113,7 +125,35 @@ pub fn parse_command(args: &[String]) -> ParsedCommand { while i < args.len() { match args[i].as_str() { // Skip subcommand names - "start" | "stop" | "reload" | "status" | "run" => {} + "start" | "stop" | "reload" | "status" | "run" | "healthcheck" => {} + "--mode" => { + i += 1; + if i < args.len() { + match HealthcheckMode::from_cli_arg(&args[i]) { + Some(mode) => { + cmd.healthcheck_mode = mode; + cmd.healthcheck_mode_invalid = None; + } + None => { + cmd.healthcheck_mode_invalid = Some(args[i].clone()); + } + } + } else { + cmd.healthcheck_mode_invalid = Some(String::new()); + } + } + s if s.starts_with("--mode=") => { + let raw = s.trim_start_matches("--mode="); + match HealthcheckMode::from_cli_arg(raw) { + Some(mode) => { + cmd.healthcheck_mode = mode; + cmd.healthcheck_mode_invalid = None; + } + None => { + cmd.healthcheck_mode_invalid = Some(raw.to_string()); + } + } + } // PID file option (for stop/reload/status) "--pid-file" => { i += 1; @@ -152,6 +192,20 @@ pub fn execute_subcommand(cmd: &ParsedCommand) -> Option { Subcommand::Stop => Some(cmd_stop(&cmd.pid_file)), Subcommand::Reload => Some(cmd_reload(&cmd.pid_file)), Subcommand::Status => Some(cmd_status(&cmd.pid_file)), + Subcommand::Healthcheck => { + if let Some(invalid_mode) = cmd.healthcheck_mode_invalid.as_ref() { + if invalid_mode.is_empty() { + eprintln!("[telemt] Missing value for --mode (supported: liveness, ready)"); + } else { + eprintln!( + "[telemt] Invalid --mode value '{invalid_mode}' (supported: liveness, ready)" + ); + } + Some(2) + } else { + Some(healthcheck::run(&cmd.config_path, cmd.healthcheck_mode)) + } + } Subcommand::Init => { if let Some(opts) = cmd.init_opts.clone() { match run_init(opts) { @@ -177,6 +231,20 @@ pub fn execute_subcommand(cmd: &ParsedCommand) -> Option { eprintln!("[telemt] Subcommand not supported on this platform"); Some(1) } + Subcommand::Healthcheck => { + if let Some(invalid_mode) = cmd.healthcheck_mode_invalid.as_ref() { + if invalid_mode.is_empty() { + eprintln!("[telemt] Missing value for --mode (supported: liveness, ready)"); + } else { + eprintln!( + "[telemt] Invalid --mode value '{invalid_mode}' (supported: liveness, ready)" + ); + } + Some(2) + } else { + Some(healthcheck::run(&cmd.config_path, cmd.healthcheck_mode)) + } + } Subcommand::Init => { if let Some(opts) = cmd.init_opts.clone() { match run_init(opts) { diff --git a/src/healthcheck.rs b/src/healthcheck.rs new file mode 100644 index 0000000..be9a381 --- /dev/null +++ b/src/healthcheck.rs @@ -0,0 +1,208 @@ +use std::io::{Read, Write}; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream}; +use std::time::Duration; + +use serde_json::Value; + +use crate::config::ProxyConfig; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum HealthcheckMode { + Liveness, + Ready, +} + +impl HealthcheckMode { + pub(crate) fn from_cli_arg(value: &str) -> Option { + match value { + "liveness" => Some(Self::Liveness), + "ready" => Some(Self::Ready), + _ => None, + } + } + + fn request_path(self) -> &'static str { + match self { + Self::Liveness => "/v1/health", + Self::Ready => "/v1/health/ready", + } + } +} + +pub(crate) fn run(config_path: &str, mode: HealthcheckMode) -> i32 { + match run_inner(config_path, mode) { + Ok(()) => 0, + Err(error) => { + eprintln!("[telemt] healthcheck failed: {error}"); + 1 + } + } +} + +fn run_inner(config_path: &str, mode: HealthcheckMode) -> Result<(), String> { + let config = + ProxyConfig::load(config_path).map_err(|error| format!("config load failed: {error}"))?; + let api_cfg = &config.server.api; + if !api_cfg.enabled { + return Ok(()); + } + + let listen: SocketAddr = api_cfg + .listen + .parse() + .map_err(|_| format!("invalid API listen address: {}", api_cfg.listen))?; + if listen.port() == 0 { + return Err("API listen port is 0".to_string()); + } + let target = probe_target(listen); + + let mut stream = TcpStream::connect_timeout(&target, Duration::from_secs(2)) + .map_err(|error| format!("connect {target} failed: {error}"))?; + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .map_err(|error| format!("set read timeout failed: {error}"))?; + stream + .set_write_timeout(Some(Duration::from_secs(2))) + .map_err(|error| format!("set write timeout failed: {error}"))?; + + let request = build_request(target, mode.request_path(), &api_cfg.auth_header); + stream + .write_all(request.as_bytes()) + .map_err(|error| format!("request write failed: {error}"))?; + stream + .flush() + .map_err(|error| format!("request flush failed: {error}"))?; + + let mut raw_response = Vec::new(); + stream + .read_to_end(&mut raw_response) + .map_err(|error| format!("response read failed: {error}"))?; + let response = + String::from_utf8(raw_response).map_err(|_| "response is not valid UTF-8".to_string())?; + + let (status_code, body) = split_response(&response)?; + if status_code != 200 { + return Err(format!("HTTP status {status_code}")); + } + + validate_payload(mode, body)?; + Ok(()) +} + +fn probe_target(listen: SocketAddr) -> SocketAddr { + match listen { + SocketAddr::V4(addr) => { + let ip = if addr.ip().is_unspecified() { + Ipv4Addr::LOCALHOST + } else { + *addr.ip() + }; + SocketAddr::from((ip, addr.port())) + } + SocketAddr::V6(addr) => { + let ip = if addr.ip().is_unspecified() { + Ipv6Addr::LOCALHOST + } else { + *addr.ip() + }; + SocketAddr::from((ip, addr.port())) + } + } +} + +fn build_request(target: SocketAddr, path: &str, auth_header: &str) -> String { + let mut request = format!("GET {path} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n", target); + if !auth_header.is_empty() { + request.push_str("Authorization: "); + request.push_str(auth_header); + request.push_str("\r\n"); + } + request.push_str("\r\n"); + request +} + +fn split_response(response: &str) -> Result<(u16, &str), String> { + let header_end = response + .find("\r\n\r\n") + .ok_or_else(|| "invalid HTTP response headers".to_string())?; + let header = &response[..header_end]; + let body = &response[header_end + 4..]; + let status_line = header + .lines() + .next() + .ok_or_else(|| "missing HTTP status line".to_string())?; + let status_code = parse_status_code(status_line)?; + Ok((status_code, body)) +} + +fn parse_status_code(status_line: &str) -> Result { + let mut parts = status_line.split_whitespace(); + let version = parts + .next() + .ok_or_else(|| "missing HTTP version".to_string())?; + if !version.starts_with("HTTP/") { + return Err(format!("invalid HTTP status line: {status_line}")); + } + let code = parts + .next() + .ok_or_else(|| "missing HTTP status code".to_string())?; + code.parse::() + .map_err(|_| format!("invalid HTTP status code: {code}")) +} + +fn validate_payload(mode: HealthcheckMode, body: &str) -> Result<(), String> { + let payload: Value = + serde_json::from_str(body).map_err(|_| "response body is not valid JSON".to_string())?; + if payload.get("ok").and_then(Value::as_bool) != Some(true) { + return Err("response JSON has ok=false".to_string()); + } + + let data = payload + .get("data") + .ok_or_else(|| "response JSON has no data field".to_string())?; + match mode { + HealthcheckMode::Liveness => { + if data.get("status").and_then(Value::as_str) != Some("ok") { + return Err("liveness status is not ok".to_string()); + } + } + HealthcheckMode::Ready => { + if data.get("ready").and_then(Value::as_bool) != Some(true) { + return Err("readiness flag is false".to_string()); + } + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::{HealthcheckMode, parse_status_code, split_response, validate_payload}; + + #[test] + fn parse_status_code_reads_http_200() { + let status = parse_status_code("HTTP/1.1 200 OK").expect("must parse status"); + assert_eq!(status, 200); + } + + #[test] + fn split_response_extracts_status_and_body() { + let response = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\"ok\":true}"; + let (status, body) = split_response(response).expect("must split response"); + assert_eq!(status, 200); + assert_eq!(body, "{\"ok\":true}"); + } + + #[test] + fn validate_payload_accepts_liveness_contract() { + let body = "{\"ok\":true,\"data\":{\"status\":\"ok\"}}"; + validate_payload(HealthcheckMode::Liveness, body).expect("liveness payload must pass"); + } + + #[test] + fn validate_payload_rejects_not_ready() { + let body = "{\"ok\":true,\"data\":{\"ready\":false}}"; + let result = validate_payload(HealthcheckMode::Ready, body); + assert!(result.is_err()); + } +} diff --git a/src/main.rs b/src/main.rs index 5c134b8..ce6b943 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ mod crypto; #[cfg(unix)] mod daemon; mod error; +mod healthcheck; mod ip_tracker; #[cfg(test)] #[path = "tests/ip_tracker_encapsulation_adversarial_tests.rs"] diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 791fc00..229dfbf 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -279,6 +279,12 @@ pub struct UpstreamApiSummarySnapshot { pub shadowsocks_total: usize, } +#[derive(Debug, Clone, Copy, Default)] +pub struct UpstreamApiHealthSummary { + pub configured_total: usize, + pub healthy_total: usize, +} + #[derive(Debug, Clone)] pub struct UpstreamApiSnapshot { pub summary: UpstreamApiSummarySnapshot, @@ -444,6 +450,20 @@ impl UpstreamManager { Some(UpstreamApiSnapshot { summary, upstreams }) } + pub async fn api_health_summary(&self) -> UpstreamApiHealthSummary { + let guard = self.upstreams.read().await; + let mut summary = UpstreamApiHealthSummary { + configured_total: guard.len(), + healthy_total: 0, + }; + for upstream in guard.iter() { + if upstream.healthy { + summary.healthy_total += 1; + } + } + summary + } + fn describe_upstream(upstream_type: &UpstreamType) -> (UpstreamRouteKind, String) { match upstream_type { UpstreamType::Direct { .. } => (UpstreamRouteKind::Direct, "direct".to_string()), From 093faed0c2b893d590cd8132c1962ba4f757dac1 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 17 Apr 2026 19:06:18 +0300 Subject: [PATCH 3/5] Conntrack Control for Docker --- Dockerfile | 26 +++++++++ docker-compose.host-netfilter.yml | 10 ++++ docker-compose.netfilter.yml | 8 +++ docker-compose.yml | 5 +- src/config/load.rs | 40 +++++++++++++ src/config/types.rs | 5 ++ src/conntrack_control.rs | 93 ++++++++++++++++++++----------- 7 files changed, 152 insertions(+), 35 deletions(-) create mode 100644 docker-compose.host-netfilter.yml create mode 100644 docker-compose.netfilter.yml diff --git a/Dockerfile b/Dockerfile index 0955031..6ef7f93 100644 --- a/Dockerfile +++ b/Dockerfile @@ -82,6 +82,32 @@ HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 CMD ["/ap ENTRYPOINT ["/app/telemt"] CMD ["config.toml"] +# ========================== +# Production Netfilter Profile +# ========================== +FROM debian:12-slim AS prod-netfilter + +RUN set -eux; \ + apt-get update; \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + conntrack \ + nftables \ + iptables; \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY --from=minimal /telemt /app/telemt +COPY config.toml /app/config.toml + +EXPOSE 443 9090 9091 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 CMD ["/app/telemt", "healthcheck", "/app/config.toml", "--mode", "liveness"] + +ENTRYPOINT ["/app/telemt"] +CMD ["config.toml"] + # ========================== # Production Distroless on MUSL # ========================== diff --git a/docker-compose.host-netfilter.yml b/docker-compose.host-netfilter.yml new file mode 100644 index 0000000..4682489 --- /dev/null +++ b/docker-compose.host-netfilter.yml @@ -0,0 +1,10 @@ +services: + telemt: + build: + context: . + target: prod-netfilter + network_mode: host + ports: [] + cap_add: + - NET_BIND_SERVICE + - NET_ADMIN diff --git a/docker-compose.netfilter.yml b/docker-compose.netfilter.yml new file mode 100644 index 0000000..2aa35ed --- /dev/null +++ b/docker-compose.netfilter.yml @@ -0,0 +1,8 @@ +services: + telemt: + build: + context: . + target: prod-netfilter + cap_add: + - NET_BIND_SERVICE + - NET_ADMIN diff --git a/docker-compose.yml b/docker-compose.yml index 9fd0641..d6aa07a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,9 @@ services: telemt: image: ghcr.io/telemt/telemt:latest - build: . + build: + context: . + target: prod container_name: telemt restart: unless-stopped ports: @@ -28,7 +30,6 @@ services: - ALL cap_add: - NET_BIND_SERVICE - - NET_ADMIN read_only: true security_opt: - no-new-privileges:true diff --git a/src/config/load.rs b/src/config/load.rs index d15773c..c2b5e78 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -343,6 +343,10 @@ impl ProxyConfig { let network_table = parsed_toml .get("network") .and_then(|value| value.as_table()); + let server_table = parsed_toml.get("server").and_then(|value| value.as_table()); + let conntrack_control_table = server_table + .and_then(|table| table.get("conntrack_control")) + .and_then(|value| value.as_table()); let update_every_is_explicit = general_table .map(|table| table.contains_key("update_every")) .unwrap_or(false); @@ -372,10 +376,17 @@ impl ProxyConfig { let stun_servers_is_explicit = network_table .map(|table| table.contains_key("stun_servers")) .unwrap_or(false); + let inline_conntrack_control_is_explicit = conntrack_control_table + .map(|table| table.contains_key("inline_conntrack_control")) + .unwrap_or(false); let mut config: ProxyConfig = parsed_toml .try_into() .map_err(|e| ProxyError::Config(e.to_string()))?; + config + .server + .conntrack_control + .inline_conntrack_control_explicit = inline_conntrack_control_is_explicit; if !update_every_is_explicit && (legacy_secret_is_explicit || legacy_config_is_explicit) { config.general.update_every = None; @@ -1881,6 +1892,35 @@ mod tests { ); } + #[test] + fn conntrack_inline_explicit_flag_is_false_when_omitted() { + let cfg = load_config_from_temp_toml( + r#" + [general] + [network] + [server] + [server.conntrack_control] + [access] + "#, + ); + assert!(!cfg.server.conntrack_control.inline_conntrack_control_explicit); + } + + #[test] + fn conntrack_inline_explicit_flag_is_true_when_present() { + let cfg = load_config_from_temp_toml( + r#" + [general] + [network] + [server] + [server.conntrack_control] + inline_conntrack_control = true + [access] + "#, + ); + assert!(cfg.server.conntrack_control.inline_conntrack_control_explicit); + } + #[test] fn unknown_sni_action_parses_and_defaults_to_drop() { let cfg_default: ProxyConfig = toml::from_str( diff --git a/src/config/types.rs b/src/config/types.rs index 9f7e0f4..82ae9c3 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1329,6 +1329,10 @@ pub struct ConntrackControlConfig { #[serde(default = "default_conntrack_control_enabled")] pub inline_conntrack_control: bool, + /// Tracks whether inline_conntrack_control was explicitly set in config. + #[serde(skip)] + pub inline_conntrack_control_explicit: bool, + /// Conntrack mode for listener ingress traffic. #[serde(default)] pub mode: ConntrackMode, @@ -1363,6 +1367,7 @@ impl Default for ConntrackControlConfig { fn default() -> Self { Self { inline_conntrack_control: default_conntrack_control_enabled(), + inline_conntrack_control_explicit: false, mode: ConntrackMode::default(), backend: ConntrackBackend::default(), profile: ConntrackPressureProfile::default(), diff --git a/src/conntrack_control.rs b/src/conntrack_control.rs index 12069c3..6f83fa6 100644 --- a/src/conntrack_control.rs +++ b/src/conntrack_control.rs @@ -24,6 +24,13 @@ enum NetfilterBackend { Iptables, } +#[derive(Clone, Copy)] +struct ConntrackRuntimeSupport { + netfilter_backend: Option, + has_cap_net_admin: bool, + has_conntrack_binary: bool, +} + #[derive(Clone, Copy)] struct PressureSample { conn_pct: Option, @@ -56,11 +63,8 @@ pub(crate) fn spawn_conntrack_controller( shared: Arc, ) { if !cfg!(target_os = "linux") { - let enabled = config_rx - .borrow() - .server - .conntrack_control - .inline_conntrack_control; + let cfg = config_rx.borrow(); + let enabled = cfg.server.conntrack_control.inline_conntrack_control; stats.set_conntrack_control_enabled(enabled); stats.set_conntrack_control_available(false); stats.set_conntrack_pressure_active(false); @@ -68,9 +72,9 @@ pub(crate) fn spawn_conntrack_controller( stats.set_conntrack_rule_apply_ok(false); shared.disable_conntrack_close_sender(); shared.set_conntrack_pressure_active(false); - if enabled { + if enabled && cfg.server.conntrack_control.inline_conntrack_control_explicit { warn!( - "conntrack control is configured but unsupported on this OS; disabling runtime worker" + "conntrack control explicitly enabled but unsupported on this OS; disabling runtime worker" ); } return; @@ -92,16 +96,17 @@ async fn run_conntrack_controller( let mut cfg = config_rx.borrow().clone(); let mut pressure_state = PressureState::new(stats.as_ref()); let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; - let mut backend = pick_backend(cfg.server.conntrack_control.backend); + let mut runtime_support = probe_runtime_support(cfg.server.conntrack_control.backend); + let mut effective_enabled = effective_conntrack_enabled(&cfg, runtime_support); apply_runtime_state( stats.as_ref(), shared.as_ref(), &cfg, - backend.is_some(), + runtime_support, false, ); - reconcile_rules(&cfg, backend, stats.as_ref()).await; + reconcile_rules(&cfg, runtime_support, stats.as_ref()).await; loop { tokio::select! { @@ -110,17 +115,18 @@ async fn run_conntrack_controller( break; } cfg = config_rx.borrow_and_update().clone(); - backend = pick_backend(cfg.server.conntrack_control.backend); + runtime_support = probe_runtime_support(cfg.server.conntrack_control.backend); + effective_enabled = effective_conntrack_enabled(&cfg, runtime_support); delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; - apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), pressure_state.active); - reconcile_rules(&cfg, backend, stats.as_ref()).await; + apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, runtime_support, pressure_state.active); + reconcile_rules(&cfg, runtime_support, stats.as_ref()).await; } event = close_rx.recv() => { let Some(event) = event else { break; }; stats.set_conntrack_event_queue_depth(close_rx.len() as u64); - if !cfg.server.conntrack_control.inline_conntrack_control { + if !effective_enabled { continue; } if !pressure_state.active { @@ -156,6 +162,7 @@ async fn run_conntrack_controller( stats.as_ref(), shared.as_ref(), &cfg, + effective_enabled, &sample, &mut pressure_state, ); @@ -175,20 +182,24 @@ fn apply_runtime_state( stats: &Stats, shared: &ProxySharedState, cfg: &ProxyConfig, - backend_available: bool, + runtime_support: ConntrackRuntimeSupport, pressure_active: bool, ) { let enabled = cfg.server.conntrack_control.inline_conntrack_control; - let available = enabled && backend_available && has_cap_net_admin(); - if enabled && !available { + let available = effective_conntrack_enabled(cfg, runtime_support); + if enabled && !available && cfg.server.conntrack_control.inline_conntrack_control_explicit { warn!( - "conntrack control enabled but unavailable (missing CAP_NET_ADMIN or backend binaries)" + has_cap_net_admin = runtime_support.has_cap_net_admin, + backend_available = runtime_support.netfilter_backend.is_some(), + conntrack_binary_available = runtime_support.has_conntrack_binary, + configured_backend = ?cfg.server.conntrack_control.backend, + "conntrack control explicitly enabled but unavailable; disabling runtime features" ); } stats.set_conntrack_control_enabled(enabled); stats.set_conntrack_control_available(available); - shared.set_conntrack_pressure_active(enabled && pressure_active); - stats.set_conntrack_pressure_active(enabled && pressure_active); + shared.set_conntrack_pressure_active(available && pressure_active); + stats.set_conntrack_pressure_active(available && pressure_active); } fn collect_pressure_sample( @@ -228,10 +239,11 @@ fn update_pressure_state( stats: &Stats, shared: &ProxySharedState, cfg: &ProxyConfig, + effective_enabled: bool, sample: &PressureSample, state: &mut PressureState, ) { - if !cfg.server.conntrack_control.inline_conntrack_control { + if !effective_enabled { if state.active { state.active = false; state.low_streak = 0; @@ -285,22 +297,22 @@ fn update_pressure_state( state.low_streak = 0; } -async fn reconcile_rules(cfg: &ProxyConfig, backend: Option, stats: &Stats) { +async fn reconcile_rules(cfg: &ProxyConfig, runtime_support: ConntrackRuntimeSupport, stats: &Stats) { if !cfg.server.conntrack_control.inline_conntrack_control { clear_notrack_rules_all_backends().await; stats.set_conntrack_rule_apply_ok(true); return; } - if !has_cap_net_admin() { + if !effective_conntrack_enabled(cfg, runtime_support) { + clear_notrack_rules_all_backends().await; stats.set_conntrack_rule_apply_ok(false); return; } - let Some(backend) = backend else { - stats.set_conntrack_rule_apply_ok(false); - return; - }; + let backend = runtime_support + .netfilter_backend + .expect("netfilter backend must be available for effective conntrack control"); let apply_result = match backend { NetfilterBackend::Nftables => apply_nft_rules(cfg).await, @@ -315,6 +327,21 @@ async fn reconcile_rules(cfg: &ProxyConfig, backend: Option, s } } +fn probe_runtime_support(configured_backend: ConntrackBackend) -> ConntrackRuntimeSupport { + ConntrackRuntimeSupport { + netfilter_backend: pick_backend(configured_backend), + has_cap_net_admin: has_cap_net_admin(), + has_conntrack_binary: command_exists("conntrack"), + } +} + +fn effective_conntrack_enabled(cfg: &ProxyConfig, runtime_support: ConntrackRuntimeSupport) -> bool { + cfg.server.conntrack_control.inline_conntrack_control + && runtime_support.has_cap_net_admin + && runtime_support.netfilter_backend.is_some() + && runtime_support.has_conntrack_binary +} + fn pick_backend(configured: ConntrackBackend) -> Option { match configured { ConntrackBackend::Auto => { @@ -710,7 +737,7 @@ mod tests { me_queue_pressure_delta: 0, }; - update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state); + update_pressure_state(&stats, shared.as_ref(), &cfg, true, &sample, &mut state); assert!(state.active); assert!(shared.conntrack_pressure_active()); @@ -731,7 +758,7 @@ mod tests { accept_timeout_delta: 0, me_queue_pressure_delta: 0, }; - update_pressure_state(&stats, shared.as_ref(), &cfg, &high_sample, &mut state); + update_pressure_state(&stats, shared.as_ref(), &cfg, true, &high_sample, &mut state); assert!(state.active); let low_sample = PressureSample { @@ -740,11 +767,11 @@ mod tests { accept_timeout_delta: 0, me_queue_pressure_delta: 0, }; - update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state); + update_pressure_state(&stats, shared.as_ref(), &cfg, true, &low_sample, &mut state); assert!(state.active); - update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state); + update_pressure_state(&stats, shared.as_ref(), &cfg, true, &low_sample, &mut state); assert!(state.active); - update_pressure_state(&stats, shared.as_ref(), &cfg, &low_sample, &mut state); + update_pressure_state(&stats, shared.as_ref(), &cfg, true, &low_sample, &mut state); assert!(!state.active); assert!(!shared.conntrack_pressure_active()); @@ -765,7 +792,7 @@ mod tests { me_queue_pressure_delta: 10, }; - update_pressure_state(&stats, shared.as_ref(), &cfg, &sample, &mut state); + update_pressure_state(&stats, shared.as_ref(), &cfg, false, &sample, &mut state); assert!(!state.active); assert!(!shared.conntrack_pressure_active()); From b447f60a7224721777707776698c5891e5f33c36 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 17 Apr 2026 19:08:57 +0300 Subject: [PATCH 4/5] Rustfmt + Bump --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/mod.rs | 4 ++-- src/config/load.rs | 12 ++++++++++-- src/conntrack_control.rs | 35 ++++++++++++++++++++++++++++++----- src/healthcheck.rs | 5 ++++- 6 files changed, 48 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 164fccd..9f14499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "3.4.2" +version = "3.4.3" dependencies = [ "aes", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index beac5d5..be2df6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.4.2" +version = "3.4.3" edition = "2024" [features] diff --git a/src/api/mod.rs b/src/api/mod.rs index 8ee46f7..46bdc10 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -41,8 +41,8 @@ use config_store::{current_revision, load_config_from_disk, parse_if_match}; use events::ApiEventStore; use http_utils::{error_response, read_json, read_optional_json, success_response}; use model::{ - ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, PatchUserRequest, - RotateSecretRequest, SummaryData, UserActiveIps, + ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, + PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps, }; use runtime_edge::{ EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, diff --git a/src/config/load.rs b/src/config/load.rs index c2b5e78..f80982f 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -1903,7 +1903,11 @@ mod tests { [access] "#, ); - assert!(!cfg.server.conntrack_control.inline_conntrack_control_explicit); + assert!( + !cfg.server + .conntrack_control + .inline_conntrack_control_explicit + ); } #[test] @@ -1918,7 +1922,11 @@ mod tests { [access] "#, ); - assert!(cfg.server.conntrack_control.inline_conntrack_control_explicit); + assert!( + cfg.server + .conntrack_control + .inline_conntrack_control_explicit + ); } #[test] diff --git a/src/conntrack_control.rs b/src/conntrack_control.rs index 6f83fa6..33a9174 100644 --- a/src/conntrack_control.rs +++ b/src/conntrack_control.rs @@ -72,7 +72,12 @@ pub(crate) fn spawn_conntrack_controller( stats.set_conntrack_rule_apply_ok(false); shared.disable_conntrack_close_sender(); shared.set_conntrack_pressure_active(false); - if enabled && cfg.server.conntrack_control.inline_conntrack_control_explicit { + if enabled + && cfg + .server + .conntrack_control + .inline_conntrack_control_explicit + { warn!( "conntrack control explicitly enabled but unsupported on this OS; disabling runtime worker" ); @@ -187,7 +192,13 @@ fn apply_runtime_state( ) { let enabled = cfg.server.conntrack_control.inline_conntrack_control; let available = effective_conntrack_enabled(cfg, runtime_support); - if enabled && !available && cfg.server.conntrack_control.inline_conntrack_control_explicit { + if enabled + && !available + && cfg + .server + .conntrack_control + .inline_conntrack_control_explicit + { warn!( has_cap_net_admin = runtime_support.has_cap_net_admin, backend_available = runtime_support.netfilter_backend.is_some(), @@ -297,7 +308,11 @@ fn update_pressure_state( state.low_streak = 0; } -async fn reconcile_rules(cfg: &ProxyConfig, runtime_support: ConntrackRuntimeSupport, stats: &Stats) { +async fn reconcile_rules( + cfg: &ProxyConfig, + runtime_support: ConntrackRuntimeSupport, + stats: &Stats, +) { if !cfg.server.conntrack_control.inline_conntrack_control { clear_notrack_rules_all_backends().await; stats.set_conntrack_rule_apply_ok(true); @@ -335,7 +350,10 @@ fn probe_runtime_support(configured_backend: ConntrackBackend) -> ConntrackRunti } } -fn effective_conntrack_enabled(cfg: &ProxyConfig, runtime_support: ConntrackRuntimeSupport) -> bool { +fn effective_conntrack_enabled( + cfg: &ProxyConfig, + runtime_support: ConntrackRuntimeSupport, +) -> bool { cfg.server.conntrack_control.inline_conntrack_control && runtime_support.has_cap_net_admin && runtime_support.netfilter_backend.is_some() @@ -758,7 +776,14 @@ mod tests { accept_timeout_delta: 0, me_queue_pressure_delta: 0, }; - update_pressure_state(&stats, shared.as_ref(), &cfg, true, &high_sample, &mut state); + update_pressure_state( + &stats, + shared.as_ref(), + &cfg, + true, + &high_sample, + &mut state, + ); assert!(state.active); let low_sample = PressureSample { diff --git a/src/healthcheck.rs b/src/healthcheck.rs index be9a381..c9fa610 100644 --- a/src/healthcheck.rs +++ b/src/healthcheck.rs @@ -111,7 +111,10 @@ fn probe_target(listen: SocketAddr) -> SocketAddr { } fn build_request(target: SocketAddr, path: &str, auth_header: &str) -> String { - let mut request = format!("GET {path} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n", target); + let mut request = format!( + "GET {path} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n", + target + ); if !auth_header.is_empty() { request.push_str("Authorization: "); request.push_str(auth_header); From bde30eaf05377c4234a9df1dbeca2b6b585508d2 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 17 Apr 2026 19:20:06 +0300 Subject: [PATCH 5/5] Update emulator.rs --- src/tls_front/emulator.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/tls_front/emulator.rs b/src/tls_front/emulator.rs index a7f2eb3..af51ca0 100644 --- a/src/tls_front/emulator.rs +++ b/src/tls_front/emulator.rs @@ -253,7 +253,18 @@ pub fn build_emulated_server_hello( } // --- ApplicationData (fake encrypted records) --- - let mut sizes = jitter_and_clamp_sizes(&emulated_app_data_sizes(cached), rng); + let mut sizes = { + let base_sizes = emulated_app_data_sizes(cached); + match cached.behavior_profile.source { + TlsProfileSource::Raw | TlsProfileSource::Merged => base_sizes + .into_iter() + .map(|size| size.clamp(MIN_APP_DATA, MAX_APP_DATA)) + .collect(), + TlsProfileSource::Default | TlsProfileSource::Rustls => { + jitter_and_clamp_sizes(&base_sizes, rng) + } + } + }; let compact_payload = cached .cert_info .as_ref()