diff --git a/docs/Config_params/CONFIG_PARAMS.en.md b/docs/Config_params/CONFIG_PARAMS.en.md index b4627eb..a36182c 100644 --- a/docs/Config_params/CONFIG_PARAMS.en.md +++ b/docs/Config_params/CONFIG_PARAMS.en.md @@ -2268,37 +2268,40 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche | Key | Type | Default | | --- | ---- | ------- | -| [`tls_domain`](#tls_domain) | `String` | `"petrovich.ru"` | -| [`tls_domains`](#tls_domains) | `String[]` | `[]` | -| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` | -| [`tls_fetch_scope`](#tls_fetch_scope) | `String` | `""` | -| [`tls_fetch`](#tls_fetch) | `Table` | built-in defaults | -| [`mask`](#mask) | `bool` | `true` | -| [`mask_host`](#mask_host) | `String` | — | -| [`mask_port`](#mask_port) | `u16` | `443` | -| [`mask_unix_sock`](#mask_unix_sock) | `String` | — | -| [`fake_cert_len`](#fake_cert_len) | `usize` | `2048` | -| [`tls_emulation`](#tls_emulation) | `bool` | `true` | -| [`tls_front_dir`](#tls_front_dir) | `String` | `"tlsfront"` | -| [`server_hello_delay_min_ms`](#server_hello_delay_min_ms) | `u64` | `0` | -| [`server_hello_delay_max_ms`](#server_hello_delay_max_ms) | `u64` | `0` | -| [`tls_new_session_tickets`](#tls_new_session_tickets) | `u8` | `0` | -| [`tls_full_cert_ttl_secs`](#tls_full_cert_ttl_secs) | `u64` | `90` | -| [`alpn_enforce`](#alpn_enforce) | `bool` | `true` | -| [`mask_proxy_protocol`](#mask_proxy_protocol) | `u8` | `0` | -| [`mask_shape_hardening`](#mask_shape_hardening) | `bool` | `true` | -| [`mask_shape_hardening_aggressive_mode`](#mask_shape_hardening_aggressive_mode) | `bool` | `false` | -| [`mask_shape_bucket_floor_bytes`](#mask_shape_bucket_floor_bytes) | `usize` | `512` | -| [`mask_shape_bucket_cap_bytes`](#mask_shape_bucket_cap_bytes) | `usize` | `4096` | -| [`mask_shape_above_cap_blur`](#mask_shape_above_cap_blur) | `bool` | `false` | -| [`mask_shape_above_cap_blur_max_bytes`](#mask_shape_above_cap_blur_max_bytes) | `usize` | `512` | -| [`mask_relay_max_bytes`](#mask_relay_max_bytes) | `usize` | `5242880` | -| [`mask_classifier_prefetch_timeout_ms`](#mask_classifier_prefetch_timeout_ms) | `u64` | `5` | -| [`mask_timing_normalization_enabled`](#mask_timing_normalization_enabled) | `bool` | `false` | -| [`mask_timing_normalization_floor_ms`](#mask_timing_normalization_floor_ms) | `u64` | `0` | -| [`mask_timing_normalization_ceiling_ms`](#mask_timing_normalization_ceiling_ms) | `u64` | `0` | +| [`tls_domain`](#cfg-censorship-tls_domain) | `String` | `"petrovich.ru"` | +| [`tls_domains`](#cfg-censorship-tls_domains) | `String[]` | `[]` | +| [`unknown_sni_action`](#cfg-censorship-unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` | +| [`tls_fetch_scope`](#cfg-censorship-tls_fetch_scope) | `String` | `""` | +| [`tls_fetch`](#cfg-censorship-tls_fetch) | `Table` | built-in defaults | +| [`mask`](#cfg-censorship-mask) | `bool` | `true` | +| [`mask_host`](#cfg-censorship-mask_host) | `String` | — | +| [`mask_port`](#cfg-censorship-mask_port) | `u16` | `443` | +| [`mask_unix_sock`](#cfg-censorship-mask_unix_sock) | `String` | — | +| [`fake_cert_len`](#cfg-censorship-fake_cert_len) | `usize` | `2048` | +| [`tls_emulation`](#cfg-censorship-tls_emulation) | `bool` | `true` | +| [`tls_front_dir`](#cfg-censorship-tls_front_dir) | `String` | `"tlsfront"` | +| [`server_hello_delay_min_ms`](#cfg-censorship-server_hello_delay_min_ms) | `u64` | `0` | +| [`server_hello_delay_max_ms`](#cfg-censorship-server_hello_delay_max_ms) | `u64` | `0` | +| [`tls_new_session_tickets`](#cfg-censorship-tls_new_session_tickets) | `u8` | `0` | +| [`tls_full_cert_ttl_secs`](#cfg-censorship-tls_full_cert_ttl_secs) | `u64` | `90` | +| [`alpn_enforce`](#cfg-censorship-alpn_enforce) | `bool` | `true` | +| [`mask_proxy_protocol`](#cfg-censorship-mask_proxy_protocol) | `u8` | `0` | +| [`mask_shape_hardening`](#cfg-censorship-mask_shape_hardening) | `bool` | `true` | +| [`mask_shape_hardening_aggressive_mode`](#cfg-censorship-mask_shape_hardening_aggressive_mode) | `bool` | `false` | +| [`mask_shape_bucket_floor_bytes`](#cfg-censorship-mask_shape_bucket_floor_bytes) | `usize` | `512` | +| [`mask_shape_bucket_cap_bytes`](#cfg-censorship-mask_shape_bucket_cap_bytes) | `usize` | `4096` | +| [`mask_shape_above_cap_blur`](#cfg-censorship-mask_shape_above_cap_blur) | `bool` | `false` | +| [`mask_shape_above_cap_blur_max_bytes`](#cfg-censorship-mask_shape_above_cap_blur_max_bytes) | `usize` | `512` | +| [`mask_relay_max_bytes`](#cfg-censorship-mask_relay_max_bytes) | `usize` | `5242880` | +| [`mask_relay_timeout_ms`](#cfg-censorship-mask_relay_timeout_ms) | `u64` | `60_000` | +| [`mask_relay_idle_timeout_ms`](#cfg-censorship-mask_relay_idle_timeout_ms) | `u64` | `5_000` | +| [`mask_classifier_prefetch_timeout_ms`](#cfg-censorship-mask_classifier_prefetch_timeout_ms) | `u64` | `5` | +| [`mask_timing_normalization_enabled`](#cfg-censorship-mask_timing_normalization_enabled) | `bool` | `false` | +| [`mask_timing_normalization_floor_ms`](#cfg-censorship-mask_timing_normalization_floor_ms) | `u64` | `0` | +| [`mask_timing_normalization_ceiling_ms`](#cfg-censorship-mask_timing_normalization_ceiling_ms) | `u64` | `0` | -## tls_domain +## "cfg-censorship-tls_domain" +- `tls_domain` - **Constraints / validation**: Must be a non-empty domain name. Must not contain spaces or `/`. - **Description**: Primary domain used for Fake-TLS masking / fronting profile and as the default SNI domain presented to clients. This value becomes part of generated `ee` links, and changing it invalidates previously generated links. @@ -2539,7 +2542,28 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche [censorship] mask_relay_max_bytes = 5242880 ``` -## mask_classifier_prefetch_timeout_ms +## "cfg-censorship-mask_relay_timeout_ms" +- `mask_relay_timeout_ms` + - **Constraints / validation**: Should be `>= mask_relay_idle_timeout_ms`. + - **Description**: Wall-clock cap for the full masking relay on non-MTProto fallback paths. Raise when the mask target is a long-lived service (e.g. WebSocket). Default: 60 000 ms (1 minute). + - **Example**: + + ```toml + [censorship] + mask_relay_timeout_ms = 60000 + ``` +## "cfg-censorship-mask_relay_idle_timeout_ms" +- `mask_relay_idle_timeout_ms` + - **Constraints / validation**: Should be `<= mask_relay_timeout_ms`. + - **Description**: Per-read idle timeout on masking relay and drain paths. Limits resource consumption by slow-loris attacks and port scanners. A read call stalling beyond this value is treated as an abandoned connection. Default: 5 000 ms (5 s). + - **Example**: + + ```toml + [censorship] + mask_relay_idle_timeout_ms = 5000 + ``` +## "cfg-censorship-mask_classifier_prefetch_timeout_ms" +- `mask_classifier_prefetch_timeout_ms` - **Constraints / validation**: Must be within `[5, 50]` (milliseconds). - **Description**: Timeout budget (ms) for extending fragmented initial classifier window on masking fallback. - **Example**: diff --git a/src/api/mod.rs b/src/api/mod.rs index e60a375..850fb0e 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,6 +1,6 @@ #![allow(clippy::too_many_arguments)] -use std::convert::Infallible; +use std::io::{Error as IoError, ErrorKind}; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; @@ -16,7 +16,7 @@ use tokio::net::TcpListener; use tokio::sync::{Mutex, RwLock, watch}; use tracing::{debug, info, warn}; -use crate::config::ProxyConfig; +use crate::config::{ApiGrayAction, ProxyConfig}; use crate::ip_tracker::UserIpTracker; use crate::proxy::route_mode::RouteRuntimeController; use crate::startup::StartupTracker; @@ -184,7 +184,9 @@ pub async fn serve( .serve_connection(hyper_util::rt::TokioIo::new(stream), svc) .await { - debug!(error = %error, "API connection error"); + if !error.is_user() { + debug!(error = %error, "API connection error"); + } } }); } @@ -195,7 +197,7 @@ async fn handle( peer: SocketAddr, shared: Arc, config_rx: watch::Receiver>, -) -> Result>, Infallible> { +) -> Result>, IoError> { let request_id = shared.next_request_id(); let cfg = config_rx.borrow().clone(); let api_cfg = &cfg.server.api; @@ -213,14 +215,25 @@ async fn handle( if !api_cfg.whitelist.is_empty() && !api_cfg.whitelist.iter().any(|net| net.contains(peer.ip())) { - return Ok(error_response( - request_id, - ApiFailure::new( - StatusCode::FORBIDDEN, - "forbidden", - "Source IP is not allowed", - ), - )); + return match api_cfg.gray_action { + ApiGrayAction::Api => Ok(error_response( + request_id, + ApiFailure::new( + StatusCode::FORBIDDEN, + "forbidden", + "Source IP is not allowed", + ), + )), + ApiGrayAction::Ok200 => Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/html; charset=utf-8") + .body(Full::new(Bytes::new())) + .unwrap()), + ApiGrayAction::Drop => Err(IoError::new( + ErrorKind::ConnectionAborted, + "api request dropped by gray_action=drop", + )), + }; } if !api_cfg.auth_header.is_empty() { @@ -244,11 +257,16 @@ async fn handle( let method = req.method().clone(); let path = req.uri().path().to_string(); + let normalized_path = if path.len() > 1 { + path.trim_end_matches('/') + } else { + path.as_str() + }; let query = req.uri().query().map(str::to_string); let body_limit = api_cfg.request_body_limit_bytes; let result: Result>, ApiFailure> = async { - match (method.as_str(), path.as_str()) { + match (method.as_str(), normalized_path) { ("GET", "/v1/health") => { let revision = current_revision(&shared.config_path).await?; let data = HealthData { @@ -431,7 +449,7 @@ async fn handle( Ok(success_response(status, data, revision)) } _ => { - if let Some(user) = path.strip_prefix("/v1/users/") + if let Some(user) = normalized_path.strip_prefix("/v1/users/") && !user.is_empty() && !user.contains('/') { @@ -600,6 +618,12 @@ async fn handle( ), )); } + debug!( + method = method.as_str(), + path = %path, + normalized_path = %normalized_path, + "API route not found" + ); Ok(error_response( request_id, ApiFailure::new(StatusCode::NOT_FOUND, "not_found", "Route not found"), diff --git a/src/api/users.rs b/src/api/users.rs index 5a09714..6b20b85 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -452,7 +452,11 @@ fn build_user_links( startup_detected_ip_v6: Option, ) -> UserLinks { let hosts = resolve_link_hosts(cfg, startup_detected_ip_v4, startup_detected_ip_v6); - let port = cfg.general.links.public_port.unwrap_or(cfg.server.port); + let port = cfg + .general + .links + .public_port + .unwrap_or(resolve_default_link_port(cfg)); let tls_domains = resolve_tls_domains(cfg); let mut classic = Vec::new(); @@ -490,6 +494,14 @@ fn build_user_links( } } +fn resolve_default_link_port(cfg: &ProxyConfig) -> u16 { + cfg.server + .listeners + .first() + .and_then(|listener| listener.port) + .unwrap_or(cfg.server.port) +} + fn resolve_link_hosts( cfg: &ProxyConfig, startup_detected_ip_v4: Option, diff --git a/src/cli.rs b/src/cli.rs index 5a79bae..47a10d5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -598,16 +598,17 @@ secure = false tls = true [server] -port = {port} listen_addr_ipv4 = "0.0.0.0" listen_addr_ipv6 = "::" [[server.listeners]] ip = "0.0.0.0" +port = {port} # reuse_allow = false # Set true only when intentionally running multiple telemt instances on same port [[server.listeners]] ip = "::" +port = {port} [timeouts] client_first_byte_idle_secs = 300 diff --git a/src/config/defaults.rs b/src/config/defaults.rs index beedd10..8eebe6c 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -615,6 +615,26 @@ pub(crate) fn default_mask_relay_max_bytes() -> usize { 32 * 1024 } +#[cfg(not(test))] +pub(crate) fn default_mask_relay_timeout_ms() -> u64 { + 60_000 +} + +#[cfg(test)] +pub(crate) fn default_mask_relay_timeout_ms() -> u64 { + 200 +} + +#[cfg(not(test))] +pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 { + 5_000 +} + +#[cfg(test)] +pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 { + 100 +} + pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 { 5 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 5582e9b..f42638c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -17,8 +17,9 @@ //! | `network` | `dns_overrides` | Applied immediately | //! | `access` | All user/quota fields | Effective immediately | //! -//! Fields that require re-binding sockets (`server.port`, `censorship.*`, -//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted. +//! Fields that require re-binding sockets (`server.listeners`, legacy +//! `server.port`, `censorship.*`, `network.*`, `use_middle_proxy`) are **not** +//! applied; a warning is emitted. //! Non-hot changes are never mixed into the runtime config snapshot. use std::collections::BTreeSet; @@ -299,6 +300,7 @@ fn listeners_equal( } lhs.iter().zip(rhs.iter()).all(|(a, b)| { a.ip == b.ip + && a.port == b.port && a.announce == b.announce && a.announce_ip == b.announce_ip && a.proxy_protocol == b.proxy_protocol @@ -306,6 +308,14 @@ fn listeners_equal( }) } +fn resolve_default_link_port(cfg: &ProxyConfig) -> u16 { + cfg.server + .listeners + .first() + .and_then(|listener| listener.port) + .unwrap_or(cfg.server.port) +} + #[derive(Debug, Clone, Default, PartialEq, Eq)] struct WatchManifest { files: BTreeSet, @@ -560,6 +570,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b if old.server.api.enabled != new.server.api.enabled || old.server.api.listen != new.server.api.listen || old.server.api.whitelist != new.server.api.whitelist + || old.server.api.gray_action != new.server.api.gray_action || old.server.api.auth_header != new.server.api.auth_header || old.server.api.request_body_limit_bytes != new.server.api.request_body_limit_bytes || old.server.api.minimal_runtime_enabled != new.server.api.minimal_runtime_enabled @@ -611,6 +622,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b || old.censorship.mask_shape_above_cap_blur_max_bytes != new.censorship.mask_shape_above_cap_blur_max_bytes || old.censorship.mask_relay_max_bytes != new.censorship.mask_relay_max_bytes + || old.censorship.mask_relay_timeout_ms != new.censorship.mask_relay_timeout_ms + || old.censorship.mask_relay_idle_timeout_ms != new.censorship.mask_relay_idle_timeout_ms || old.censorship.mask_classifier_prefetch_timeout_ms != new.censorship.mask_classifier_prefetch_timeout_ms || old.censorship.mask_timing_normalization_enabled @@ -1117,7 +1130,7 @@ fn log_changes( .general .links .public_port - .unwrap_or(new_cfg.server.port); + .unwrap_or(resolve_default_link_port(new_cfg)); for user in &added { if let Some(secret) = new_hot.users.get(*user) { print_user_links(user, secret, &host, port, new_cfg); diff --git a/src/config/load.rs b/src/config/load.rs index f9e230c..e5c8202 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -253,6 +253,12 @@ fn validate_upstreams(config: &ProxyConfig) -> Result<()> { } for upstream in &config.upstreams { + if matches!(upstream.ipv4, Some(false)) && matches!(upstream.ipv6, Some(false)) { + return Err(ProxyError::Config( + "upstream.ipv4 and upstream.ipv6 cannot both be false".to_string(), + )); + } + if let UpstreamType::Shadowsocks { url, .. } = &upstream.upstream_type { let parsed = ShadowsocksServerConfig::from_url(url) .map_err(|error| ProxyError::Config(format!("invalid shadowsocks url: {error}")))?; @@ -340,12 +346,29 @@ impl ProxyConfig { let update_every_is_explicit = general_table .map(|table| table.contains_key("update_every")) .unwrap_or(false); + let beobachten_is_explicit = general_table + .map(|table| table.contains_key("beobachten")) + .unwrap_or(false); + let beobachten_minutes_is_explicit = general_table + .map(|table| table.contains_key("beobachten_minutes")) + .unwrap_or(false); + let beobachten_flush_secs_is_explicit = general_table + .map(|table| table.contains_key("beobachten_flush_secs")) + .unwrap_or(false); + let beobachten_file_is_explicit = general_table + .map(|table| table.contains_key("beobachten_file")) + .unwrap_or(false); let legacy_secret_is_explicit = general_table .map(|table| table.contains_key("proxy_secret_auto_reload_secs")) .unwrap_or(false); let legacy_config_is_explicit = general_table .map(|table| table.contains_key("proxy_config_auto_reload_secs")) .unwrap_or(false); + let legacy_top_level_beobachten = parsed_toml.get("beobachten").cloned(); + let legacy_top_level_beobachten_minutes = parsed_toml.get("beobachten_minutes").cloned(); + let legacy_top_level_beobachten_flush_secs = + parsed_toml.get("beobachten_flush_secs").cloned(); + let legacy_top_level_beobachten_file = parsed_toml.get("beobachten_file").cloned(); let stun_servers_is_explicit = network_table .map(|table| table.contains_key("stun_servers")) .unwrap_or(false); @@ -358,6 +381,59 @@ impl ProxyConfig { config.general.update_every = None; } + // Backward compatibility: legacy top-level beobachten* keys. + // Prefer `[general].*` when both are present. + let mut legacy_beobachten_applied = false; + if !beobachten_is_explicit && let Some(value) = legacy_top_level_beobachten.as_ref() { + let parsed = value.as_bool().ok_or_else(|| { + ProxyError::Config("beobachten (top-level) must be a boolean".to_string()) + })?; + config.general.beobachten = parsed; + legacy_beobachten_applied = true; + } + if !beobachten_minutes_is_explicit + && let Some(value) = legacy_top_level_beobachten_minutes.as_ref() + { + let raw = value.as_integer().ok_or_else(|| { + ProxyError::Config("beobachten_minutes (top-level) must be an integer".to_string()) + })?; + let parsed = u64::try_from(raw).map_err(|_| { + ProxyError::Config( + "beobachten_minutes (top-level) must be within u64 range".to_string(), + ) + })?; + config.general.beobachten_minutes = parsed; + legacy_beobachten_applied = true; + } + if !beobachten_flush_secs_is_explicit + && let Some(value) = legacy_top_level_beobachten_flush_secs.as_ref() + { + let raw = value.as_integer().ok_or_else(|| { + ProxyError::Config( + "beobachten_flush_secs (top-level) must be an integer".to_string(), + ) + })?; + let parsed = u64::try_from(raw).map_err(|_| { + ProxyError::Config( + "beobachten_flush_secs (top-level) must be within u64 range".to_string(), + ) + })?; + config.general.beobachten_flush_secs = parsed; + legacy_beobachten_applied = true; + } + if !beobachten_file_is_explicit + && let Some(value) = legacy_top_level_beobachten_file.as_ref() + { + let parsed = value.as_str().ok_or_else(|| { + ProxyError::Config("beobachten_file (top-level) must be a string".to_string()) + })?; + config.general.beobachten_file = parsed.to_string(); + legacy_beobachten_applied = true; + } + if legacy_beobachten_applied { + warn!("top-level beobachten* keys are deprecated; use general.beobachten* instead"); + } + let legacy_nat_stun = config.general.middle_proxy_nat_stun.take(); let legacy_nat_stun_servers = std::mem::take(&mut config.general.middle_proxy_nat_stun_servers); @@ -1250,6 +1326,7 @@ impl ProxyConfig { if let Ok(ipv4) = ipv4_str.parse::() { config.server.listeners.push(ListenerConfig { ip: ipv4, + port: Some(config.server.port), announce: None, announce_ip: None, proxy_protocol: None, @@ -1261,6 +1338,7 @@ impl ProxyConfig { { config.server.listeners.push(ListenerConfig { ip: ipv6, + port: Some(config.server.port), announce: None, announce_ip: None, proxy_protocol: None, @@ -1269,6 +1347,13 @@ impl ProxyConfig { } } + // Migration: listeners[].port fallback to legacy server.port. + for listener in &mut config.server.listeners { + if listener.port.is_none() { + listener.port = Some(config.server.port); + } + } + // Migration: announce_ip → announce for each listener. for listener in &mut config.server.listeners { if listener.announce.is_none() @@ -1289,11 +1374,14 @@ impl ProxyConfig { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }); } @@ -1385,6 +1473,21 @@ mod tests { const TEST_SHADOWSOCKS_URL: &str = "ss://2022-blake3-aes-256-gcm:MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDE=@127.0.0.1:8388"; + fn load_config_from_temp_toml(toml: &str) -> ProxyConfig { + let nonce = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let dir = std::env::temp_dir().join(format!("telemt_load_cfg_{nonce}")); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join("config.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + let _ = std::fs::remove_file(path); + let _ = std::fs::remove_dir(dir); + cfg + } + #[test] fn serde_defaults_remain_unchanged_for_present_sections() { let toml = r#" @@ -1481,6 +1584,7 @@ mod tests { cfg.general.rpc_proxy_req_every, default_rpc_proxy_req_every() ); + assert_eq!(cfg.general.beobachten_file, default_beobachten_file()); assert_eq!(cfg.general.update_every, default_update_every()); assert_eq!(cfg.server.listen_addr_ipv4, default_listen_addr_ipv4()); assert_eq!(cfg.server.listen_addr_ipv6, default_listen_addr_ipv6_opt()); @@ -1491,6 +1595,7 @@ mod tests { assert_eq!(cfg.censorship.unknown_sni_action, UnknownSniAction::Drop); assert_eq!(cfg.server.api.listen, default_api_listen()); assert_eq!(cfg.server.api.whitelist, default_api_whitelist()); + assert_eq!(cfg.server.api.gray_action, ApiGrayAction::Drop); assert_eq!( cfg.server.api.request_body_limit_bytes, default_api_request_body_limit_bytes() @@ -1647,6 +1752,7 @@ mod tests { default_upstream_connect_failfast_hard_errors() ); assert_eq!(general.rpc_proxy_req_every, default_rpc_proxy_req_every()); + assert_eq!(general.beobachten_file, default_beobachten_file()); assert_eq!(general.update_every, default_update_every()); let server = ServerConfig::default(); @@ -1661,6 +1767,7 @@ mod tests { ); assert_eq!(server.api.listen, default_api_listen()); assert_eq!(server.api.whitelist, default_api_whitelist()); + assert_eq!(server.api.gray_action, ApiGrayAction::Drop); assert_eq!( server.api.request_body_limit_bytes, default_api_request_body_limit_bytes() @@ -1808,6 +1915,107 @@ mod tests { ); } + #[test] + fn api_gray_action_parses_and_defaults_to_drop() { + let cfg_default: ProxyConfig = toml::from_str( + r#" + [server] + [general] + [network] + [access] + "#, + ) + .unwrap(); + assert_eq!(cfg_default.server.api.gray_action, ApiGrayAction::Drop); + + let cfg_api: ProxyConfig = toml::from_str( + r#" + [server] + [general] + [network] + [access] + [server.api] + gray_action = "api" + "#, + ) + .unwrap(); + assert_eq!(cfg_api.server.api.gray_action, ApiGrayAction::Api); + + let cfg_200: ProxyConfig = toml::from_str( + r#" + [server] + [general] + [network] + [access] + [server.api] + gray_action = "200" + "#, + ) + .unwrap(); + assert_eq!(cfg_200.server.api.gray_action, ApiGrayAction::Ok200); + + let cfg_drop: ProxyConfig = toml::from_str( + r#" + [server] + [general] + [network] + [access] + [server.api] + gray_action = "drop" + "#, + ) + .unwrap(); + assert_eq!(cfg_drop.server.api.gray_action, ApiGrayAction::Drop); + } + + #[test] + fn top_level_beobachten_keys_migrate_to_general_when_general_not_explicit() { + let cfg = load_config_from_temp_toml( + r#" + beobachten = false + beobachten_minutes = 7 + beobachten_flush_secs = 3 + beobachten_file = "tmp/legacy-beob.txt" + + [server] + [general] + [network] + [access] + "#, + ); + + assert!(!cfg.general.beobachten); + assert_eq!(cfg.general.beobachten_minutes, 7); + assert_eq!(cfg.general.beobachten_flush_secs, 3); + assert_eq!(cfg.general.beobachten_file, "tmp/legacy-beob.txt"); + } + + #[test] + fn general_beobachten_keys_have_priority_over_legacy_top_level() { + let cfg = load_config_from_temp_toml( + r#" + beobachten = true + beobachten_minutes = 30 + beobachten_flush_secs = 30 + beobachten_file = "tmp/legacy-beob.txt" + + [server] + [general] + beobachten = false + beobachten_minutes = 5 + beobachten_flush_secs = 2 + beobachten_file = "tmp/general-beob.txt" + [network] + [access] + "#, + ); + + assert!(!cfg.general.beobachten); + assert_eq!(cfg.general.beobachten_minutes, 5); + assert_eq!(cfg.general.beobachten_flush_secs, 2); + assert_eq!(cfg.general.beobachten_file, "tmp/general-beob.txt"); + } + #[test] fn dc_overrides_allow_string_and_array() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 98c22a6..35b8d46 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1153,7 +1153,8 @@ pub struct LinksConfig { #[serde(default)] pub public_host: Option, - /// Public port for tg:// link generation (overrides server.port). + /// Public port for tg:// link generation. + /// Overrides listener ports and legacy `server.port`. #[serde(default)] pub public_port: Option, } @@ -1183,6 +1184,13 @@ pub struct ApiConfig { #[serde(default = "default_api_whitelist")] pub whitelist: Vec, + /// Behavior for requests from source IPs outside `whitelist`. + /// - `api`: return structured API forbidden response. + /// - `200`: return `200 OK` with an empty body. + /// - `drop`: close the connection without HTTP response. + #[serde(default)] + pub gray_action: ApiGrayAction, + /// Optional static value for `Authorization` header validation. /// Empty string disables header auth. #[serde(default)] @@ -1227,6 +1235,7 @@ impl Default for ApiConfig { enabled: default_true(), listen: default_api_listen(), whitelist: default_api_whitelist(), + gray_action: ApiGrayAction::default(), auth_header: String::new(), request_body_limit_bytes: default_api_request_body_limit_bytes(), minimal_runtime_enabled: default_api_minimal_runtime_enabled(), @@ -1240,6 +1249,19 @@ impl Default for ApiConfig { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum ApiGrayAction { + /// Preserve current API behavior for denied source IPs. + Api, + /// Mimic a plain web endpoint by returning `200 OK` with an empty body. + #[serde(rename = "200")] + Ok200, + /// Drop connection without HTTP response for denied source IPs. + #[default] + Drop, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] pub enum ConntrackMode { @@ -1354,6 +1376,8 @@ impl Default for ConntrackControlConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { + /// Legacy listener port used for backward compatibility. + /// For new configs prefer `[[server.listeners]].port`. #[serde(default = "default_port")] pub port: u16, @@ -1710,6 +1734,19 @@ pub struct AntiCensorshipConfig { #[serde(default = "default_mask_relay_max_bytes")] pub mask_relay_max_bytes: usize, + /// Wall-clock cap for the full masking relay on non-MTProto fallback paths. + /// Raise when the mask target is a long-lived service (e.g. WebSocket). + /// Default: 60 000 ms (60 s). + #[serde(default = "default_mask_relay_timeout_ms")] + pub mask_relay_timeout_ms: u64, + + /// Per-read idle timeout on masking relay and drain paths. + /// Limits resource consumption by slow-loris attacks and port scanners. + /// A read call stalling beyond this is treated as an abandoned connection. + /// Default: 5 000 ms (5 s). + #[serde(default = "default_mask_relay_idle_timeout_ms")] + pub mask_relay_idle_timeout_ms: u64, + /// Prefetch timeout (ms) for extending fragmented masking classifier window. #[serde(default = "default_mask_classifier_prefetch_timeout_ms")] pub mask_classifier_prefetch_timeout_ms: u64, @@ -1755,6 +1792,8 @@ impl Default for AntiCensorshipConfig { mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(), mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(), mask_relay_max_bytes: default_mask_relay_max_bytes(), + mask_relay_timeout_ms: default_mask_relay_timeout_ms(), + mask_relay_idle_timeout_ms: default_mask_relay_idle_timeout_ms(), mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(), mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(), mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(), @@ -1841,6 +1880,10 @@ pub enum UpstreamType { interface: Option, #[serde(default)] bind_addresses: Option>, + /// Linux-only hard interface pinning via `SO_BINDTODEVICE`. + /// Optional alias: `force_bind`. + #[serde(default, alias = "force_bind")] + bindtodevice: Option, }, Socks4 { address: String, @@ -1877,11 +1920,22 @@ pub struct UpstreamConfig { pub scopes: String, #[serde(skip)] pub selected_scope: String, + /// Allow IPv4 DC targets for this upstream. + /// `None` means auto-detect from runtime connectivity state. + #[serde(default)] + pub ipv4: Option, + /// Allow IPv6 DC targets for this upstream. + /// `None` means auto-detect from runtime connectivity state. + #[serde(default)] + pub ipv6: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ListenerConfig { pub ip: IpAddr, + /// Per-listener TCP port. If omitted, falls back to legacy `server.port`. + #[serde(default)] + pub port: Option, /// IP address or hostname to announce in proxy links. /// Takes precedence over `announce_ip` if both are set. #[serde(default)] diff --git a/src/conntrack_control.rs b/src/conntrack_control.rs index 306697e..12069c3 100644 --- a/src/conntrack_control.rs +++ b/src/conntrack_control.rs @@ -343,15 +343,28 @@ fn command_exists(binary: &str) -> bool { }) } -fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec>) { +fn listener_port_set(cfg: &ProxyConfig) -> Vec { + let mut ports: BTreeSet = BTreeSet::new(); + if cfg.server.listeners.is_empty() { + ports.insert(cfg.server.port); + } else { + for listener in &cfg.server.listeners { + ports.insert(listener.port.unwrap_or(cfg.server.port)); + } + } + ports.into_iter().collect() +} + +fn notrack_targets(cfg: &ProxyConfig) -> (Vec<(Option, u16)>, Vec<(Option, u16)>) { let mode = cfg.server.conntrack_control.mode; - let mut v4_targets: BTreeSet> = BTreeSet::new(); - let mut v6_targets: BTreeSet> = BTreeSet::new(); + let mut v4_targets: BTreeSet<(Option, u16)> = BTreeSet::new(); + let mut v6_targets: BTreeSet<(Option, u16)> = BTreeSet::new(); match mode { ConntrackMode::Tracked => {} ConntrackMode::Notrack => { if cfg.server.listeners.is_empty() { + let port = cfg.server.port; if let Some(ipv4) = cfg .server .listen_addr_ipv4 @@ -359,9 +372,9 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec().ok()) { if ipv4.is_unspecified() { - v4_targets.insert(None); + v4_targets.insert((None, port)); } else { - v4_targets.insert(Some(ipv4)); + v4_targets.insert((Some(ipv4), port)); } } if let Some(ipv6) = cfg @@ -371,33 +384,39 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec().ok()) { if ipv6.is_unspecified() { - v6_targets.insert(None); + v6_targets.insert((None, port)); } else { - v6_targets.insert(Some(ipv6)); + v6_targets.insert((Some(ipv6), port)); } } } else { for listener in &cfg.server.listeners { + let port = listener.port.unwrap_or(cfg.server.port); if listener.ip.is_ipv4() { if listener.ip.is_unspecified() { - v4_targets.insert(None); + v4_targets.insert((None, port)); } else { - v4_targets.insert(Some(listener.ip)); + v4_targets.insert((Some(listener.ip), port)); } } else if listener.ip.is_unspecified() { - v6_targets.insert(None); + v6_targets.insert((None, port)); } else { - v6_targets.insert(Some(listener.ip)); + v6_targets.insert((Some(listener.ip), port)); } } } } ConntrackMode::Hybrid => { + let ports = listener_port_set(cfg); for ip in &cfg.server.conntrack_control.hybrid_listener_ips { if ip.is_ipv4() { - v4_targets.insert(Some(*ip)); + for port in &ports { + v4_targets.insert((Some(*ip), *port)); + } } else { - v6_targets.insert(Some(*ip)); + for port in &ports { + v6_targets.insert((Some(*ip), *port)); + } } } } @@ -422,19 +441,19 @@ async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> { let (v4_targets, v6_targets) = notrack_targets(cfg); let mut rules = Vec::new(); - for ip in v4_targets { + for (ip, port) in v4_targets { let rule = if let Some(ip) = ip { - format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip) + format!("tcp dport {} ip daddr {} notrack", port, ip) } else { - format!("tcp dport {} notrack", cfg.server.port) + format!("tcp dport {} notrack", port) }; rules.push(rule); } - for ip in v6_targets { + for (ip, port) in v6_targets { let rule = if let Some(ip) = ip { - format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip) + format!("tcp dport {} ip6 daddr {} notrack", port, ip) } else { - format!("tcp dport {} notrack", cfg.server.port) + format!("tcp dport {} notrack", port) }; rules.push(rule); } @@ -498,7 +517,7 @@ async fn apply_iptables_rules_for_binary( let (v4_targets, v6_targets) = notrack_targets(cfg); let selected = if ipv4 { v4_targets } else { v6_targets }; - for ip in selected { + for (ip, port) in selected { let mut args = vec![ "-t".to_string(), "raw".to_string(), @@ -507,7 +526,7 @@ async fn apply_iptables_rules_for_binary( "-p".to_string(), "tcp".to_string(), "--dport".to_string(), - cfg.server.port.to_string(), + port.to_string(), ]; if let Some(ip) = ip { args.push("-d".to_string()); diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index f032d77..796eb9e 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -31,6 +31,19 @@ pub(crate) struct BoundListeners { pub(crate) has_unix_listener: bool, } +fn listener_port_or_legacy(listener: &crate::config::ListenerConfig, config: &ProxyConfig) -> u16 { + listener.port.unwrap_or(config.server.port) +} + +fn default_link_port(config: &ProxyConfig) -> u16 { + config + .server + .listeners + .first() + .and_then(|listener| listener.port) + .unwrap_or(config.server.port) +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn bind_listeners( config: &Arc, @@ -63,7 +76,8 @@ pub(crate) async fn bind_listeners( let mut listeners = Vec::new(); for listener_conf in &config.server.listeners { - let addr = SocketAddr::new(listener_conf.ip, config.server.port); + let listener_port = listener_port_or_legacy(listener_conf, config); + let addr = SocketAddr::new(listener_conf.ip, listener_port); if addr.is_ipv4() && !decision_ipv4_dc { warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]"); continue; @@ -106,11 +120,7 @@ pub(crate) async fn bind_listeners( if config.general.links.public_host.is_none() && !config.general.links.show.is_empty() { - let link_port = config - .general - .links - .public_port - .unwrap_or(config.server.port); + let link_port = config.general.links.public_port.unwrap_or(listener_port); print_proxy_links(&public_host, link_port, config); } @@ -158,7 +168,7 @@ pub(crate) async fn bind_listeners( .general .links .public_port - .unwrap_or(config.server.port), + .unwrap_or(default_link_port(config)), ) } else { let ip = detected_ip_v4.or(detected_ip_v6).map(|ip| ip.to_string()); @@ -173,7 +183,7 @@ pub(crate) async fn bind_listeners( .general .links .public_port - .unwrap_or(config.server.port), + .unwrap_or(default_link_port(config)), ) }; diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 00b3b2d..f141331 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -81,23 +81,11 @@ pub async fn run() -> std::result::Result<(), Box> { } } -#[cfg(unix)] -async fn run_inner( - daemon_opts: DaemonOptions, +// Shared maestro startup and main loop. `drop_after_bind` runs on Unix after listeners are bound +// (for privilege drop); it is a no-op on other platforms. +async fn run_telemt_core( + drop_after_bind: impl FnOnce(), ) -> std::result::Result<(), Box> { - // Acquire PID file if daemonizing or if explicitly requested - // Keep it alive until shutdown (underscore prefix = intentionally kept for RAII cleanup) - let _pid_file = if daemon_opts.daemonize || daemon_opts.pid_file.is_some() { - let mut pf = PidFile::new(daemon_opts.pid_file_path()); - if let Err(e) = pf.acquire() { - eprintln!("[telemt] {}", e); - std::process::exit(1); - } - Some(pf) - } else { - None - }; - let process_started_at = Instant::now(); let process_started_at_epoch_secs = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -761,17 +749,8 @@ async fn run_inner( std::process::exit(1); } - // Drop privileges after binding sockets (which may require root for port < 1024) - if daemon_opts.user.is_some() || daemon_opts.group.is_some() { - if let Err(e) = drop_privileges( - daemon_opts.user.as_deref(), - daemon_opts.group.as_deref(), - _pid_file.as_ref(), - ) { - error!(error = %e, "Failed to drop privileges"); - std::process::exit(1); - } - } + // On Unix, caller supplies privilege drop after bind (may require root for port < 1024). + drop_after_bind(); runtime_tasks::apply_runtime_log_filter( has_rust_log, @@ -819,3 +798,39 @@ async fn run_inner( Ok(()) } + +#[cfg(unix)] +async fn run_inner( + daemon_opts: DaemonOptions, +) -> std::result::Result<(), Box> { + // Acquire PID file if daemonizing or if explicitly requested + // Keep it alive until shutdown (underscore prefix = intentionally kept for RAII cleanup) + let _pid_file = if daemon_opts.daemonize || daemon_opts.pid_file.is_some() { + let mut pf = PidFile::new(daemon_opts.pid_file_path()); + if let Err(e) = pf.acquire() { + eprintln!("[telemt] {}", e); + std::process::exit(1); + } + Some(pf) + } else { + None + }; + + let user = daemon_opts.user.clone(); + let group = daemon_opts.group.clone(); + + run_telemt_core(|| { + if user.is_some() || group.is_some() { + if let Err(e) = drop_privileges(user.as_deref(), group.as_deref(), _pid_file.as_ref()) { + error!(error = %e, "Failed to drop privileges"); + std::process::exit(1); + } + } + }) + .await +} + +#[cfg(not(unix))] +async fn run_inner() -> std::result::Result<(), Box> { + run_telemt_core(|| {}).await +} diff --git a/src/network/probe.rs b/src/network/probe.rs index 1787b92..90484b3 100644 --- a/src/network/probe.rs +++ b/src/network/probe.rs @@ -97,6 +97,7 @@ pub async fn run_probe( let UpstreamType::Direct { interface, bind_addresses, + .. } = &upstream.upstream_type else { continue; diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 70e72a0..d49e4c3 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -28,14 +28,10 @@ use tracing::debug; const MASK_TIMEOUT: Duration = Duration::from_secs(5); #[cfg(test)] const MASK_TIMEOUT: Duration = Duration::from_millis(50); -/// Maximum duration for the entire masking relay. -/// Limits resource consumption from slow-loris attacks and port scanners. -#[cfg(not(test))] -const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60); +/// Maximum duration for the entire masking relay under test (replaced by config at runtime). #[cfg(test)] const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200); -#[cfg(not(test))] -const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5); +/// Per-read idle timeout for masking relay and drain paths under test (replaced by config at runtime). #[cfg(test)] const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100); const MASK_BUFFER_SIZE: usize = 8192; @@ -55,6 +51,7 @@ async fn copy_with_idle_timeout( writer: &mut W, byte_cap: usize, shutdown_on_eof: bool, + idle_timeout: Duration, ) -> CopyOutcome where R: AsyncRead + Unpin, @@ -78,7 +75,7 @@ where } let read_len = remaining_budget.min(MASK_BUFFER_SIZE); - let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await; + let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await; let n = match read_res { Ok(Ok(n)) => n, Ok(Err(_)) | Err(_) => break, @@ -86,13 +83,13 @@ where if n == 0 { ended_by_eof = true; if shutdown_on_eof { - let _ = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.shutdown()).await; + let _ = timeout(idle_timeout, writer.shutdown()).await; } break; } total = total.saturating_add(n); - let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await; + let write_res = timeout(idle_timeout, writer.write_all(&buf[..n])).await; match write_res { Ok(Ok(())) => {} Ok(Err(_)) | Err(_) => break, @@ -230,13 +227,20 @@ where } } -async fn consume_client_data_with_timeout_and_cap(reader: R, byte_cap: usize) -where +async fn consume_client_data_with_timeout_and_cap( + reader: R, + byte_cap: usize, + relay_timeout: Duration, + idle_timeout: Duration, +) where R: AsyncRead + Unpin, { - if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, byte_cap)) - .await - .is_err() + if timeout( + relay_timeout, + consume_client_data(reader, byte_cap, idle_timeout), + ) + .await + .is_err() { debug!("Timed out while consuming client data on masking fallback path"); } @@ -639,10 +643,18 @@ pub async fn handle_bad_client( beobachten.record(client_type, peer.ip(), ttl); } + let relay_timeout = Duration::from_millis(config.censorship.mask_relay_timeout_ms); + let idle_timeout = Duration::from_millis(config.censorship.mask_relay_idle_timeout_ms); + if !config.censorship.mask { // Masking disabled, just consume data - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes) - .await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, + ) + .await; return; } @@ -674,7 +686,7 @@ pub async fn handle_bad_client( return; } if timeout( - MASK_RELAY_TIMEOUT, + relay_timeout, relay_to_mask( reader, writer, @@ -688,6 +700,7 @@ pub async fn handle_bad_client( config.censorship.mask_shape_above_cap_blur_max_bytes, config.censorship.mask_shape_hardening_aggressive_mode, config.censorship.mask_relay_max_bytes, + idle_timeout, ), ) .await @@ -703,6 +716,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -712,6 +727,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -742,8 +759,13 @@ pub async fn handle_bad_client( local = %local_addr, "Mask target resolves to local listener; refusing self-referential masking fallback" ); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes) - .await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, + ) + .await; wait_mask_outcome_budget(outcome_started, config).await; return; } @@ -777,7 +799,7 @@ pub async fn handle_bad_client( return; } if timeout( - MASK_RELAY_TIMEOUT, + relay_timeout, relay_to_mask( reader, writer, @@ -791,6 +813,7 @@ pub async fn handle_bad_client( config.censorship.mask_shape_above_cap_blur_max_bytes, config.censorship.mask_shape_hardening_aggressive_mode, config.censorship.mask_relay_max_bytes, + idle_timeout, ), ) .await @@ -806,6 +829,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -815,6 +840,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -836,6 +863,7 @@ async fn relay_to_mask( shape_above_cap_blur_max_bytes: usize, shape_hardening_aggressive_mode: bool, mask_relay_max_bytes: usize, + idle_timeout: Duration, ) where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, @@ -857,11 +885,19 @@ async fn relay_to_mask( &mut mask_write, mask_relay_max_bytes, !shape_hardening_enabled, + idle_timeout, ) .await }, async { - copy_with_idle_timeout(&mut mask_read, &mut writer, mask_relay_max_bytes, true).await + copy_with_idle_timeout( + &mut mask_read, + &mut writer, + mask_relay_max_bytes, + true, + idle_timeout, + ) + .await } ); @@ -889,7 +925,11 @@ async fn relay_to_mask( } /// Just consume all data from client without responding. -async fn consume_client_data(mut reader: R, byte_cap: usize) { +async fn consume_client_data( + mut reader: R, + byte_cap: usize, + idle_timeout: Duration, +) { if byte_cap == 0 { return; } @@ -905,7 +945,7 @@ async fn consume_client_data(mut reader: R, byte_cap: usiz } let read_len = remaining_budget.min(MASK_BUFFER_SIZE); - let n = match timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await { + let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await { Ok(Ok(n)) => n, Ok(Err(_)) | Err(_) => break, }; diff --git a/src/proxy/tests/client_masking_blackhat_campaign_tests.rs b/src/proxy/tests/client_masking_blackhat_campaign_tests.rs index 917e799..c48caa0 100644 --- a/src/proxy/tests/client_masking_blackhat_campaign_tests.rs +++ b/src/proxy/tests/client_masking_blackhat_campaign_tests.rs @@ -31,11 +31,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_budget_security_tests.rs b/src/proxy/tests/client_masking_budget_security_tests.rs index 332451c..11a72a0 100644 --- a/src/proxy/tests/client_masking_budget_security_tests.rs +++ b/src/proxy/tests/client_masking_budget_security_tests.rs @@ -27,11 +27,14 @@ fn build_harness(config: ProxyConfig) -> PipelineHarness { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_diagnostics_security_tests.rs b/src/proxy/tests/client_masking_diagnostics_security_tests.rs index 67b797b..a55bb79 100644 --- a/src/proxy/tests/client_masking_diagnostics_security_tests.rs +++ b/src/proxy/tests/client_masking_diagnostics_security_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs b/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs index 8fa2689..5817f24 100644 --- a/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs +++ b/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_hard_adversarial_tests.rs b/src/proxy/tests/client_masking_hard_adversarial_tests.rs index c6b0e98..709ff49 100644 --- a/src/proxy/tests/client_masking_hard_adversarial_tests.rs +++ b/src/proxy/tests/client_masking_hard_adversarial_tests.rs @@ -25,11 +25,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs b/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs index b5a8b4d..49c9aa6 100644 --- a/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs +++ b/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs b/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs index b3fd5cb..6ebaa5a 100644 --- a/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs b/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs index b57ad51..9491e3f 100644 --- a/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs @@ -38,11 +38,14 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs b/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs index 9ab5f78..62a2ef8 100644 --- a/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs +++ b/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs @@ -16,11 +16,14 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs index 2b6f600..09ec626 100644 --- a/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs +++ b/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs @@ -39,11 +39,14 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> RedTeamHarness { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -229,11 +232,14 @@ async fn redteam_03_masking_duration_must_be_less_than_1ms_when_backend_down() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -470,11 +476,14 @@ async fn measure_invalid_probe_duration_ms(delay_ms: u64, tls_len: u16, body_sen upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -544,11 +553,14 @@ async fn capture_forwarded_probe_len(tls_len: u16, body_sent: usize) -> usize { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_replay_timing_security_tests.rs b/src/proxy/tests/client_masking_replay_timing_security_tests.rs index 97ed52a..788ce80 100644 --- a/src/proxy/tests/client_masking_replay_timing_security_tests.rs +++ b/src/proxy/tests/client_masking_replay_timing_security_tests.rs @@ -13,11 +13,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs index c4dd4db..ed1ac8d 100644 --- a/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs +++ b/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs b/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs index 2cf98c4..45ce014 100644 --- a/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs +++ b/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs index b0bf73e..f160b01 100644 --- a/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs +++ b/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_hardening_security_tests.rs b/src/proxy/tests/client_masking_shape_hardening_security_tests.rs index 7d2380b..9948e60 100644 --- a/src/proxy/tests/client_masking_shape_hardening_security_tests.rs +++ b/src/proxy/tests/client_masking_shape_hardening_security_tests.rs @@ -11,11 +11,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_stress_adversarial_tests.rs b/src/proxy/tests/client_masking_stress_adversarial_tests.rs index 1c8b599..575bfb5 100644 --- a/src/proxy/tests/client_masking_stress_adversarial_tests.rs +++ b/src/proxy/tests/client_masking_stress_adversarial_tests.rs @@ -25,11 +25,14 @@ fn new_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_security_tests.rs b/src/proxy/tests/client_security_tests.rs index d585326..85af766 100644 --- a/src/proxy/tests/client_security_tests.rs +++ b/src/proxy/tests/client_security_tests.rs @@ -332,11 +332,14 @@ async fn relay_task_abort_releases_user_gate_and_ip_reservation() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -446,11 +449,14 @@ async fn relay_cutover_releases_user_gate_and_ip_reservation() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -570,11 +576,14 @@ async fn integration_route_cutover_and_quota_overlap_fails_closed_and_releases_s upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -740,11 +749,14 @@ async fn proxy_protocol_header_is_rejected_when_trust_list_is_empty() { upstream_type: crate::config::UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -817,11 +829,14 @@ async fn proxy_protocol_header_from_untrusted_peer_range_is_rejected_under_load( upstream_type: crate::config::UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -977,11 +992,14 @@ async fn short_tls_probe_is_masked_through_client_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1065,11 +1083,14 @@ async fn tls12_record_probe_is_masked_through_client_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1151,11 +1172,14 @@ async fn handle_client_stream_increments_connects_all_exactly_once() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1244,11 +1268,14 @@ async fn running_client_handler_increments_connects_all_exactly_once() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1334,11 +1361,14 @@ async fn idle_pooled_connection_closes_cleanly_in_generic_stream_path() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1405,11 +1435,14 @@ async fn idle_pooled_connection_closes_cleanly_in_client_handler_path() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1491,11 +1524,14 @@ async fn partial_tls_header_stall_triggers_handshake_timeout() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1816,11 +1852,14 @@ async fn valid_tls_path_does_not_fall_back_to_mask_backend() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1925,11 +1964,14 @@ async fn valid_tls_with_invalid_mtproto_falls_back_to_mask_backend() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2032,11 +2074,14 @@ async fn client_handler_tls_bad_mtproto_is_forwarded_to_mask_backend() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2154,11 +2199,14 @@ async fn alpn_mismatch_tls_probe_is_masked_through_client_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2247,11 +2295,14 @@ async fn invalid_hmac_tls_probe_is_masked_through_client_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2346,11 +2397,14 @@ async fn burst_invalid_tls_probes_are_masked_verbatim() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3251,11 +3305,14 @@ async fn relay_connect_error_releases_user_and_ip_before_return() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3812,11 +3869,14 @@ async fn untrusted_proxy_header_source_is_rejected() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3882,11 +3942,14 @@ async fn empty_proxy_trusted_cidrs_rejects_proxy_header_by_default() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3979,11 +4042,14 @@ async fn oversized_tls_record_is_masked_in_generic_stream_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4082,11 +4148,14 @@ async fn oversized_tls_record_is_masked_in_client_handler_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4199,11 +4268,14 @@ async fn tls_record_len_min_minus_1_is_rejected_in_generic_stream_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4302,11 +4374,14 @@ async fn tls_record_len_min_minus_1_is_rejected_in_client_handler_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4408,11 +4483,14 @@ async fn tls_record_len_16384_is_accepted_in_generic_stream_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4509,11 +4587,14 @@ async fn tls_record_len_16384_is_accepted_in_client_handler_pipeline() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_timing_profile_adversarial_tests.rs b/src/proxy/tests/client_timing_profile_adversarial_tests.rs index d8df19f..bc452a8 100644 --- a/src/proxy/tests/client_timing_profile_adversarial_tests.rs +++ b/src/proxy/tests/client_timing_profile_adversarial_tests.rs @@ -24,11 +24,14 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_tls_clienthello_size_security_tests.rs b/src/proxy/tests/client_tls_clienthello_size_security_tests.rs index 14c24b7..a779c92 100644 --- a/src/proxy/tests/client_tls_clienthello_size_security_tests.rs +++ b/src/proxy/tests/client_tls_clienthello_size_security_tests.rs @@ -26,11 +26,14 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs b/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs index c757999..aa0b925 100644 --- a/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs +++ b/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs @@ -27,11 +27,14 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs b/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs index a4d5df8..edea451 100644 --- a/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs +++ b/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs @@ -41,11 +41,14 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/direct_relay_security_tests.rs b/src/proxy/tests/direct_relay_security_tests.rs index e139923..193ff7b 100644 --- a/src/proxy/tests/direct_relay_security_tests.rs +++ b/src/proxy/tests/direct_relay_security_tests.rs @@ -1293,11 +1293,14 @@ async fn direct_relay_abort_midflight_releases_route_gauge() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1400,11 +1403,14 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1522,11 +1528,14 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1758,8 +1767,11 @@ async fn negative_direct_relay_dc_connection_refused_fails_fast() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 100, @@ -1849,8 +1861,11 @@ async fn adversarial_direct_relay_cutover_integrity() { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 100, diff --git a/src/proxy/tests/masking_additional_hardening_security_tests.rs b/src/proxy/tests/masking_additional_hardening_security_tests.rs index a6f6386..1b8ca2e 100644 --- a/src/proxy/tests/masking_additional_hardening_security_tests.rs +++ b/src/proxy/tests/masking_additional_hardening_security_tests.rs @@ -47,7 +47,7 @@ async fn consume_client_data_stops_after_byte_cap_without_eof() { }; let cap = 10_000usize; - consume_client_data(reader, cap).await; + consume_client_data(reader, cap, MASK_RELAY_IDLE_TIMEOUT).await; let total = produced.load(Ordering::Relaxed); assert!( diff --git a/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs b/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs index f2c39a2..fcd2e79 100644 --- a/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs +++ b/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs @@ -31,7 +31,7 @@ async fn stalling_client_terminates_at_idle_not_relay_timeout() { let result = tokio::time::timeout( MASK_RELAY_TIMEOUT, - consume_client_data(reader, MASK_BUFFER_SIZE * 4), + consume_client_data(reader, MASK_BUFFER_SIZE * 4, MASK_RELAY_IDLE_TIMEOUT), ) .await; @@ -57,9 +57,12 @@ async fn fast_reader_drains_to_eof() { let data = vec![0xAAu8; 32 * 1024]; let reader = std::io::Cursor::new(data); - tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX)) - .await - .expect("consume_client_data did not complete for fast EOF reader"); + tokio::time::timeout( + MASK_RELAY_TIMEOUT, + consume_client_data(reader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT), + ) + .await + .expect("consume_client_data did not complete for fast EOF reader"); } #[tokio::test] @@ -81,7 +84,7 @@ async fn io_error_terminates_cleanly() { tokio::time::timeout( MASK_RELAY_TIMEOUT, - consume_client_data(ErrReader, usize::MAX), + consume_client_data(ErrReader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT), ) .await .expect("consume_client_data did not return on I/O error"); diff --git a/src/proxy/tests/masking_consume_stress_adversarial_tests.rs b/src/proxy/tests/masking_consume_stress_adversarial_tests.rs index 12287b5..7579a9c 100644 --- a/src/proxy/tests/masking_consume_stress_adversarial_tests.rs +++ b/src/proxy/tests/masking_consume_stress_adversarial_tests.rs @@ -34,7 +34,11 @@ async fn consume_stall_stress_finishes_within_idle_budget() { set.spawn(async { tokio::time::timeout( MASK_RELAY_TIMEOUT, - consume_client_data(OneByteThenStall { sent: false }, usize::MAX), + consume_client_data( + OneByteThenStall { sent: false }, + usize::MAX, + MASK_RELAY_IDLE_TIMEOUT, + ), ) .await .expect("consume_client_data exceeded relay timeout under stall load"); @@ -56,7 +60,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() { #[tokio::test] async fn consume_zero_cap_returns_immediately() { let started = Instant::now(); - consume_client_data(tokio::io::empty(), 0).await; + consume_client_data(tokio::io::empty(), 0, MASK_RELAY_IDLE_TIMEOUT).await; assert!( started.elapsed() < MASK_RELAY_IDLE_TIMEOUT, "zero byte cap must return immediately" diff --git a/src/proxy/tests/masking_production_cap_regression_security_tests.rs b/src/proxy/tests/masking_production_cap_regression_security_tests.rs index 9ff51ba..c5d542e 100644 --- a/src/proxy/tests/masking_production_cap_regression_security_tests.rs +++ b/src/proxy/tests/masking_production_cap_regression_security_tests.rs @@ -127,7 +127,14 @@ async fn positive_copy_with_production_cap_stops_exactly_at_budget() { let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls); let mut writer = CountingWriter::default(); - let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await; + let outcome = copy_with_idle_timeout( + &mut reader, + &mut writer, + PROD_CAP_BYTES, + true, + MASK_RELAY_IDLE_TIMEOUT, + ) + .await; assert_eq!( outcome.total, PROD_CAP_BYTES, @@ -145,7 +152,13 @@ async fn negative_consume_with_zero_cap_performs_no_reads() { let read_calls = Arc::new(AtomicUsize::new(0)); let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls)); - consume_client_data_with_timeout_and_cap(reader, 0).await; + consume_client_data_with_timeout_and_cap( + reader, + 0, + MASK_RELAY_TIMEOUT, + MASK_RELAY_IDLE_TIMEOUT, + ) + .await; assert_eq!( read_calls.load(Ordering::Relaxed), @@ -161,7 +174,14 @@ async fn edge_copy_below_cap_reports_eof_without_overread() { let mut reader = FinitePatternReader::new(payload, 3072, read_calls); let mut writer = CountingWriter::default(); - let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await; + let outcome = copy_with_idle_timeout( + &mut reader, + &mut writer, + PROD_CAP_BYTES, + true, + MASK_RELAY_IDLE_TIMEOUT, + ) + .await; assert_eq!(outcome.total, payload); assert_eq!(writer.written, payload); @@ -175,7 +195,13 @@ async fn edge_copy_below_cap_reports_eof_without_overread() { async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() { let started = Instant::now(); - consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES).await; + consume_client_data_with_timeout_and_cap( + NeverReadyReader, + PROD_CAP_BYTES, + MASK_RELAY_TIMEOUT, + MASK_RELAY_IDLE_TIMEOUT, + ) + .await; assert!( started.elapsed() < Duration::from_millis(350), @@ -190,7 +216,12 @@ async fn integration_consume_path_honors_production_cap_for_large_payload() { let bounded = timeout( Duration::from_millis(350), - consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES), + consume_client_data_with_timeout_and_cap( + reader, + PROD_CAP_BYTES, + MASK_RELAY_TIMEOUT, + MASK_RELAY_IDLE_TIMEOUT, + ), ) .await; @@ -206,7 +237,13 @@ async fn adversarial_consume_path_never_reads_beyond_declared_byte_cap() { let total_read = Arc::new(AtomicUsize::new(0)); let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read)); - consume_client_data_with_timeout_and_cap(reader, byte_cap).await; + consume_client_data_with_timeout_and_cap( + reader, + byte_cap, + MASK_RELAY_TIMEOUT, + MASK_RELAY_IDLE_TIMEOUT, + ) + .await; assert!( total_read.load(Ordering::Relaxed) <= byte_cap, @@ -231,7 +268,9 @@ async fn light_fuzz_cap_and_payload_matrix_preserves_min_budget_invariant() { let mut reader = FinitePatternReader::new(payload, chunk, read_calls); let mut writer = CountingWriter::default(); - let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true).await; + let outcome = + copy_with_idle_timeout(&mut reader, &mut writer, cap, true, MASK_RELAY_IDLE_TIMEOUT) + .await; let expected = payload.min(cap); assert_eq!( @@ -261,7 +300,14 @@ async fn stress_parallel_copy_tasks_with_production_cap_complete_without_leaks() read_calls, ); let mut writer = CountingWriter::default(); - copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await + copy_with_idle_timeout( + &mut reader, + &mut writer, + PROD_CAP_BYTES, + true, + MASK_RELAY_IDLE_TIMEOUT, + ) + .await })); } diff --git a/src/proxy/tests/masking_relay_guardrails_security_tests.rs b/src/proxy/tests/masking_relay_guardrails_security_tests.rs index 257c0f8..3613c91 100644 --- a/src/proxy/tests/masking_relay_guardrails_security_tests.rs +++ b/src/proxy/tests/masking_relay_guardrails_security_tests.rs @@ -26,6 +26,7 @@ async fn relay_to_mask_enforces_masking_session_byte_cap() { 0, false, 32 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); @@ -81,6 +82,7 @@ async fn relay_to_mask_propagates_client_half_close_without_waiting_for_other_di 0, false, 32 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); diff --git a/src/proxy/tests/masking_security_tests.rs b/src/proxy/tests/masking_security_tests.rs index c698b55..84a0e6f 100644 --- a/src/proxy/tests/masking_security_tests.rs +++ b/src/proxy/tests/masking_security_tests.rs @@ -1377,6 +1377,7 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); @@ -1508,6 +1509,7 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() { 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ), ) .await; diff --git a/src/proxy/tests/masking_self_target_loop_security_tests.rs b/src/proxy/tests/masking_self_target_loop_security_tests.rs index 7f6cb29..975b4fc 100644 --- a/src/proxy/tests/masking_self_target_loop_security_tests.rs +++ b/src/proxy/tests/masking_self_target_loop_security_tests.rs @@ -228,6 +228,7 @@ async fn relay_path_idle_timeout_eviction_remains_effective() { 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; diff --git a/src/proxy/tests/masking_shape_guard_adversarial_tests.rs b/src/proxy/tests/masking_shape_guard_adversarial_tests.rs index 4fa8da7..6c3c4bf 100644 --- a/src/proxy/tests/masking_shape_guard_adversarial_tests.rs +++ b/src/proxy/tests/masking_shape_guard_adversarial_tests.rs @@ -44,6 +44,7 @@ async fn run_relay_case( above_cap_blur_max_bytes, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); diff --git a/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs b/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs index 9abf3c0..4e0aa18 100644 --- a/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs +++ b/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs @@ -89,6 +89,7 @@ async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() { 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); diff --git a/src/proxy/tests/proxy_shared_state_isolation_tests.rs b/src/proxy/tests/proxy_shared_state_isolation_tests.rs index 7887ef8..faa045f 100644 --- a/src/proxy/tests/proxy_shared_state_isolation_tests.rs +++ b/src/proxy/tests/proxy_shared_state_isolation_tests.rs @@ -53,11 +53,14 @@ fn new_client_harness() -> ClientHarness { upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None, + bindtodevice: None, }, weight: 1, enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 257d8f3..000bca0 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -67,10 +67,8 @@ struct FamilyReconnectOutcome { key: (i32, IpFamily), dc: i32, family: IpFamily, - alive: usize, required: usize, endpoint_count: usize, - restored: usize, } pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { @@ -82,8 +80,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut drain_warn_next_allowed: HashMap = HashMap::new(); let mut degraded_interval = true; @@ -109,8 +105,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut single_endpoint_outage, &mut shadow_rotate_deadline, &mut idle_refresh_next_attempt, - &mut adaptive_idle_since, - &mut adaptive_recover_until, &mut floor_warn_next_allowed, ) .await; @@ -126,8 +120,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut single_endpoint_outage, &mut shadow_rotate_deadline, &mut idle_refresh_next_attempt, - &mut adaptive_idle_since, - &mut adaptive_recover_until, &mut floor_warn_next_allowed, ) .await; @@ -360,8 +352,6 @@ async fn check_family( single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, ) -> bool { let enabled = match family { @@ -393,10 +383,7 @@ async fn check_family( let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget)); - if pool.floor_mode() == MeFloorMode::Static { - adaptive_idle_since.clear(); - adaptive_recover_until.clear(); - } + if pool.floor_mode() == MeFloorMode::Static {} let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new(); let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec>::new(); @@ -435,8 +422,6 @@ async fn check_family( &live_addr_counts, &live_writer_ids_by_addr, &bound_clients_by_writer, - adaptive_idle_since, - adaptive_recover_until, ) .await; pool.set_adaptive_floor_runtime_caps( @@ -503,8 +488,6 @@ async fn check_family( outage_next_attempt.remove(&key); shadow_rotate_deadline.remove(&key); idle_refresh_next_attempt.remove(&key); - adaptive_idle_since.remove(&key); - adaptive_recover_until.remove(&key); info!( dc = %dc, ?family, @@ -632,22 +615,28 @@ async fn check_family( restored += 1; continue; } - pool_for_reconnect - .stats - .increment_me_floor_cap_block_total(); - pool_for_reconnect - .stats - .increment_me_floor_swap_idle_failed_total(); - debug!( - dc = %dc, - ?family, - alive, - required, - active_cap_effective_total, - "Adaptive floor cap reached, reconnect attempt blocked" - ); - break; + + let base_req = pool_for_reconnect + .required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false); + if alive + restored >= base_req { + pool_for_reconnect + .stats + .increment_me_floor_cap_block_total(); + pool_for_reconnect + .stats + .increment_me_floor_swap_idle_failed_total(); + debug!( + dc = %dc, + ?family, + alive, + required, + active_cap_effective_total, + "Adaptive floor cap reached, reconnect attempt blocked" + ); + break; + } } + pool_for_reconnect.stats.increment_me_reconnect_attempt(); let res = tokio::time::timeout( pool_for_reconnect.reconnect_runtime.me_one_timeout, pool_for_reconnect.connect_endpoints_round_robin( @@ -663,11 +652,9 @@ async fn check_family( pool_for_reconnect.stats.increment_me_reconnect_success(); } Ok(false) => { - pool_for_reconnect.stats.increment_me_reconnect_attempt(); debug!(dc = %dc, ?family, "ME round-robin reconnect failed") } Err(_) => { - pool_for_reconnect.stats.increment_me_reconnect_attempt(); debug!(dc = %dc, ?family, "ME reconnect timed out"); } } @@ -678,10 +665,8 @@ async fn check_family( key, dc, family, - alive, required, endpoint_count: endpoints_for_dc.len(), - restored, } }); } @@ -695,7 +680,7 @@ async fn check_family( } }; let now = Instant::now(); - let now_alive = outcome.alive + outcome.restored; + let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await; if now_alive >= outcome.required { info!( dc = %outcome.dc, @@ -851,6 +836,33 @@ fn should_emit_rate_limited_warn( false } +async fn live_active_writers_for_dc_family(pool: &Arc, dc: i32, family: IpFamily) -> usize { + let writers = pool.writers.read().await; + writers + .iter() + .filter(|writer| { + if writer.draining.load(std::sync::atomic::Ordering::Relaxed) { + return false; + } + if writer.writer_dc != dc { + return false; + } + if !matches!( + super::pool::WriterContour::from_u8( + writer.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + super::pool::WriterContour::Active + ) { + return false; + } + match family { + IpFamily::V4 => writer.addr.is_ipv4(), + IpFamily::V6 => writer.addr.is_ipv6(), + } + }) + .count() +} + fn adaptive_floor_class_min( pool: &Arc, endpoint_count: usize, @@ -904,8 +916,6 @@ async fn build_family_floor_plan( live_addr_counts: &HashMap<(i32, SocketAddr), usize>, live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>, bound_clients_by_writer: &HashMap, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) -> FamilyFloorPlan { let mut entries = Vec::::new(); let mut by_dc = HashMap::::new(); @@ -921,18 +931,7 @@ async fn build_family_floor_plan( if endpoints.is_empty() { continue; } - let key = (*dc, family); - let reduce_for_idle = should_reduce_floor_for_idle( - pool, - key, - *dc, - endpoints, - live_writer_ids_by_addr, - bound_clients_by_writer, - adaptive_idle_since, - adaptive_recover_until, - ) - .await; + let _key = (*dc, family); let base_required = pool.required_writers_for_dc(endpoints.len()).max(1); let min_required = if is_adaptive { adaptive_floor_class_min(pool, endpoints.len(), base_required) @@ -947,11 +946,11 @@ async fn build_family_floor_plan( if max_required < min_required { max_required = min_required; } - let desired_raw = if is_adaptive && reduce_for_idle { - min_required - } else { - base_required - }; + // We initialize target_required at base_required to prevent 0-writer blackouts + // caused by proactively dropping an idle DC to a single fragile connection. + // The Adaptive Floor constraint loop below will gracefully compress idle DCs + // (prioritized via has_bound_clients = false) to min_required only when global capacity is reached. + let desired_raw = base_required; let target_required = desired_raw.clamp(min_required, max_required); let alive = endpoints .iter() @@ -1278,43 +1277,6 @@ async fn maybe_refresh_idle_writer_for_dc( ); } -async fn should_reduce_floor_for_idle( - pool: &Arc, - key: (i32, IpFamily), - dc: i32, - endpoints: &[SocketAddr], - live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>, - bound_clients_by_writer: &HashMap, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, -) -> bool { - if pool.floor_mode() != MeFloorMode::Adaptive { - adaptive_idle_since.remove(&key); - adaptive_recover_until.remove(&key); - return false; - } - - let now = Instant::now(); - let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr); - let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer); - if has_bound_clients { - adaptive_idle_since.remove(&key); - adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration()); - return false; - } - - if let Some(recover_until) = adaptive_recover_until.get(&key) - && now < *recover_until - { - adaptive_idle_since.remove(&key); - return false; - } - adaptive_recover_until.remove(&key); - - let idle_since = adaptive_idle_since.entry(key).or_insert(now); - now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration() -} - fn has_bound_clients_on_endpoint( writer_ids: &[u64], bound_clients_by_writer: &HashMap, @@ -1364,6 +1326,7 @@ async fn recover_single_endpoint_outage( ); return; }; + pool.stats.increment_me_reconnect_attempt(); pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); @@ -1439,7 +1402,6 @@ async fn recover_single_endpoint_outage( return; } - pool.stats.increment_me_reconnect_attempt(); let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms); let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms); outage_backoff.insert(key, next_ms); diff --git a/src/transport/middle_proxy/ping.rs b/src/transport/middle_proxy/ping.rs index bff088b..85888bd 100644 --- a/src/transport/middle_proxy/ping.rs +++ b/src/transport/middle_proxy/ping.rs @@ -67,6 +67,7 @@ pub fn format_sample_line(sample: &MePingSample) -> String { fn format_direct_with_config( interface: &Option, bind_addresses: &Option>, + bindtodevice: &Option, ) -> Option { let mut direct_parts: Vec = Vec::new(); if let Some(dev) = interface.as_deref().filter(|v| !v.is_empty()) { @@ -75,6 +76,9 @@ fn format_direct_with_config( if let Some(src) = bind_addresses.as_ref().filter(|v| !v.is_empty()) { direct_parts.push(format!("src={}", src.join(","))); } + if let Some(device) = bindtodevice.as_deref().filter(|v| !v.is_empty()) { + direct_parts.push(format!("bindtodevice={device}")); + } if direct_parts.is_empty() { None } else { @@ -231,8 +235,11 @@ pub async fn format_me_route( UpstreamType::Direct { interface, bind_addresses, + bindtodevice, } => { - if let Some(route) = format_direct_with_config(interface, bind_addresses) { + if let Some(route) = + format_direct_with_config(interface, bind_addresses, bindtodevice) + { route } else { detect_direct_route_details(reports, prefer_ipv6, v4_ok, v6_ok) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 249d387..b89a844 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1422,22 +1422,6 @@ impl MePool { MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed)) } - pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { - Duration::from_secs( - self.floor_runtime - .me_adaptive_floor_idle_secs - .load(Ordering::Relaxed), - ) - } - - pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { - Duration::from_secs( - self.floor_runtime - .me_adaptive_floor_recover_grace_secs - .load(Ordering::Relaxed), - ) - } - pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize { (self .floor_runtime @@ -1659,6 +1643,7 @@ impl MePool { &self, contour: WriterContour, allow_coverage_override: bool, + writer_dc: i32, ) -> bool { let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await; match contour { @@ -1670,6 +1655,43 @@ impl MePool { if !allow_coverage_override { return false; } + + let mut endpoints_len = 0; + let now_epoch = Self::now_epoch_secs(); + if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) { + if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) { + endpoints_len += addrs.len(); + } + } + if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) { + if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) { + endpoints_len += addrs.len(); + } + } + + if endpoints_len > 0 { + let base_req = + self.required_writers_for_dc_with_floor_mode(endpoints_len, false); + let active_for_dc = { + let ws = self.writers.read().await; + ws.iter() + .filter(|w| { + !w.draining.load(std::sync::atomic::Ordering::Relaxed) + && w.writer_dc == writer_dc + && matches!( + WriterContour::from_u8( + w.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + WriterContour::Active + ) + }) + .count() + }; + if active_for_dc < base_req { + return true; + } + } + let coverage_required = self.active_coverage_required_total().await; active_writers < coverage_required } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 69d8aa0..bb62604 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -77,6 +77,12 @@ impl MePool { return Vec::new(); } + if endpoints.len() == 1 && self.single_endpoint_outage_disable_quarantine() { + let mut guard = self.endpoint_quarantine.lock().await; + guard.retain(|_, expiry| *expiry > Instant::now()); + return endpoints.to_vec(); + } + let mut guard = self.endpoint_quarantine.lock().await; let now = Instant::now(); guard.retain(|_, expiry| *expiry > now); @@ -236,8 +242,18 @@ impl MePool { let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1); let mut total_attempts = 0u32; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; + let dc_endpoints = self.endpoints_for_dc(writer_dc).await; + let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr; + let bypass_quarantine_for_single_endpoint = + single_endpoint_dc && self.single_endpoint_outage_disable_quarantine(); - if !same_endpoint_quarantined { + if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint { + if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint { + debug!( + %addr, + "Bypassing quarantine for immediate reconnect on single-endpoint DC" + ); + } for attempt in 0..fast_retries { if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { break; @@ -276,7 +292,6 @@ impl MePool { ); } - let dc_endpoints = self.endpoints_for_dc(writer_dc).await; if dc_endpoints.is_empty() { self.stats.increment_me_refill_failed_total(); return false; diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index fae68b9..52c8fae 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -342,7 +342,7 @@ impl MePool { allow_coverage_override: bool, ) -> Result<()> { if !self - .can_open_writer_for_contour(contour, allow_coverage_override) + .can_open_writer_for_contour(contour, allow_coverage_override, writer_dc) .await { return Err(ProxyError::Proxy(format!( diff --git a/src/transport/middle_proxy/tests/pool_refill_security_tests.rs b/src/transport/middle_proxy/tests/pool_refill_security_tests.rs index 90c8382..4463444 100644 --- a/src/transport/middle_proxy/tests/pool_refill_security_tests.rs +++ b/src/transport/middle_proxy/tests/pool_refill_security_tests.rs @@ -109,18 +109,16 @@ async fn connectable_endpoints_waits_until_quarantine_expires() { { let mut guard = pool.endpoint_quarantine.lock().await; - guard.insert(addr, Instant::now() + Duration::from_millis(80)); + guard.insert(addr, Instant::now() + Duration::from_millis(500)); } - let started = Instant::now(); - let endpoints = pool.connectable_endpoints_for_test(&[addr]).await; - let elapsed = started.elapsed(); - + let endpoints = tokio::time::timeout( + Duration::from_millis(120), + pool.connectable_endpoints_for_test(&[addr]), + ) + .await + .expect("single-endpoint outage mode should bypass quarantine delay"); assert_eq!(endpoints, vec![addr]); - assert!( - elapsed >= Duration::from_millis(50), - "single-endpoint DC should honor quarantine before retry" - ); } #[tokio::test] diff --git a/src/transport/socket.rs b/src/transport/socket.rs index b751a30..58d3b97 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -158,6 +158,56 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option) Ok(socket) } +/// Pin an outgoing socket to a specific Linux network interface via SO_BINDTODEVICE. +#[cfg(target_os = "linux")] +pub fn bind_outgoing_socket_to_device(socket: &Socket, device: &str) -> Result<()> { + use std::io::{Error, ErrorKind}; + use std::os::fd::AsRawFd; + + let name = device.trim(); + if name.is_empty() { + return Err(Error::new( + ErrorKind::InvalidInput, + "bindtodevice must not be empty", + )); + } + + // The kernel expects an interface name buffer with a trailing NUL. + if name.len() >= libc::IFNAMSIZ { + return Err(Error::new( + ErrorKind::InvalidInput, + "bindtodevice exceeds IFNAMSIZ", + )); + } + let mut ifname = [0u8; libc::IFNAMSIZ]; + ifname[..name.len()].copy_from_slice(name.as_bytes()); + + let rc = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + ifname.as_ptr().cast::(), + (name.len() + 1) as libc::socklen_t, + ) + }; + if rc != 0 { + return Err(Error::last_os_error()); + } + debug!("Pinned outgoing socket to interface {}", name); + Ok(()) +} + +/// Stub for non-Linux targets where SO_BINDTODEVICE is unavailable. +#[cfg(not(target_os = "linux"))] +pub fn bind_outgoing_socket_to_device(_socket: &Socket, _device: &str) -> Result<()> { + use std::io::{Error, ErrorKind}; + Err(Error::new( + ErrorKind::Unsupported, + "bindtodevice is supported only on Linux", + )) +} + /// Get local address of a socket #[allow(dead_code)] pub fn get_local_addr(stream: &TcpStream) -> Option { diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 674f0f0..791fc00 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -26,7 +26,9 @@ use crate::stats::Stats; use crate::transport::shadowsocks::{ ShadowsocksStream, connect_shadowsocks, sanitize_shadowsocks_url, }; -use crate::transport::socket::{create_outgoing_socket_bound, resolve_interface_ip}; +use crate::transport::socket::{ + bind_outgoing_socket_to_device, create_outgoing_socket_bound, resolve_interface_ip, +}; use crate::transport::socks::{connect_socks4, connect_socks5}; /// Number of Telegram datacenters @@ -327,6 +329,17 @@ pub struct UpstreamManager { } impl UpstreamManager { + fn is_unscoped_upstream(upstream: &UpstreamConfig) -> bool { + upstream.scopes.is_empty() + } + + fn should_check_in_default_dc_connectivity( + has_unscoped: bool, + upstream: &UpstreamConfig, + ) -> bool { + !has_unscoped || Self::is_unscoped_upstream(upstream) + } + pub fn new( configs: Vec, connect_retry_attempts: u32, @@ -453,6 +466,87 @@ impl UpstreamManager { } } + fn resolve_probe_dc_families( + upstream: &UpstreamConfig, + ipv4_available: bool, + ipv6_available: bool, + ) -> (bool, bool) { + ( + upstream.ipv4.unwrap_or(ipv4_available), + upstream.ipv6.unwrap_or(ipv6_available), + ) + } + + fn resolve_runtime_dc_families( + upstream: &UpstreamConfig, + dc_preference: IpPreference, + ) -> (bool, bool) { + let (auto_ipv4, auto_ipv6) = match dc_preference { + IpPreference::PreferV4 => (true, false), + IpPreference::PreferV6 => (false, true), + IpPreference::BothWork | IpPreference::Unknown | IpPreference::Unavailable => { + (true, true) + } + }; + + ( + upstream.ipv4.unwrap_or(auto_ipv4), + upstream.ipv6.unwrap_or(auto_ipv6), + ) + } + + fn dc_table_addr(dc_idx: i16, ipv6: bool, port: u16) -> Option { + let arr_idx = UpstreamState::dc_array_idx(dc_idx)?; + let ip = if ipv6 { + TG_DATACENTERS_V6[arr_idx] + } else { + TG_DATACENTERS_V4[arr_idx] + }; + Some(SocketAddr::new(ip, port)) + } + + fn resolve_runtime_dc_target( + target: SocketAddr, + dc_idx: Option, + upstream: &UpstreamConfig, + dc_preference: IpPreference, + ) -> Result { + let (allow_ipv4, allow_ipv6) = Self::resolve_runtime_dc_families(upstream, dc_preference); + if (target.is_ipv4() && allow_ipv4) || (target.is_ipv6() && allow_ipv6) { + return Ok(target); + } + + if !allow_ipv4 && !allow_ipv6 { + return Err(ProxyError::Config(format!( + "Upstream DC family policy blocks all families for target {target}" + ))); + } + + let Some(dc_idx) = dc_idx else { + return Err(ProxyError::Config(format!( + "Upstream DC family policy cannot remap target {target} without dc_idx" + ))); + }; + + let remapped = if target.is_ipv4() { + if allow_ipv6 { + Self::dc_table_addr(dc_idx, true, target.port()) + } else { + None + } + } else if allow_ipv4 { + Self::dc_table_addr(dc_idx, false, target.port()) + } else { + None + }; + + remapped.ok_or_else(|| { + ProxyError::Config(format!( + "Upstream DC family policy rejected target {target} (dc_idx={dc_idx})" + )) + }) + } + #[cfg(unix)] fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec { use nix::ifaddrs::getifaddrs; @@ -726,18 +820,28 @@ impl UpstreamManager { .await .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; - let mut upstream = { + let (mut upstream, bind_rr, dc_preference) = { let guard = self.upstreams.read().await; - guard[idx].config.clone() + let state = &guard[idx]; + let dc_preference = dc_idx + .and_then(UpstreamState::dc_array_idx) + .map(|dc_array_idx| state.dc_ip_pref[dc_array_idx]) + .unwrap_or(IpPreference::Unknown); + ( + state.config.clone(), + Some(state.bind_rr.clone()), + dc_preference, + ) }; if let Some(s) = scope { upstream.selected_scope = s.to_string(); } - let bind_rr = { - let guard = self.upstreams.read().await; - guard.get(idx).map(|u| u.bind_rr.clone()) + let target = if dc_idx.is_some() { + Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)? + } else { + target }; let (stream, _) = self @@ -758,9 +862,18 @@ impl UpstreamManager { .await .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; - let mut upstream = { + let (mut upstream, bind_rr, dc_preference) = { let guard = self.upstreams.read().await; - guard[idx].config.clone() + let state = &guard[idx]; + let dc_preference = dc_idx + .and_then(UpstreamState::dc_array_idx) + .map(|dc_array_idx| state.dc_ip_pref[dc_array_idx]) + .unwrap_or(IpPreference::Unknown); + ( + state.config.clone(), + Some(state.bind_rr.clone()), + dc_preference, + ) }; // Set scope for configuration copy @@ -768,9 +881,10 @@ impl UpstreamManager { upstream.selected_scope = s.to_string(); } - let bind_rr = { - let guard = self.upstreams.read().await; - guard.get(idx).map(|u| u.bind_rr.clone()) + let target = if dc_idx.is_some() { + Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)? + } else { + target }; let (stream, egress) = self @@ -928,6 +1042,7 @@ impl UpstreamManager { UpstreamType::Direct { interface, bind_addresses, + bindtodevice, } => { let bind_ip = Self::resolve_bind_address( interface, @@ -943,6 +1058,10 @@ impl UpstreamManager { } let socket = create_outgoing_socket_bound(target, bind_ip)?; + if let Some(device) = bindtodevice.as_deref().filter(|value| !value.is_empty()) { + bind_outgoing_socket_to_device(&socket, device).map_err(ProxyError::Io)?; + debug!(bindtodevice = %device, target = %target, "Pinned socket to interface"); + } if let Some(ip) = bind_ip { debug!(bind = %ip, target = %target, "Bound outgoing socket"); } else if interface.is_some() || bind_addresses.is_some() { @@ -1201,14 +1320,26 @@ impl UpstreamManager { .map(|(i, u)| (i, u.config.clone(), u.bind_rr.clone())) .collect() }; + let has_unscoped = upstreams + .iter() + .any(|(_, cfg, _)| Self::is_unscoped_upstream(cfg)); let mut all_results = Vec::new(); for (upstream_idx, upstream_config, bind_rr) in &upstreams { + // DC connectivity checks should follow the default routing path. + // Scoped upstreams are included only when no unscoped upstream exists. + if !Self::should_check_in_default_dc_connectivity(has_unscoped, upstream_config) { + continue; + } + + let (upstream_ipv4_enabled, upstream_ipv6_enabled) = + Self::resolve_probe_dc_families(upstream_config, ipv4_enabled, ipv6_enabled); let upstream_name = match &upstream_config.upstream_type { UpstreamType::Direct { interface, bind_addresses, + bindtodevice, } => { let mut direct_parts = Vec::new(); if let Some(dev) = interface.as_deref().filter(|v| !v.is_empty()) { @@ -1217,6 +1348,9 @@ impl UpstreamManager { if let Some(src) = bind_addresses.as_ref().filter(|v| !v.is_empty()) { direct_parts.push(format!("src={}", src.join(","))); } + if let Some(device) = bindtodevice.as_deref().filter(|v| !v.is_empty()) { + direct_parts.push(format!("bindtodevice={device}")); + } if direct_parts.is_empty() { "direct".to_string() } else { @@ -1233,7 +1367,7 @@ impl UpstreamManager { }; let mut v6_results = Vec::with_capacity(NUM_DCS); - if ipv6_enabled { + if upstream_ipv6_enabled { for dc_zero_idx in 0..NUM_DCS { let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); @@ -1284,13 +1418,17 @@ impl UpstreamManager { dc_idx: dc_zero_idx + 1, dc_addr: SocketAddr::new(dc_v6, TG_DATACENTER_PORT), rtt_ms: None, - error: Some("ipv6 disabled".to_string()), + error: Some(if ipv6_enabled { + "ipv6 disabled by upstream policy".to_string() + } else { + "ipv6 disabled".to_string() + }), }); } } let mut v4_results = Vec::with_capacity(NUM_DCS); - if ipv4_enabled { + if upstream_ipv4_enabled { for dc_zero_idx in 0..NUM_DCS { let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx]; let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT); @@ -1341,7 +1479,11 @@ impl UpstreamManager { dc_idx: dc_zero_idx + 1, dc_addr: SocketAddr::new(dc_v4, TG_DATACENTER_PORT), rtt_ms: None, - error: Some("ipv4 disabled".to_string()), + error: Some(if ipv4_enabled { + "ipv4 disabled by upstream policy".to_string() + } else { + "ipv4 disabled".to_string() + }), }); } } @@ -1361,7 +1503,9 @@ impl UpstreamManager { match addr_str.parse::() { Ok(addr) => { let is_v6 = addr.is_ipv6(); - if (is_v6 && !ipv6_enabled) || (!is_v6 && !ipv4_enabled) { + if (is_v6 && !upstream_ipv6_enabled) + || (!is_v6 && !upstream_ipv4_enabled) + { continue; } let result = tokio::time::timeout( @@ -1596,13 +1740,32 @@ impl UpstreamManager { continue; } - let count = self.upstreams.read().await.len(); - for i in 0..count { + let target_upstreams: Vec = { + let guard = self.upstreams.read().await; + let has_unscoped = guard + .iter() + .any(|upstream| Self::is_unscoped_upstream(&upstream.config)); + guard + .iter() + .enumerate() + .filter(|(_, upstream)| { + Self::should_check_in_default_dc_connectivity( + has_unscoped, + &upstream.config, + ) + }) + .map(|(idx, _)| idx) + .collect() + }; + + for i in target_upstreams { let (config, bind_rr) = { let guard = self.upstreams.read().await; let u = &guard[i]; (u.config.clone(), u.bind_rr.clone()) }; + let (upstream_ipv4_enabled, upstream_ipv6_enabled) = + Self::resolve_probe_dc_families(&config, ipv4_enabled, ipv6_enabled); let mut healthy_groups = 0usize; let mut latency_updates: Vec<(usize, f64)> = Vec::new(); @@ -1618,14 +1781,30 @@ impl UpstreamManager { continue; } - let rotation_key = (i, group.dc_idx, is_primary); - let start_idx = - *endpoint_rotation.entry(rotation_key).or_insert(0) % endpoints.len(); - let mut next_idx = (start_idx + 1) % endpoints.len(); + let filtered_endpoints: Vec = endpoints + .iter() + .copied() + .filter(|endpoint| { + if endpoint.is_ipv4() { + upstream_ipv4_enabled + } else { + upstream_ipv6_enabled + } + }) + .collect(); - for step in 0..endpoints.len() { - let endpoint_idx = (start_idx + step) % endpoints.len(); - let endpoint = endpoints[endpoint_idx]; + if filtered_endpoints.is_empty() { + continue; + } + + let rotation_key = (i, group.dc_idx, is_primary); + let start_idx = *endpoint_rotation.entry(rotation_key).or_insert(0) + % filtered_endpoints.len(); + let mut next_idx = (start_idx + 1) % filtered_endpoints.len(); + + for step in 0..filtered_endpoints.len() { + let endpoint_idx = (start_idx + step) % filtered_endpoints.len(); + let endpoint = filtered_endpoints[endpoint_idx]; let start = Instant::now(); let result = tokio::time::timeout( @@ -1644,7 +1823,7 @@ impl UpstreamManager { Ok(Ok(_stream)) => { group_ok = true; group_rtt_ms = Some(start.elapsed().as_secs_f64() * 1000.0); - next_idx = (endpoint_idx + 1) % endpoints.len(); + next_idx = (endpoint_idx + 1) % filtered_endpoints.len(); break; } Ok(Err(e)) => { @@ -1859,6 +2038,33 @@ mod tests { assert!(!UpstreamManager::is_hard_connect_error(&error)); } + #[test] + fn unscoped_selection_detects_default_route_upstream() { + let mut upstream = UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + bindtodevice: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + ipv4: None, + ipv6: None, + }; + + assert!(UpstreamManager::is_unscoped_upstream(&upstream)); + upstream.scopes = "local".to_string(); + assert!(!UpstreamManager::is_unscoped_upstream(&upstream)); + assert!(!UpstreamManager::should_check_in_default_dc_connectivity( + true, &upstream + )); + assert!(UpstreamManager::should_check_in_default_dc_connectivity( + false, &upstream + )); + } + #[test] fn resolve_bind_address_prefers_explicit_bind_ip() { let target = "203.0.113.10:443".parse::().unwrap(); @@ -1899,6 +2105,8 @@ mod tests { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 100,