diff --git a/Cargo.lock b/Cargo.lock index b80c651..893f526 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2661,7 +2661,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "4.3.29-David6" +version = "4.3.29-David7" dependencies = [ "aes", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index e92647f..086543d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "4.3.29-David6" +version = "4.3.29-David7" edition = "2024" [dependencies] diff --git a/docs/CONFIG_PARAMS.en.md b/docs/CONFIG_PARAMS.en.md index 4f6d436..738550c 100644 --- a/docs/CONFIG_PARAMS.en.md +++ b/docs/CONFIG_PARAMS.en.md @@ -260,9 +260,14 @@ This document lists all configuration keys accepted by `config.toml`. | tls_full_cert_ttl_secs | `u64` | `90` | — | TTL for sending full cert payload per (domain, client IP) tuple. | | alpn_enforce | `bool` | `true` | — | Enforces ALPN echo behavior based on client preference. | | mask_proxy_protocol | `u8` | `0` | — | PROXY protocol mode for mask backend (`0` disabled, `1` v1, `2` v2). | -| mask_shape_hardening | `bool` | `false` | — | Enables client->mask shape-channel hardening by applying controlled tail padding to bucket boundaries on mask relay shutdown. | +| mask_shape_hardening | `bool` | `true` | — | Enables client->mask shape-channel hardening by applying controlled tail padding to bucket boundaries on mask relay shutdown. | | mask_shape_bucket_floor_bytes | `usize` | `512` | Must be `> 0`; should be `<= mask_shape_bucket_cap_bytes`. | Minimum bucket size used by shape-channel hardening. | | mask_shape_bucket_cap_bytes | `usize` | `4096` | Must be `>= mask_shape_bucket_floor_bytes`. | Maximum bucket size used by shape-channel hardening; traffic above cap is not padded further. | +| mask_shape_above_cap_blur | `bool` | `false` | Requires `mask_shape_hardening = true`; requires `mask_shape_above_cap_blur_max_bytes > 0`. | Adds bounded randomized tail bytes even when forwarded size already exceeds cap. | +| mask_shape_above_cap_blur_max_bytes | `usize` | `512` | Must be `<= 1048576`; must be `> 0` when `mask_shape_above_cap_blur = true`. | Maximum randomized extra bytes appended above cap. | +| mask_timing_normalization_enabled | `bool` | `false` | Requires `mask_timing_normalization_floor_ms > 0`; requires `ceiling >= floor`. | Enables timing envelope normalization on masking outcomes. | +| mask_timing_normalization_floor_ms | `u64` | `0` | Must be `> 0` when timing normalization is enabled; must be `<= ceiling`. | Lower bound (ms) for masking outcome normalization target. | +| mask_timing_normalization_ceiling_ms | `u64` | `0` | Must be `>= floor`; must be `<= 60000`. | Upper bound (ms) for masking outcome normalization target. | ### Shape-channel hardening notes (`[censorship]`) @@ -283,14 +288,36 @@ Practical trade-offs: - Better anti-fingerprinting on size/shape channel. - Slightly higher egress overhead for small probes due to padding. -- Behavior is intentionally conservative and disabled by default. +- Behavior is intentionally conservative and enabled by default. Recommended starting profile: -- `mask_shape_hardening = true` +- `mask_shape_hardening = true` (default) - `mask_shape_bucket_floor_bytes = 512` - `mask_shape_bucket_cap_bytes = 4096` +### Above-cap blur notes (`[censorship]`) + +`mask_shape_above_cap_blur` adds a second-stage blur for very large probes that are already above `mask_shape_bucket_cap_bytes`. + +- A random tail in `[0, mask_shape_above_cap_blur_max_bytes]` is appended. +- This reduces exact-size leakage above cap at bounded overhead. +- Keep `mask_shape_above_cap_blur_max_bytes` conservative to avoid unnecessary egress growth. + +### Timing normalization envelope notes (`[censorship]`) + +`mask_timing_normalization_enabled` smooths timing differences between masking outcomes by applying a target duration envelope. + +- A random target is selected in `[mask_timing_normalization_floor_ms, mask_timing_normalization_ceiling_ms]`. +- Fast paths are delayed up to the selected target. +- Slow paths are not forced to finish by the ceiling (the envelope is best-effort shaping, not truncation). + +Recommended starting profile for timing shaping: + +- `mask_timing_normalization_enabled = true` +- `mask_timing_normalization_floor_ms = 180` +- `mask_timing_normalization_ceiling_ms = 320` + If your backend or network is very bandwidth-constrained, reduce cap first. If probes are still too distinguishable in your environment, increase floor gradually. ## [access] diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 716d973..e3d729c 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -515,7 +515,7 @@ pub(crate) fn default_alpn_enforce() -> bool { } pub(crate) fn default_mask_shape_hardening() -> bool { - false + true } pub(crate) fn default_mask_shape_bucket_floor_bytes() -> usize { @@ -526,6 +526,26 @@ pub(crate) fn default_mask_shape_bucket_cap_bytes() -> usize { 4096 } +pub(crate) fn default_mask_shape_above_cap_blur() -> bool { + false +} + +pub(crate) fn default_mask_shape_above_cap_blur_max_bytes() -> usize { + 512 +} + +pub(crate) fn default_mask_timing_normalization_enabled() -> bool { + false +} + +pub(crate) fn default_mask_timing_normalization_floor_ms() -> u64 { + 0 +} + +pub(crate) fn default_mask_timing_normalization_ceiling_ms() -> u64 { + 0 +} + pub(crate) fn default_stun_servers() -> Vec { vec![ "stun.l.google.com:5349".to_string(), diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index b483cd0..10fc976 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -585,6 +585,16 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b != new.censorship.mask_shape_bucket_floor_bytes || old.censorship.mask_shape_bucket_cap_bytes != new.censorship.mask_shape_bucket_cap_bytes + || old.censorship.mask_shape_above_cap_blur + != new.censorship.mask_shape_above_cap_blur + || old.censorship.mask_shape_above_cap_blur_max_bytes + != new.censorship.mask_shape_above_cap_blur_max_bytes + || old.censorship.mask_timing_normalization_enabled + != new.censorship.mask_timing_normalization_enabled + || old.censorship.mask_timing_normalization_floor_ms + != new.censorship.mask_timing_normalization_floor_ms + || old.censorship.mask_timing_normalization_ceiling_ms + != new.censorship.mask_timing_normalization_ceiling_ms { warned = true; warn!("config reload: censorship settings changed; restart required"); diff --git a/src/config/load.rs b/src/config/load.rs index 78bb084..fdc33d7 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -384,6 +384,71 @@ impl ProxyConfig { )); } + if config.censorship.mask_shape_bucket_floor_bytes == 0 { + return Err(ProxyError::Config( + "censorship.mask_shape_bucket_floor_bytes must be > 0".to_string(), + )); + } + + if config.censorship.mask_shape_bucket_cap_bytes + < config.censorship.mask_shape_bucket_floor_bytes + { + return Err(ProxyError::Config( + "censorship.mask_shape_bucket_cap_bytes must be >= censorship.mask_shape_bucket_floor_bytes" + .to_string(), + )); + } + + if config.censorship.mask_shape_above_cap_blur + && !config.censorship.mask_shape_hardening + { + return Err(ProxyError::Config( + "censorship.mask_shape_above_cap_blur requires censorship.mask_shape_hardening = true" + .to_string(), + )); + } + + if config.censorship.mask_shape_above_cap_blur + && config.censorship.mask_shape_above_cap_blur_max_bytes == 0 + { + return Err(ProxyError::Config( + "censorship.mask_shape_above_cap_blur_max_bytes must be > 0 when censorship.mask_shape_above_cap_blur is enabled" + .to_string(), + )); + } + + if config.censorship.mask_shape_above_cap_blur_max_bytes > 1_048_576 { + return Err(ProxyError::Config( + "censorship.mask_shape_above_cap_blur_max_bytes must be <= 1048576" + .to_string(), + )); + } + + if config.censorship.mask_timing_normalization_ceiling_ms + < config.censorship.mask_timing_normalization_floor_ms + { + return Err(ProxyError::Config( + "censorship.mask_timing_normalization_ceiling_ms must be >= censorship.mask_timing_normalization_floor_ms" + .to_string(), + )); + } + + if config.censorship.mask_timing_normalization_enabled + && config.censorship.mask_timing_normalization_floor_ms == 0 + { + return Err(ProxyError::Config( + "censorship.mask_timing_normalization_floor_ms must be > 0 when censorship.mask_timing_normalization_enabled is true" + .to_string(), + )); + } + + if config.censorship.mask_timing_normalization_ceiling_ms > 60_000 { + return Err(ProxyError::Config( + "censorship.mask_timing_normalization_ceiling_ms must be <= 60000" + .to_string(), + )); + } + if config.timeouts.relay_client_idle_soft_secs == 0 { return Err(ProxyError::Config( "timeouts.relay_client_idle_soft_secs must be > 0".to_string(), @@ -1044,6 +1109,10 @@ mod load_idle_policy_tests; #[path = "tests/load_security_tests.rs"] mod load_security_tests; +#[cfg(test)] +#[path = "tests/load_mask_shape_security_tests.rs"] +mod load_mask_shape_security_tests; + #[cfg(test)] mod tests { use super::*; diff --git a/src/config/tests/load_mask_shape_security_tests.rs b/src/config/tests/load_mask_shape_security_tests.rs new file mode 100644 index 0000000..41df0f5 --- /dev/null +++ b/src/config/tests/load_mask_shape_security_tests.rs @@ -0,0 +1,195 @@ +use super::*; +use std::fs; +use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; + +fn write_temp_config(contents: &str) -> PathBuf { + let nonce = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time must be after unix epoch") + .as_nanos(); + let path = std::env::temp_dir().join(format!("telemt-load-mask-shape-security-{nonce}.toml")); + fs::write(&path, contents).expect("temp config write must succeed"); + path +} + +fn remove_temp_config(path: &PathBuf) { + let _ = fs::remove_file(path); +} + +#[test] +fn load_rejects_zero_mask_shape_bucket_floor_bytes() { + let path = write_temp_config( + r#" +[censorship] +mask_shape_bucket_floor_bytes = 0 +mask_shape_bucket_cap_bytes = 4096 +"#, + ); + + let err = + ProxyConfig::load(&path).expect_err("zero mask_shape_bucket_floor_bytes must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("censorship.mask_shape_bucket_floor_bytes must be > 0"), + "error must explain floor>0 invariant, got: {msg}" + ); + + remove_temp_config(&path); +} + +#[test] +fn load_rejects_mask_shape_bucket_cap_less_than_floor() { + let path = write_temp_config( + r#" +[censorship] +mask_shape_bucket_floor_bytes = 1024 +mask_shape_bucket_cap_bytes = 512 +"#, + ); + + let err = + ProxyConfig::load(&path).expect_err("mask_shape_bucket_cap_bytes < floor must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains( + "censorship.mask_shape_bucket_cap_bytes must be >= censorship.mask_shape_bucket_floor_bytes" + ), + "error must explain cap>=floor invariant, got: {msg}" + ); + + remove_temp_config(&path); +} + +#[test] +fn load_accepts_mask_shape_bucket_cap_equal_to_floor() { + let path = write_temp_config( + r#" +[censorship] +mask_shape_hardening = true +mask_shape_bucket_floor_bytes = 1024 +mask_shape_bucket_cap_bytes = 1024 +"#, + ); + + let cfg = ProxyConfig::load(&path).expect("equal cap and floor must be accepted"); + assert!(cfg.censorship.mask_shape_hardening); + assert_eq!(cfg.censorship.mask_shape_bucket_floor_bytes, 1024); + assert_eq!(cfg.censorship.mask_shape_bucket_cap_bytes, 1024); + + remove_temp_config(&path); +} + +#[test] +fn load_rejects_above_cap_blur_when_shape_hardening_disabled() { + let path = write_temp_config( + r#" +[censorship] +mask_shape_hardening = false +mask_shape_above_cap_blur = true +mask_shape_above_cap_blur_max_bytes = 64 +"#, + ); + + let err = ProxyConfig::load(&path) + .expect_err("above-cap blur must require shape hardening enabled"); + let msg = err.to_string(); + assert!( + msg.contains("censorship.mask_shape_above_cap_blur requires censorship.mask_shape_hardening = true"), + "error must explain blur prerequisite, got: {msg}" + ); + + remove_temp_config(&path); +} + +#[test] +fn load_rejects_above_cap_blur_with_zero_max_bytes() { + let path = write_temp_config( + r#" +[censorship] +mask_shape_hardening = true +mask_shape_above_cap_blur = true +mask_shape_above_cap_blur_max_bytes = 0 +"#, + ); + + let err = ProxyConfig::load(&path) + .expect_err("above-cap blur max bytes must be > 0 when enabled"); + let msg = err.to_string(); + assert!( + msg.contains("censorship.mask_shape_above_cap_blur_max_bytes must be > 0 when censorship.mask_shape_above_cap_blur is enabled"), + "error must explain blur max bytes invariant, got: {msg}" + ); + + remove_temp_config(&path); +} + +#[test] +fn load_rejects_timing_normalization_floor_zero_when_enabled() { + let path = write_temp_config( + r#" +[censorship] +mask_timing_normalization_enabled = true +mask_timing_normalization_floor_ms = 0 +mask_timing_normalization_ceiling_ms = 200 +"#, + ); + + let err = ProxyConfig::load(&path) + .expect_err("timing normalization floor must be > 0 when enabled"); + let msg = err.to_string(); + assert!( + msg.contains("censorship.mask_timing_normalization_floor_ms must be > 0 when censorship.mask_timing_normalization_enabled is true"), + "error must explain timing floor invariant, got: {msg}" + ); + + remove_temp_config(&path); +} + +#[test] +fn load_rejects_timing_normalization_ceiling_below_floor() { + let path = write_temp_config( + r#" +[censorship] +mask_timing_normalization_enabled = true +mask_timing_normalization_floor_ms = 220 +mask_timing_normalization_ceiling_ms = 200 +"#, + ); + + let err = ProxyConfig::load(&path) + .expect_err("timing normalization ceiling must be >= floor"); + let msg = err.to_string(); + assert!( + msg.contains("censorship.mask_timing_normalization_ceiling_ms must be >= censorship.mask_timing_normalization_floor_ms"), + "error must explain timing ceiling/floor invariant, got: {msg}" + ); + + remove_temp_config(&path); +} + +#[test] +fn load_accepts_valid_timing_normalization_and_above_cap_blur_config() { + let path = write_temp_config( + r#" +[censorship] +mask_shape_hardening = true +mask_shape_above_cap_blur = true +mask_shape_above_cap_blur_max_bytes = 128 +mask_timing_normalization_enabled = true +mask_timing_normalization_floor_ms = 150 +mask_timing_normalization_ceiling_ms = 240 +"#, + ); + + let cfg = ProxyConfig::load(&path) + .expect("valid blur and timing normalization settings must be accepted"); + assert!(cfg.censorship.mask_shape_hardening); + assert!(cfg.censorship.mask_shape_above_cap_blur); + assert_eq!(cfg.censorship.mask_shape_above_cap_blur_max_bytes, 128); + assert!(cfg.censorship.mask_timing_normalization_enabled); + assert_eq!(cfg.censorship.mask_timing_normalization_floor_ms, 150); + assert_eq!(cfg.censorship.mask_timing_normalization_ceiling_ms, 240); + + remove_temp_config(&path); +} diff --git a/src/config/types.rs b/src/config/types.rs index 4d60686..e7470a4 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1425,6 +1425,27 @@ pub struct AntiCensorshipConfig { /// Maximum bucket size for mask shape hardening padding. #[serde(default = "default_mask_shape_bucket_cap_bytes")] pub mask_shape_bucket_cap_bytes: usize, + + /// Add bounded random tail bytes even when total bytes already exceed + /// mask_shape_bucket_cap_bytes. + #[serde(default = "default_mask_shape_above_cap_blur")] + pub mask_shape_above_cap_blur: bool, + + /// Maximum random bytes appended above cap when above-cap blur is enabled. + #[serde(default = "default_mask_shape_above_cap_blur_max_bytes")] + pub mask_shape_above_cap_blur_max_bytes: usize, + + /// Enable outcome-time normalization envelope for masking fallback. + #[serde(default = "default_mask_timing_normalization_enabled")] + pub mask_timing_normalization_enabled: bool, + + /// Lower bound (ms) for masking outcome timing envelope. + #[serde(default = "default_mask_timing_normalization_floor_ms")] + pub mask_timing_normalization_floor_ms: u64, + + /// Upper bound (ms) for masking outcome timing envelope. + #[serde(default = "default_mask_timing_normalization_ceiling_ms")] + pub mask_timing_normalization_ceiling_ms: u64, } impl Default for AntiCensorshipConfig { @@ -1449,6 +1470,11 @@ impl Default for AntiCensorshipConfig { mask_shape_hardening: default_mask_shape_hardening(), mask_shape_bucket_floor_bytes: default_mask_shape_bucket_floor_bytes(), mask_shape_bucket_cap_bytes: default_mask_shape_bucket_cap_bytes(), + mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(), + mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(), + mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(), + mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(), + mask_timing_normalization_ceiling_ms: default_mask_timing_normalization_ceiling_ms(), } } } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index a12e069..5eb2a22 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -1261,3 +1261,15 @@ mod masking_diagnostics_security_tests; #[cfg(test)] #[path = "tests/client_masking_shape_hardening_security_tests.rs"] mod masking_shape_hardening_security_tests; + +#[cfg(test)] +#[path = "tests/client_masking_shape_hardening_adversarial_tests.rs"] +mod masking_shape_hardening_adversarial_tests; + +#[cfg(test)] +#[path = "tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs"] +mod masking_shape_hardening_redteam_expected_fail_tests; + +#[cfg(test)] +#[path = "tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs"] +mod masking_shape_classifier_fuzz_redteam_expected_fail_tests; diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 4f013f1..94b2b77 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -3,6 +3,7 @@ use std::str; use std::net::SocketAddr; use std::time::Duration; +use rand::{Rng, RngCore}; use tokio::net::TcpStream; #[cfg(unix)] use tokio::net::UnixStream; @@ -73,7 +74,7 @@ fn next_mask_shape_bucket(total: usize, floor: usize, cap: usize) -> usize { None => return total, } if bucket > cap { - return total; + return cap; } } bucket @@ -85,6 +86,8 @@ async fn maybe_write_shape_padding( enabled: bool, floor: usize, cap: usize, + above_cap_blur: bool, + above_cap_blur_max_bytes: usize, ) where W: AsyncWrite + Unpin, @@ -93,15 +96,47 @@ where return; } - let bucket = next_mask_shape_bucket(total_sent, floor, cap); - if bucket <= total_sent { + let target_total = if total_sent >= cap && above_cap_blur && above_cap_blur_max_bytes > 0 { + let mut rng = rand::rng(); + let extra = rng.random_range(0..=above_cap_blur_max_bytes); + total_sent.saturating_add(extra) + } else { + next_mask_shape_bucket(total_sent, floor, cap) + }; + + if target_total <= total_sent { return; } - let pad_len = bucket - total_sent; - let pad = vec![0u8; pad_len]; - let _ = timeout(MASK_TIMEOUT, mask_write.write_all(&pad)).await; - let _ = timeout(MASK_TIMEOUT, mask_write.flush()).await; + let mut remaining = target_total - total_sent; + let mut pad_chunk = [0u8; 1024]; + let deadline = Instant::now() + MASK_TIMEOUT; + + while remaining > 0 { + let now = Instant::now(); + if now >= deadline { + return; + } + + let write_len = remaining.min(pad_chunk.len()); + { + let mut rng = rand::rng(); + rng.fill_bytes(&mut pad_chunk[..write_len]); + } + let write_budget = deadline.saturating_duration_since(now); + match timeout(write_budget, mask_write.write_all(&pad_chunk[..write_len])).await { + Ok(Ok(())) => {} + Ok(Err(_)) | Err(_) => return, + } + remaining -= write_len; + } + + let now = Instant::now(); + if now >= deadline { + return; + } + let flush_budget = deadline.saturating_duration_since(now); + let _ = timeout(flush_budget, mask_write.flush()).await; } async fn write_proxy_header_with_timeout(mask_write: &mut W, header: &[u8]) -> bool @@ -134,10 +169,33 @@ async fn wait_mask_connect_budget(started: Instant) { } } -async fn wait_mask_outcome_budget(started: Instant) { +fn mask_outcome_target_budget(config: &ProxyConfig) -> Duration { + if config.censorship.mask_timing_normalization_enabled { + let floor = config.censorship.mask_timing_normalization_floor_ms; + let ceiling = config.censorship.mask_timing_normalization_ceiling_ms; + if ceiling > floor { + let mut rng = rand::rng(); + return Duration::from_millis(rng.random_range(floor..=ceiling)); + } + return Duration::from_millis(floor); + } + + MASK_TIMEOUT +} + +async fn wait_mask_connect_budget_if_needed(started: Instant, config: &ProxyConfig) { + if config.censorship.mask_timing_normalization_enabled { + return; + } + + wait_mask_connect_budget(started).await; +} + +async fn wait_mask_outcome_budget(started: Instant, config: &ProxyConfig) { + let target = mask_outcome_target_budget(config); let elapsed = started.elapsed(); - if elapsed < MASK_TIMEOUT { - tokio::time::sleep(MASK_TIMEOUT - elapsed).await; + if elapsed < target { + tokio::time::sleep(target - elapsed).await; } } @@ -247,7 +305,7 @@ where build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr); if let Some(header) = proxy_header { if !write_proxy_header_with_timeout(&mut mask_write, &header).await { - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; return; } } @@ -262,6 +320,8 @@ where config.censorship.mask_shape_hardening, config.censorship.mask_shape_bucket_floor_bytes, config.censorship.mask_shape_bucket_cap_bytes, + config.censorship.mask_shape_above_cap_blur, + config.censorship.mask_shape_above_cap_blur_max_bytes, ), ) .await @@ -269,18 +329,18 @@ where { debug!("Mask relay timed out (unix socket)"); } - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; } Ok(Err(e)) => { - wait_mask_connect_budget(connect_started).await; + wait_mask_connect_budget_if_needed(connect_started, config).await; debug!(error = %e, "Failed to connect to mask unix socket"); consume_client_data_with_timeout(reader).await; - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; } Err(_) => { debug!("Timeout connecting to mask unix socket"); consume_client_data_with_timeout(reader).await; - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; } } return; @@ -313,7 +373,7 @@ where let (mask_read, mut mask_write) = stream.into_split(); if let Some(header) = proxy_header { if !write_proxy_header_with_timeout(&mut mask_write, &header).await { - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; return; } } @@ -328,6 +388,8 @@ where config.censorship.mask_shape_hardening, config.censorship.mask_shape_bucket_floor_bytes, config.censorship.mask_shape_bucket_cap_bytes, + config.censorship.mask_shape_above_cap_blur, + config.censorship.mask_shape_above_cap_blur_max_bytes, ), ) .await @@ -335,18 +397,18 @@ where { debug!("Mask relay timed out"); } - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; } Ok(Err(e)) => { - wait_mask_connect_budget(connect_started).await; + wait_mask_connect_budget_if_needed(connect_started, config).await; debug!(error = %e, "Failed to connect to mask host"); consume_client_data_with_timeout(reader).await; - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; } Err(_) => { debug!("Timeout connecting to mask host"); consume_client_data_with_timeout(reader).await; - wait_mask_outcome_budget(outcome_started).await; + wait_mask_outcome_budget(outcome_started, config).await; } } } @@ -361,6 +423,8 @@ async fn relay_to_mask( shape_hardening_enabled: bool, shape_bucket_floor_bytes: usize, shape_bucket_cap_bytes: usize, + shape_above_cap_blur: bool, + shape_above_cap_blur_max_bytes: usize, ) where R: AsyncRead + Unpin + Send + 'static, @@ -386,6 +450,8 @@ where shape_hardening_enabled, shape_bucket_floor_bytes, shape_bucket_cap_bytes, + shape_above_cap_blur, + shape_above_cap_blur_max_bytes, ) .await; let _ = mask_write.shutdown().await; @@ -414,3 +480,23 @@ mod security_tests; #[cfg(test)] #[path = "tests/masking_adversarial_tests.rs"] mod adversarial_tests; + +#[cfg(test)] +#[path = "tests/masking_shape_hardening_adversarial_tests.rs"] +mod masking_shape_hardening_adversarial_tests; + +#[cfg(test)] +#[path = "tests/masking_shape_above_cap_blur_security_tests.rs"] +mod masking_shape_above_cap_blur_security_tests; + +#[cfg(test)] +#[path = "tests/masking_timing_normalization_security_tests.rs"] +mod masking_timing_normalization_security_tests; + +#[cfg(test)] +#[path = "tests/masking_ab_envelope_blur_integration_security_tests.rs"] +mod masking_ab_envelope_blur_integration_security_tests; + +#[cfg(test)] +#[path = "tests/masking_timing_sidechannel_redteam_expected_fail_tests.rs"] +mod masking_timing_sidechannel_redteam_expected_fail_tests; diff --git a/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs new file mode 100644 index 0000000..5b5344d --- /dev/null +++ b/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs @@ -0,0 +1,245 @@ +use super::*; +use crate::config::{UpstreamConfig, UpstreamType}; +use std::sync::Arc; +use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::time::Duration; + +fn new_upstream_manager(stats: Arc) -> Arc { + Arc::new(UpstreamManager::new( + vec![UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + }], + 1, + 1, + 1, + 1, + false, + stats, + )) +} + +async fn run_probe_capture( + body_sent: usize, + tls_len: u16, + enable_shape_hardening: bool, + floor: usize, + cap: usize, +) -> usize { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.censorship.mask = true; + cfg.censorship.mask_unix_sock = None; + cfg.censorship.mask_host = Some("127.0.0.1".to_string()); + cfg.censorship.mask_port = backend_addr.port(); + cfg.censorship.mask_shape_hardening = enable_shape_hardening; + cfg.censorship.mask_shape_bucket_floor_bytes = floor; + cfg.censorship.mask_shape_bucket_cap_bytes = cap; + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got = Vec::new(); + let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await; + got.len() + }); + + let (server_side, mut client_side) = duplex(65536); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.214:57014".parse().unwrap(), + Arc::new(cfg), + Arc::new(Stats::new()), + new_upstream_manager(Arc::new(Stats::new())), + Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), + Arc::new(BufferPool::new()), + Arc::new(SecureRandom::new()), + None, + Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)), + None, + Arc::new(UserIpTracker::new()), + Arc::new(BeobachtenStore::new()), + false, + )); + + let mut probe = vec![0u8; 5 + body_sent]; + probe[0] = 0x16; + probe[1] = 0x03; + probe[2] = 0x01; + probe[3..5].copy_from_slice(&tls_len.to_be_bytes()); + probe[5..].fill(0x66); + + client_side.write_all(&probe).await.unwrap(); + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(4), handler) + .await + .unwrap() + .unwrap(); + + tokio::time::timeout(Duration::from_secs(4), accept_task) + .await + .unwrap() + .unwrap() +} + +fn pearson_corr(xs: &[f64], ys: &[f64]) -> f64 { + if xs.len() != ys.len() || xs.is_empty() { + return 0.0; + } + + let n = xs.len() as f64; + let mean_x = xs.iter().sum::() / n; + let mean_y = ys.iter().sum::() / n; + + let mut cov = 0.0; + let mut var_x = 0.0; + let mut var_y = 0.0; + + for (&x, &y) in xs.iter().zip(ys.iter()) { + let dx = x - mean_x; + let dy = y - mean_y; + cov += dx * dy; + var_x += dx * dx; + var_y += dy * dy; + } + + if var_x == 0.0 || var_y == 0.0 { + return 0.0; + } + + cov / (var_x.sqrt() * var_y.sqrt()) +} + +fn lcg_sizes(count: usize, floor: usize, cap: usize) -> Vec { + let mut x = 0x9E3779B97F4A7C15u64; + let span = cap.saturating_mul(3); + let mut out = Vec::with_capacity(count + 8); + + for _ in 0..count { + x = x + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + let v = (x as usize) % span.max(1); + out.push(v); + } + + // Inject edge and boundary-heavy probes. + out.extend_from_slice(&[ + 0, + floor.saturating_sub(1), + floor, + floor.saturating_add(1), + cap.saturating_sub(1), + cap, + cap.saturating_add(1), + cap.saturating_mul(2), + ]); + out +} + +async fn collect_distribution( + sizes: &[usize], + hardening: bool, + floor: usize, + cap: usize, +) -> Vec { + let mut out = Vec::with_capacity(sizes.len()); + for &body in sizes { + out.push(run_probe_capture(body, 1200, hardening, floor, cap).await); + } + out +} + +#[tokio::test] +#[ignore = "red-team expected-fail: strict decorrelation target for hardened output lengths"] +async fn redteam_fuzz_01_hardened_output_length_correlation_should_be_below_0_2() { + let floor = 512usize; + let cap = 4096usize; + let sizes = lcg_sizes(24, floor, cap); + + let hardened = collect_distribution(&sizes, true, floor, cap).await; + let x: Vec = sizes.iter().map(|v| *v as f64).collect(); + let y_hard: Vec = hardened.iter().map(|v| *v as f64).collect(); + + let corr_hard = pearson_corr(&x, &y_hard).abs(); + println!("redteam_fuzz corr_hardened={corr_hard:.4} samples={}", sizes.len()); + + assert!( + corr_hard < 0.2, + "strict model expects near-zero size correlation; observed corr={corr_hard:.4}" + ); +} + +#[tokio::test] +#[ignore = "red-team expected-fail: strict class-collapse ratio target"] +async fn redteam_fuzz_02_hardened_unique_output_ratio_should_be_below_5pct() { + let floor = 512usize; + let cap = 4096usize; + let sizes = lcg_sizes(24, floor, cap); + + let hardened = collect_distribution(&sizes, true, floor, cap).await; + + let in_unique = { + let mut s = std::collections::BTreeSet::new(); + for v in &sizes { + s.insert(*v); + } + s.len() + }; + + let out_unique = { + let mut s = std::collections::BTreeSet::new(); + for v in &hardened { + s.insert(*v); + } + s.len() + }; + + let ratio = out_unique as f64 / in_unique as f64; + println!( + "redteam_fuzz unique_ratio_hardened={ratio:.4} out_unique={} in_unique={}", + out_unique, in_unique + ); + + assert!( + ratio <= 0.05, + "strict model expects near-total collapse; observed ratio={ratio:.4}" + ); +} + +#[tokio::test] +#[ignore = "red-team expected-fail: strict separability improvement target"] +async fn redteam_fuzz_03_hardened_signal_must_be_10x_lower_than_plain() { + let floor = 512usize; + let cap = 4096usize; + let sizes = lcg_sizes(24, floor, cap); + + let plain = collect_distribution(&sizes, false, floor, cap).await; + let hardened = collect_distribution(&sizes, true, floor, cap).await; + + let x: Vec = sizes.iter().map(|v| *v as f64).collect(); + let y_plain: Vec = plain.iter().map(|v| *v as f64).collect(); + let y_hard: Vec = hardened.iter().map(|v| *v as f64).collect(); + + let corr_plain = pearson_corr(&x, &y_plain).abs(); + let corr_hard = pearson_corr(&x, &y_hard).abs(); + + println!( + "redteam_fuzz corr_plain={corr_plain:.4} corr_hardened={corr_hard:.4}" + ); + + assert!( + corr_hard <= corr_plain * 0.1, + "strict model expects 10x suppression; plain={corr_plain:.4} hardened={corr_hard:.4}" + ); +} diff --git a/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs b/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs new file mode 100644 index 0000000..6ce57b3 --- /dev/null +++ b/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs @@ -0,0 +1,179 @@ +use super::*; +use crate::config::{UpstreamConfig, UpstreamType}; +use std::sync::Arc; +use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::time::Duration; + +fn new_upstream_manager(stats: Arc) -> Arc { + Arc::new(UpstreamManager::new( + vec![UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + }], + 1, + 1, + 1, + 1, + false, + stats, + )) +} + +fn expected_bucket(total: usize, floor: usize, cap: usize) -> usize { + if total == 0 || floor == 0 || cap < floor { + return total; + } + + if total >= cap { + return total; + } + + let mut bucket = floor; + while bucket < total { + match bucket.checked_mul(2) { + Some(next) => bucket = next, + None => return total, + } + if bucket > cap { + return cap; + } + } + bucket +} + +async fn run_probe_capture( + body_sent: usize, + tls_len: u16, + enable_shape_hardening: bool, + floor: usize, + cap: usize, +) -> Vec { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.censorship.mask = true; + cfg.censorship.mask_unix_sock = None; + cfg.censorship.mask_host = Some("127.0.0.1".to_string()); + cfg.censorship.mask_port = backend_addr.port(); + cfg.censorship.mask_shape_hardening = enable_shape_hardening; + cfg.censorship.mask_shape_bucket_floor_bytes = floor; + cfg.censorship.mask_shape_bucket_cap_bytes = cap; + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got = Vec::new(); + let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await; + got + }); + + let (server_side, mut client_side) = duplex(65536); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.199:56999".parse().unwrap(), + Arc::new(cfg), + Arc::new(Stats::new()), + new_upstream_manager(Arc::new(Stats::new())), + Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), + Arc::new(BufferPool::new()), + Arc::new(SecureRandom::new()), + None, + Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)), + None, + Arc::new(UserIpTracker::new()), + Arc::new(BeobachtenStore::new()), + false, + )); + + let mut probe = vec![0u8; 5 + body_sent]; + probe[0] = 0x16; + probe[1] = 0x03; + probe[2] = 0x01; + probe[3..5].copy_from_slice(&tls_len.to_be_bytes()); + probe[5..].fill(0x66); + + client_side.write_all(&probe).await.unwrap(); + client_side.shutdown().await.unwrap(); + + let result = tokio::time::timeout(Duration::from_secs(4), handler) + .await + .unwrap() + .unwrap(); + assert!(result.is_ok()); + + tokio::time::timeout(Duration::from_secs(4), accept_task) + .await + .unwrap() + .unwrap() +} + +#[tokio::test] +async fn shape_hardening_non_power_of_two_cap_collapses_probe_classes() { + let floor = 1000usize; + let cap = 1500usize; + + let low = run_probe_capture(1195, 700, true, floor, cap).await; + let high = run_probe_capture(1494, 700, true, floor, cap).await; + + assert_eq!(low.len(), 1500); + assert_eq!(high.len(), 1500); +} + +#[tokio::test] +async fn shape_hardening_disabled_keeps_non_power_of_two_cap_lengths_distinct() { + let floor = 1000usize; + let cap = 1500usize; + + let low = run_probe_capture(1195, 700, false, floor, cap).await; + let high = run_probe_capture(1494, 700, false, floor, cap).await; + + assert_eq!(low.len(), 1200); + assert_eq!(high.len(), 1499); +} + +#[tokio::test] +async fn shape_hardening_parallel_stress_collapses_sub_cap_probes() { + let floor = 1000usize; + let cap = 1500usize; + let mut tasks = Vec::new(); + + for idx in 0..24usize { + let body = 1001 + (idx * 19 % 480); + tasks.push(tokio::spawn(async move { + run_probe_capture(body, 1200, true, floor, cap).await.len() + })); + } + + for task in tasks { + let observed = task.await.unwrap(); + assert_eq!(observed, 1500); + } +} + +#[tokio::test] +async fn shape_hardening_light_fuzz_matches_bucket_oracle() { + let floor = 512usize; + let cap = 4096usize; + + for step in 1usize..=36usize { + let total = 1 + (((step * 313) ^ (step << 7)) % (cap + 300)); + let body = total.saturating_sub(5); + + let got = run_probe_capture(body, 650, true, floor, cap).await; + let expected = expected_bucket(total, floor, cap); + assert_eq!( + got.len(), + expected, + "step={step} total={total} expected={expected} got={} ", + got.len() + ); + } +} diff --git a/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs new file mode 100644 index 0000000..a835d00 --- /dev/null +++ b/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs @@ -0,0 +1,236 @@ +use super::*; +use crate::config::{UpstreamConfig, UpstreamType}; +use std::sync::Arc; +use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::time::{Duration, Instant}; + +fn new_upstream_manager(stats: Arc) -> Arc { + Arc::new(UpstreamManager::new( + vec![UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + }], + 1, + 1, + 1, + 1, + false, + stats, + )) +} + +async fn run_probe_capture( + body_sent: usize, + tls_len: u16, + enable_shape_hardening: bool, + floor: usize, + cap: usize, +) -> Vec { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.censorship.mask = true; + cfg.censorship.mask_unix_sock = None; + cfg.censorship.mask_host = Some("127.0.0.1".to_string()); + cfg.censorship.mask_port = backend_addr.port(); + cfg.censorship.mask_shape_hardening = enable_shape_hardening; + cfg.censorship.mask_shape_bucket_floor_bytes = floor; + cfg.censorship.mask_shape_bucket_cap_bytes = cap; + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got = Vec::new(); + let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await; + got + }); + + let (server_side, mut client_side) = duplex(65536); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.211:57011".parse().unwrap(), + Arc::new(cfg), + Arc::new(Stats::new()), + new_upstream_manager(Arc::new(Stats::new())), + Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), + Arc::new(BufferPool::new()), + Arc::new(SecureRandom::new()), + None, + Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)), + None, + Arc::new(UserIpTracker::new()), + Arc::new(BeobachtenStore::new()), + false, + )); + + let mut probe = vec![0u8; 5 + body_sent]; + probe[0] = 0x16; + probe[1] = 0x03; + probe[2] = 0x01; + probe[3..5].copy_from_slice(&tls_len.to_be_bytes()); + probe[5..].fill(0x66); + + client_side.write_all(&probe).await.unwrap(); + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(4), handler) + .await + .unwrap() + .unwrap(); + + tokio::time::timeout(Duration::from_secs(4), accept_task) + .await + .unwrap() + .unwrap() +} + +async fn measure_reject_ms(body_sent: usize) -> u128 { + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.timeouts.client_handshake = 1; + cfg.censorship.mask = true; + cfg.censorship.mask_unix_sock = None; + cfg.censorship.mask_host = Some("127.0.0.1".to_string()); + cfg.censorship.mask_port = 1; + cfg.censorship.server_hello_delay_min_ms = 700; + cfg.censorship.server_hello_delay_max_ms = 700; + + let (server_side, mut client_side) = duplex(65536); + let started = Instant::now(); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.212:57012".parse().unwrap(), + Arc::new(cfg), + Arc::new(Stats::new()), + new_upstream_manager(Arc::new(Stats::new())), + Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), + Arc::new(BufferPool::new()), + Arc::new(SecureRandom::new()), + None, + Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)), + None, + Arc::new(UserIpTracker::new()), + Arc::new(BeobachtenStore::new()), + false, + )); + + let mut probe = vec![0u8; 5 + body_sent]; + probe[0] = 0x16; + probe[1] = 0x03; + probe[2] = 0x01; + probe[3..5].copy_from_slice(&600u16.to_be_bytes()); + probe[5..].fill(0x44); + + client_side.write_all(&probe).await.unwrap(); + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(4), handler) + .await + .unwrap() + .unwrap(); + + started.elapsed().as_millis() +} + +#[tokio::test] +#[ignore = "red-team expected-fail: above-cap exact length still leaks classifier signal"] +async fn redteam_shape_01_above_cap_flows_should_collapse_to_single_class() { + let floor = 512usize; + let cap = 4096usize; + + let a = run_probe_capture(5000, 7000, true, floor, cap).await; + let b = run_probe_capture(6000, 7000, true, floor, cap).await; + + assert_eq!( + a.len(), + b.len(), + "strict anti-classifier model expects same backend length class above cap" + ); +} + +#[tokio::test] +#[ignore = "red-team expected-fail: current padding bytes are deterministic zeros"] +async fn redteam_shape_02_padding_tail_must_be_non_deterministic() { + let floor = 512usize; + let cap = 4096usize; + let got = run_probe_capture(17, 600, true, floor, cap).await; + + assert!( + got.len() > 22, + "test requires padding tail to exist" + ); + + let tail = &got[22..]; + assert!( + tail.iter().any(|b| *b != 0), + "padding tail is fully zeroed and thus deterministic" + ); +} + +#[tokio::test] +#[ignore = "red-team expected-fail: exact-floor probes still expose boundary class"] +async fn redteam_shape_03_exact_floor_input_should_not_be_fixed_point() { + let floor = 512usize; + let cap = 4096usize; + let got = run_probe_capture(507, 600, true, floor, cap).await; + + assert!( + got.len() > floor, + "strict model expects extra blur even when input lands exactly on floor" + ); +} + +#[tokio::test] +#[ignore = "red-team expected-fail: strict one-bucket collapse hypothesis"] +async fn redteam_shape_04_all_sub_cap_sizes_should_collapse_to_single_size() { + let floor = 512usize; + let cap = 4096usize; + let classes = [17usize, 63usize, 255usize, 511usize, 1023usize, 2047usize, 3071usize]; + + let mut observed = Vec::new(); + for body in classes { + observed.push(run_probe_capture(body, 1200, true, floor, cap).await.len()); + } + + let first = observed[0]; + for v in observed { + assert_eq!(v, first, "strict model expects one collapsed class across all sub-cap probes"); + } +} + +#[tokio::test] +#[ignore = "red-team expected-fail: over-strict micro-timing invariance"] +async fn redteam_shape_05_reject_timing_spread_should_be_under_2ms() { + let classes = [17usize, 511usize, 1023usize, 2047usize, 4095usize]; + let mut values = Vec::new(); + + for class in classes { + values.push(measure_reject_ms(class).await); + } + + let min = *values.iter().min().unwrap(); + let max = *values.iter().max().unwrap(); + assert!( + min == 700 && max == 700, + "strict model requires exact 700ms for every malformed class: min={min}ms max={max}ms" + ); +} + +#[test] +#[ignore = "red-team expected-fail: secure-by-default hypothesis"] +fn redteam_shape_06_shape_hardening_should_be_secure_by_default() { + let cfg = ProxyConfig::default(); + assert!( + cfg.censorship.mask_shape_hardening, + "strict model expects shape hardening enabled by default" + ); +} diff --git a/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs b/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs new file mode 100644 index 0000000..b82ea88 --- /dev/null +++ b/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs @@ -0,0 +1,241 @@ +use super::*; +use std::collections::BTreeSet; +use tokio::io::duplex; +use tokio::net::TcpListener; +use tokio::time::{Duration, Instant}; + +#[derive(Clone, Copy)] +enum PathClass { + ConnectFail, + ConnectSuccess, + SlowBackend, +} + +fn mean_ms(samples: &[u128]) -> f64 { + if samples.is_empty() { + return 0.0; + } + let sum: u128 = samples.iter().copied().sum(); + sum as f64 / samples.len() as f64 +} + +async fn measure_masking_duration_ms(path: PathClass, timing_norm_enabled: bool) -> u128 { + let mut config = ProxyConfig::default(); + config.general.beobachten = false; + config.censorship.mask = true; + config.censorship.mask_unix_sock = None; + config.censorship.mask_timing_normalization_enabled = timing_norm_enabled; + config.censorship.mask_timing_normalization_floor_ms = 220; + config.censorship.mask_timing_normalization_ceiling_ms = 260; + + let accept_task = match path { + PathClass::ConnectFail => { + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = 1; + None + } + PathClass::ConnectSuccess => { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + Some(tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + })) + } + PathClass::SlowBackend => { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + Some(tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + tokio::time::sleep(Duration::from_millis(320)).await; + })) + } + }; + + let (client_reader, _client_writer) = duplex(1024); + let (_client_visible_reader, client_visible_writer) = duplex(1024); + + let peer: SocketAddr = "198.51.100.230:57230".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let started = Instant::now(); + handle_bad_client( + client_reader, + client_visible_writer, + b"GET /ab-harness HTTP/1.1\r\nHost: x\r\n\r\n", + peer, + local, + &config, + &beobachten, + ) + .await; + + if let Some(task) = accept_task { + let _ = tokio::time::timeout(Duration::from_secs(2), task).await; + } + + started.elapsed().as_millis() +} + +async fn capture_above_cap_forwarded_len( + body_sent: usize, + above_cap_blur_enabled: bool, + above_cap_blur_max_bytes: usize, +) -> usize { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut config = ProxyConfig::default(); + config.general.beobachten = false; + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + config.censorship.mask_shape_hardening = true; + config.censorship.mask_shape_bucket_floor_bytes = 512; + config.censorship.mask_shape_bucket_cap_bytes = 4096; + config.censorship.mask_shape_above_cap_blur = above_cap_blur_enabled; + config.censorship.mask_shape_above_cap_blur_max_bytes = above_cap_blur_max_bytes; + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got = Vec::new(); + let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await; + got.len() + }); + + let (client_reader, mut client_writer) = duplex(64 * 1024); + let (_client_visible_reader, client_visible_writer) = duplex(64 * 1024); + + let peer: SocketAddr = "198.51.100.231:57231".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let mut initial = vec![0u8; 5 + body_sent]; + initial[0] = 0x16; + initial[1] = 0x03; + initial[2] = 0x01; + initial[3..5].copy_from_slice(&7000u16.to_be_bytes()); + initial[5..].fill(0x5A); + + let fallback_task = tokio::spawn(async move { + handle_bad_client( + client_reader, + client_visible_writer, + &initial, + peer, + local, + &config, + &beobachten, + ) + .await; + }); + + client_writer.shutdown().await.unwrap(); + let _ = tokio::time::timeout(Duration::from_secs(4), fallback_task) + .await + .unwrap() + .unwrap(); + + tokio::time::timeout(Duration::from_secs(4), accept_task) + .await + .unwrap() + .unwrap() +} + +#[tokio::test] +async fn integration_ab_harness_envelope_and_blur_improve_obfuscation_vs_baseline() { + const ITER: usize = 8; + + let mut baseline_fail = Vec::with_capacity(ITER); + let mut baseline_success = Vec::with_capacity(ITER); + let mut baseline_slow = Vec::with_capacity(ITER); + + let mut hardened_fail = Vec::with_capacity(ITER); + let mut hardened_success = Vec::with_capacity(ITER); + let mut hardened_slow = Vec::with_capacity(ITER); + + for _ in 0..ITER { + baseline_fail.push(measure_masking_duration_ms(PathClass::ConnectFail, false).await); + baseline_success.push(measure_masking_duration_ms(PathClass::ConnectSuccess, false).await); + baseline_slow.push(measure_masking_duration_ms(PathClass::SlowBackend, false).await); + + hardened_fail.push(measure_masking_duration_ms(PathClass::ConnectFail, true).await); + hardened_success.push(measure_masking_duration_ms(PathClass::ConnectSuccess, true).await); + hardened_slow.push(measure_masking_duration_ms(PathClass::SlowBackend, true).await); + } + + let baseline_means = [ + mean_ms(&baseline_fail), + mean_ms(&baseline_success), + mean_ms(&baseline_slow), + ]; + let hardened_means = [ + mean_ms(&hardened_fail), + mean_ms(&hardened_success), + mean_ms(&hardened_slow), + ]; + + let baseline_range = baseline_means + .iter() + .copied() + .fold((f64::INFINITY, f64::NEG_INFINITY), |(mn, mx), v| { + (mn.min(v), mx.max(v)) + }); + let hardened_range = hardened_means + .iter() + .copied() + .fold((f64::INFINITY, f64::NEG_INFINITY), |(mn, mx), v| { + (mn.min(v), mx.max(v)) + }); + + let baseline_spread = baseline_range.1 - baseline_range.0; + let hardened_spread = hardened_range.1 - hardened_range.0; + + println!( + "ab_harness_timing baseline_means={:?} hardened_means={:?} baseline_spread={:.2} hardened_spread={:.2}", + baseline_means, hardened_means, baseline_spread, hardened_spread + ); + + assert!( + hardened_spread < baseline_spread, + "timing envelope should reduce cross-path mean spread: baseline={baseline_spread:.2} hardened={hardened_spread:.2}" + ); + + let mut baseline_a = BTreeSet::new(); + let mut baseline_b = BTreeSet::new(); + let mut hardened_a = BTreeSet::new(); + let mut hardened_b = BTreeSet::new(); + + for _ in 0..24 { + baseline_a.insert(capture_above_cap_forwarded_len(5000, false, 0).await); + baseline_b.insert(capture_above_cap_forwarded_len(5040, false, 0).await); + + hardened_a.insert(capture_above_cap_forwarded_len(5000, true, 96).await); + hardened_b.insert(capture_above_cap_forwarded_len(5040, true, 96).await); + } + + let baseline_overlap = baseline_a.intersection(&baseline_b).count(); + let hardened_overlap = hardened_a.intersection(&hardened_b).count(); + + println!( + "ab_harness_length baseline_overlap={} hardened_overlap={} baseline_a={} baseline_b={} hardened_a={} hardened_b={}", + baseline_overlap, + hardened_overlap, + baseline_a.len(), + baseline_b.len(), + hardened_a.len(), + hardened_b.len() + ); + + assert_eq!(baseline_overlap, 0, "baseline above-cap classes should be disjoint"); + assert!( + hardened_overlap > baseline_overlap, + "above-cap blur should increase cross-class overlap: baseline={} hardened={}", + baseline_overlap, + hardened_overlap + ); +} diff --git a/src/proxy/tests/masking_security_tests.rs b/src/proxy/tests/masking_security_tests.rs index bd543b5..9107ca9 100644 --- a/src/proxy/tests/masking_security_tests.rs +++ b/src/proxy/tests/masking_security_tests.rs @@ -1321,6 +1321,8 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall false, 0, 0, + false, + 0, ) .await; }); @@ -1424,7 +1426,18 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() { let timed = timeout( Duration::from_millis(40), - relay_to_mask(reader, writer, mask_read, mask_write, b"", false, 0, 0), + relay_to_mask( + reader, + writer, + mask_read, + mask_write, + b"", + false, + 0, + 0, + false, + 0, + ), ) .await; diff --git a/src/proxy/tests/masking_shape_above_cap_blur_security_tests.rs b/src/proxy/tests/masking_shape_above_cap_blur_security_tests.rs new file mode 100644 index 0000000..d2d522f --- /dev/null +++ b/src/proxy/tests/masking_shape_above_cap_blur_security_tests.rs @@ -0,0 +1,102 @@ +use super::*; +use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::time::Duration; + +async fn capture_forwarded_len( + body_sent: usize, + shape_hardening: bool, + above_cap_blur: bool, + above_cap_blur_max_bytes: usize, +) -> usize { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut config = ProxyConfig::default(); + config.general.beobachten = false; + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + config.censorship.mask_shape_hardening = shape_hardening; + config.censorship.mask_shape_bucket_floor_bytes = 512; + config.censorship.mask_shape_bucket_cap_bytes = 4096; + config.censorship.mask_shape_above_cap_blur = above_cap_blur; + config.censorship.mask_shape_above_cap_blur_max_bytes = above_cap_blur_max_bytes; + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got = Vec::new(); + let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await; + got.len() + }); + + let (server_reader, mut client_writer) = duplex(64 * 1024); + let (_client_visible_reader, client_visible_writer) = duplex(64 * 1024); + let peer: SocketAddr = "198.51.100.220:57120".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let mut probe = vec![0u8; 5 + body_sent]; + probe[0] = 0x16; + probe[1] = 0x03; + probe[2] = 0x01; + probe[3..5].copy_from_slice(&7000u16.to_be_bytes()); + probe[5..].fill(0x5A); + + let fallback = tokio::spawn(async move { + handle_bad_client( + server_reader, + client_visible_writer, + &probe, + peer, + local, + &config, + &beobachten, + ) + .await; + }); + + client_writer.shutdown().await.unwrap(); + let _ = tokio::time::timeout(Duration::from_secs(4), fallback) + .await + .unwrap() + .unwrap(); + + tokio::time::timeout(Duration::from_secs(4), accept_task) + .await + .unwrap() + .unwrap() +} + +#[tokio::test] +async fn above_cap_blur_disabled_keeps_exact_above_cap_length() { + let body_sent = 5000usize; + let observed = capture_forwarded_len(body_sent, true, false, 0).await; + assert_eq!(observed, 5 + body_sent); +} + +#[tokio::test] +async fn above_cap_blur_enabled_adds_bounded_random_tail() { + let body_sent = 5000usize; + let base = 5 + body_sent; + let max_extra = 64usize; + + let mut saw_extra = false; + for _ in 0..20 { + let observed = capture_forwarded_len(body_sent, true, true, max_extra).await; + assert!(observed >= base, "observed={observed} base={base}"); + assert!( + observed <= base + max_extra, + "observed={observed} base={} max_extra={max_extra}", + base + ); + if observed > base { + saw_extra = true; + } + } + + assert!( + saw_extra, + "at least one run should produce above-cap blur bytes under randomization" + ); +} diff --git a/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs b/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs new file mode 100644 index 0000000..eade371 --- /dev/null +++ b/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs @@ -0,0 +1,129 @@ +use super::*; +use tokio::io::{duplex, empty, sink, AsyncReadExt, AsyncWrite}; + +struct CountingWriter { + written: usize, +} + +impl CountingWriter { + fn new() -> Self { + Self { written: 0 } + } +} + +impl AsyncWrite for CountingWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.written = self.written.saturating_add(buf.len()); + std::task::Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +#[test] +fn shape_bucket_clamps_to_cap_when_next_power_of_two_exceeds_cap() { + let bucket = next_mask_shape_bucket(1200, 1000, 1500); + assert_eq!(bucket, 1500); +} + +#[test] +fn shape_bucket_never_drops_below_total_for_valid_ranges() { + for total in [1usize, 32, 127, 512, 999, 1000, 1001, 1499, 1500, 1501] { + let bucket = next_mask_shape_bucket(total, 1000, 1500); + assert!(bucket >= total || total >= 1500, "bucket={bucket} total={total}"); + } +} + +#[tokio::test] +async fn maybe_write_shape_padding_writes_exact_delta() { + let mut writer = CountingWriter::new(); + maybe_write_shape_padding(&mut writer, 1200, true, 1000, 1500, false, 0).await; + assert_eq!(writer.written, 300); +} + +#[tokio::test] +async fn maybe_write_shape_padding_skips_when_disabled() { + let mut writer = CountingWriter::new(); + maybe_write_shape_padding(&mut writer, 1200, false, 1000, 1500, false, 0).await; + assert_eq!(writer.written, 0); +} + +#[tokio::test] +async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() { + let initial = vec![0x16, 0x03, 0x01, 0x04, 0x00]; + let extra = vec![0xAB; 1195]; + + let (client_reader, mut client_writer) = duplex(4096); + let (mut mask_observer, mask_writer) = duplex(4096); + + let relay = tokio::spawn(async move { + relay_to_mask( + client_reader, + sink(), + empty(), + mask_writer, + &initial, + true, + 1000, + 1500, + false, + 0, + ) + .await; + }); + + client_writer.write_all(&extra).await.unwrap(); + client_writer.shutdown().await.unwrap(); + + relay.await.unwrap(); + + let mut observed = Vec::new(); + mask_observer.read_to_end(&mut observed).await.unwrap(); + assert_eq!(observed.len(), 1500); + assert_eq!(&observed[..5], &[0x16, 0x03, 0x01, 0x04, 0x00]); + assert!(observed[5..1200].iter().all(|b| *b == 0xAB)); + assert_eq!(observed[1200..].len(), 300); +} + +#[test] +fn shape_bucket_light_fuzz_monotonicity_and_bounds() { + let floor = 512usize; + let cap = 4096usize; + let mut prev = 0usize; + + for step in 1usize..=3000 { + let total = ((step * 37) ^ (step << 3)) % (cap + 512); + let bucket = next_mask_shape_bucket(total, floor, cap); + + if total < cap { + assert!(bucket >= total, "bucket={bucket} total={total}"); + assert!(bucket <= cap, "bucket={bucket} cap={cap}"); + } else { + assert_eq!(bucket, total, "above-cap totals must remain unchanged"); + } + + if total >= prev { + // For non-decreasing inputs, bucket class must not regress. + let prev_bucket = next_mask_shape_bucket(prev, floor, cap); + assert!(bucket >= prev_bucket || total >= cap); + } + + prev = total; + } +} diff --git a/src/proxy/tests/masking_timing_normalization_security_tests.rs b/src/proxy/tests/masking_timing_normalization_security_tests.rs new file mode 100644 index 0000000..a5959b4 --- /dev/null +++ b/src/proxy/tests/masking_timing_normalization_security_tests.rs @@ -0,0 +1,120 @@ +use super::*; +use tokio::io::duplex; +use tokio::net::TcpListener; +use tokio::time::{Duration, Instant}; + +#[derive(Clone, Copy)] +enum MaskPath { + ConnectFail, + ConnectSuccess, + SlowBackend, +} + +async fn measure_bad_client_duration_ms(path: MaskPath, floor_ms: u64, ceiling_ms: u64) -> u128 { + let mut config = ProxyConfig::default(); + config.general.beobachten = false; + config.censorship.mask = true; + config.censorship.mask_unix_sock = None; + config.censorship.mask_timing_normalization_enabled = true; + config.censorship.mask_timing_normalization_floor_ms = floor_ms; + config.censorship.mask_timing_normalization_ceiling_ms = ceiling_ms; + + let accept_task = match path { + MaskPath::ConnectFail => { + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = 1; + None + } + MaskPath::ConnectSuccess => { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + Some(tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + })) + } + MaskPath::SlowBackend => { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + Some(tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + tokio::time::sleep(Duration::from_millis(320)).await; + })) + } + }; + + let (client_reader, _client_writer) = duplex(1024); + let (_client_visible_reader, client_visible_writer) = duplex(1024); + + let peer: SocketAddr = "198.51.100.221:57121".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let started = Instant::now(); + handle_bad_client( + client_reader, + client_visible_writer, + b"GET /timing-normalize HTTP/1.1\r\nHost: x\r\n\r\n", + peer, + local, + &config, + &beobachten, + ) + .await; + + if let Some(task) = accept_task { + let _ = tokio::time::timeout(Duration::from_secs(2), task).await; + } + + started.elapsed().as_millis() +} + +#[tokio::test] +async fn timing_normalization_envelope_applies_to_connect_fail_and_success() { + let floor = 160u64; + let ceiling = 180u64; + + let fail = measure_bad_client_duration_ms(MaskPath::ConnectFail, floor, ceiling).await; + let success = measure_bad_client_duration_ms(MaskPath::ConnectSuccess, floor, ceiling).await; + + assert!( + fail >= floor as u128, + "connect-fail duration below floor: {fail}ms < {floor}ms" + ); + assert!( + fail <= (ceiling + 60) as u128, + "connect-fail duration exceeded relaxed ceiling: {fail}ms > {}ms", + ceiling + 60 + ); + + assert!( + success >= floor as u128, + "connect-success duration below floor: {success}ms < {floor}ms" + ); + assert!( + success <= (ceiling + 60) as u128, + "connect-success duration exceeded relaxed ceiling: {success}ms > {}ms", + ceiling + 60 + ); + + let delta = fail.abs_diff(success); + assert!( + delta <= 80, + "timing normalization should reduce path divergence (delta={}ms)", + delta + ); +} + +#[tokio::test] +async fn timing_normalization_does_not_sleep_if_path_already_exceeds_ceiling() { + let floor = 120u64; + let ceiling = 150u64; + + let slow = measure_bad_client_duration_ms(MaskPath::SlowBackend, floor, ceiling).await; + + assert!(slow >= 280, "slow backend path should remain slow (got {slow}ms)"); + assert!(slow <= 520, "slow backend path should remain bounded in tests (got {slow}ms)"); +} diff --git a/src/proxy/tests/masking_timing_sidechannel_redteam_expected_fail_tests.rs b/src/proxy/tests/masking_timing_sidechannel_redteam_expected_fail_tests.rs new file mode 100644 index 0000000..3c4a342 --- /dev/null +++ b/src/proxy/tests/masking_timing_sidechannel_redteam_expected_fail_tests.rs @@ -0,0 +1,200 @@ +use super::*; +use tokio::io::duplex; +use tokio::net::TcpListener; +use tokio::time::{Duration, Instant}; + +#[derive(Clone, Copy)] +enum TimingPath { + ConnectFail, + ConnectSuccess, + SlowBackend, +} + +async fn measure_path_duration_ms(path: TimingPath) -> u128 { + let mut config = ProxyConfig::default(); + config.general.beobachten = false; + config.censorship.mask = true; + config.censorship.mask_unix_sock = None; + + let maybe_accept = match path { + TimingPath::ConnectFail => { + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = 1; + None + } + TimingPath::ConnectSuccess => { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + Some(tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + })) + } + TimingPath::SlowBackend => { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + Some(tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + tokio::time::sleep(Duration::from_millis(350)).await; + })) + } + }; + + let peer: SocketAddr = "198.51.100.213:57013".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let (client_reader, _client_writer) = duplex(1024); + let (_client_visible_reader, client_visible_writer) = duplex(1024); + + let started = Instant::now(); + handle_bad_client( + client_reader, + client_visible_writer, + b"GET /timing HTTP/1.1\r\nHost: x\r\n\r\n", + peer, + local, + &config, + &beobachten, + ) + .await; + + if let Some(task) = maybe_accept { + let _ = tokio::time::timeout(Duration::from_secs(2), task).await; + } + + started.elapsed().as_millis() +} + +fn summarize(values: &[u128]) -> (u128, u128, f64) { + let min = *values.iter().min().unwrap_or(&0); + let max = *values.iter().max().unwrap_or(&0); + let sum: u128 = values.iter().copied().sum(); + let mean = if values.is_empty() { + 0.0 + } else { + sum as f64 / values.len() as f64 + }; + (min, max, mean) +} + +#[tokio::test] +#[ignore = "red-team expected-fail: strict path-indistinguishability target"] +async fn redteam_timing_01_connect_fail_success_slow_backend_must_be_within_10ms() { + const ITER: usize = 8; + + let mut fail = Vec::with_capacity(ITER); + let mut success = Vec::with_capacity(ITER); + let mut slow = Vec::with_capacity(ITER); + + for _ in 0..ITER { + fail.push(measure_path_duration_ms(TimingPath::ConnectFail).await); + success.push(measure_path_duration_ms(TimingPath::ConnectSuccess).await); + slow.push(measure_path_duration_ms(TimingPath::SlowBackend).await); + } + + let (_, fail_max, fail_mean) = summarize(&fail); + let (_, success_max, success_mean) = summarize(&success); + let (_, slow_max, slow_mean) = summarize(&slow); + + let global_min = *fail + .iter() + .chain(success.iter()) + .chain(slow.iter()) + .min() + .unwrap(); + let global_max = *fail + .iter() + .chain(success.iter()) + .chain(slow.iter()) + .max() + .unwrap(); + + println!( + "redteam_timing path=connect_fail mean_ms={:.2} max_ms={}", + fail_mean, fail_max + ); + println!( + "redteam_timing path=connect_success mean_ms={:.2} max_ms={}", + success_mean, success_max + ); + println!( + "redteam_timing path=slow_backend mean_ms={:.2} max_ms={}", + slow_mean, slow_max + ); + + assert!( + global_max.saturating_sub(global_min) <= 10, + "strict model expects all masking outcomes in one 10ms bucket: min={global_min} max={global_max}" + ); +} + +#[tokio::test] +#[ignore = "red-team expected-fail: strict classifier-separability target"] +async fn redteam_timing_02_path_classifier_centroid_accuracy_must_be_below_40pct() { + const ITER: usize = 12; + + let mut fail = Vec::with_capacity(ITER); + let mut success = Vec::with_capacity(ITER); + let mut slow = Vec::with_capacity(ITER); + + for _ in 0..ITER { + fail.push(measure_path_duration_ms(TimingPath::ConnectFail).await as f64); + success.push(measure_path_duration_ms(TimingPath::ConnectSuccess).await as f64); + slow.push(measure_path_duration_ms(TimingPath::SlowBackend).await as f64); + } + + let mean = |v: &Vec| -> f64 { v.iter().sum::() / v.len() as f64 }; + let c_fail = mean(&fail); + let c_success = mean(&success); + let c_slow = mean(&slow); + + let mut correct = 0usize; + let mut total = 0usize; + + let classify = |x: f64, c0: f64, c1: f64, c2: f64| -> usize { + let d0 = (x - c0).abs(); + let d1 = (x - c1).abs(); + let d2 = (x - c2).abs(); + if d0 <= d1 && d0 <= d2 { + 0 + } else if d1 <= d0 && d1 <= d2 { + 1 + } else { + 2 + } + }; + + for &x in &fail { + total += 1; + if classify(x, c_fail, c_success, c_slow) == 0 { + correct += 1; + } + } + for &x in &success { + total += 1; + if classify(x, c_fail, c_success, c_slow) == 1 { + correct += 1; + } + } + for &x in &slow { + total += 1; + if classify(x, c_fail, c_success, c_slow) == 2 { + correct += 1; + } + } + + let accuracy = correct as f64 / total as f64; + println!( + "redteam_timing_classifier accuracy={:.3} c_fail={:.2} c_success={:.2} c_slow={:.2}", + accuracy, c_fail, c_success, c_slow + ); + + assert!( + accuracy <= 0.40, + "strict model expects poor classifier; observed accuracy={accuracy:.3}" + ); +}