Add comprehensive security tests for masking and shape hardening features

- Introduced red-team expected-fail tests for client masking shape hardening.
- Added integration tests for the masking A/B envelope blur obfuscation.
- Implemented masking security tests validating masking behavior across probe conditions.
- Created tests for masking shape above-cap blur.
- Developed adversarial tests evaluating the robustness of masking shape hardening against attacks.
- Added timing normalization security tests assessing the effectiveness of timing obfuscation.
- Implemented red-team expected-fail tests for timing side-channel vulnerabilities.
David Osipov 2026-03-21 00:30:51 +04:00
parent 8814854ae4
commit bb355e916f
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
19 changed files with 1937 additions and 27 deletions

Cargo.lock (generated)

@@ -2661,7 +2661,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
 [[package]]
 name = "telemt"
-version = "4.3.29-David6"
+version = "4.3.29-David7"
 dependencies = [
  "aes",
  "anyhow",


@@ -1,6 +1,6 @@
 [package]
 name = "telemt"
-version = "4.3.29-David6"
+version = "4.3.29-David7"
 edition = "2024"

 [dependencies]


@@ -260,9 +260,14 @@ This document lists all configuration keys accepted by `config.toml`.
 | tls_full_cert_ttl_secs | `u64` | `90` | — | TTL for sending full cert payload per (domain, client IP) tuple. |
 | alpn_enforce | `bool` | `true` | — | Enforces ALPN echo behavior based on client preference. |
 | mask_proxy_protocol | `u8` | `0` | — | PROXY protocol mode for mask backend (`0` disabled, `1` v1, `2` v2). |
-| mask_shape_hardening | `bool` | `false` | — | Enables client->mask shape-channel hardening by applying controlled tail padding to bucket boundaries on mask relay shutdown. |
+| mask_shape_hardening | `bool` | `true` | — | Enables client->mask shape-channel hardening by applying controlled tail padding to bucket boundaries on mask relay shutdown. |
 | mask_shape_bucket_floor_bytes | `usize` | `512` | Must be `> 0`; should be `<= mask_shape_bucket_cap_bytes`. | Minimum bucket size used by shape-channel hardening. |
 | mask_shape_bucket_cap_bytes | `usize` | `4096` | Must be `>= mask_shape_bucket_floor_bytes`. | Maximum bucket size used by shape-channel hardening; traffic above cap is not padded further. |
+| mask_shape_above_cap_blur | `bool` | `false` | Requires `mask_shape_hardening = true`; requires `mask_shape_above_cap_blur_max_bytes > 0`. | Adds bounded randomized tail bytes even when forwarded size already exceeds cap. |
+| mask_shape_above_cap_blur_max_bytes | `usize` | `512` | Must be `<= 1048576`; must be `> 0` when `mask_shape_above_cap_blur = true`. | Maximum randomized extra bytes appended above cap. |
+| mask_timing_normalization_enabled | `bool` | `false` | Requires `mask_timing_normalization_floor_ms > 0`; requires `ceiling >= floor`. | Enables timing envelope normalization on masking outcomes. |
+| mask_timing_normalization_floor_ms | `u64` | `0` | Must be `> 0` when timing normalization is enabled; must be `<= ceiling`. | Lower bound (ms) for masking outcome normalization target. |
+| mask_timing_normalization_ceiling_ms | `u64` | `0` | Must be `>= floor`; must be `<= 60000`. | Upper bound (ms) for masking outcome normalization target. |

 ### Shape-channel hardening notes (`[censorship]`)
@@ -283,14 +288,36 @@ Practical trade-offs:
 - Better anti-fingerprinting on size/shape channel.
 - Slightly higher egress overhead for small probes due to padding.
-- Behavior is intentionally conservative and disabled by default.
+- Behavior is intentionally conservative and enabled by default.

 Recommended starting profile:
-- `mask_shape_hardening = true`
+- `mask_shape_hardening = true` (default)
 - `mask_shape_bucket_floor_bytes = 512`
 - `mask_shape_bucket_cap_bytes = 4096`

+### Above-cap blur notes (`[censorship]`)
+
+`mask_shape_above_cap_blur` adds a second-stage blur for very large probes that are already above `mask_shape_bucket_cap_bytes`.
+
+- A random tail in `[0, mask_shape_above_cap_blur_max_bytes]` is appended.
+- This reduces exact-size leakage above cap at bounded overhead.
+- Keep `mask_shape_above_cap_blur_max_bytes` conservative to avoid unnecessary egress growth.
+
+### Timing normalization envelope notes (`[censorship]`)
+
+`mask_timing_normalization_enabled` smooths timing differences between masking outcomes by applying a target duration envelope.
+
+- A random target is selected in `[mask_timing_normalization_floor_ms, mask_timing_normalization_ceiling_ms]`.
+- Fast paths are delayed up to the selected target.
+- Slow paths are not forced to finish by the ceiling (the envelope is best-effort shaping, not truncation).
+
+Recommended starting profile for timing shaping:
+- `mask_timing_normalization_enabled = true`
+- `mask_timing_normalization_floor_ms = 180`
+- `mask_timing_normalization_ceiling_ms = 320`
+
 If your backend or network is very bandwidth-constrained, reduce cap first. If probes are still too distinguishable in your environment, increase floor gradually.
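Taken together, the recommended profiles above can be expressed as a single `[censorship]` fragment. This is an illustrative sketch assembled from the documented keys and recommended values, not a snippet from the repository:

```toml
[censorship]
# Shape-channel hardening (enabled by default as of this commit).
mask_shape_hardening = true
mask_shape_bucket_floor_bytes = 512
mask_shape_bucket_cap_bytes = 4096

# Optional second-stage blur for probes already above the cap.
# Requires mask_shape_hardening = true and a max-bytes value > 0 (and <= 1048576).
mask_shape_above_cap_blur = true
mask_shape_above_cap_blur_max_bytes = 512

# Optional timing envelope; floor must be > 0 and <= ceiling (ceiling <= 60000).
mask_timing_normalization_enabled = true
mask_timing_normalization_floor_ms = 180
mask_timing_normalization_ceiling_ms = 320
```

Invalid combinations (for example, enabling the blur while `mask_shape_hardening = false`) are rejected at load time by the validation added in this commit.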
## [access] ## [access]


@@ -515,7 +515,7 @@ pub(crate) fn default_alpn_enforce() -> bool {
 }

 pub(crate) fn default_mask_shape_hardening() -> bool {
-    false
+    true
 }

 pub(crate) fn default_mask_shape_bucket_floor_bytes() -> usize {
@@ -526,6 +526,26 @@ pub(crate) fn default_mask_shape_bucket_cap_bytes() -> usize {
     4096
 }

+pub(crate) fn default_mask_shape_above_cap_blur() -> bool {
+    false
+}
+
+pub(crate) fn default_mask_shape_above_cap_blur_max_bytes() -> usize {
+    512
+}
+
+pub(crate) fn default_mask_timing_normalization_enabled() -> bool {
+    false
+}
+
+pub(crate) fn default_mask_timing_normalization_floor_ms() -> u64 {
+    0
+}
+
+pub(crate) fn default_mask_timing_normalization_ceiling_ms() -> u64 {
+    0
+}
+
 pub(crate) fn default_stun_servers() -> Vec<String> {
     vec![
         "stun.l.google.com:5349".to_string(),


@@ -585,6 +585,16 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
             != new.censorship.mask_shape_bucket_floor_bytes
         || old.censorship.mask_shape_bucket_cap_bytes
             != new.censorship.mask_shape_bucket_cap_bytes
+        || old.censorship.mask_shape_above_cap_blur
+            != new.censorship.mask_shape_above_cap_blur
+        || old.censorship.mask_shape_above_cap_blur_max_bytes
+            != new.censorship.mask_shape_above_cap_blur_max_bytes
+        || old.censorship.mask_timing_normalization_enabled
+            != new.censorship.mask_timing_normalization_enabled
+        || old.censorship.mask_timing_normalization_floor_ms
+            != new.censorship.mask_timing_normalization_floor_ms
+        || old.censorship.mask_timing_normalization_ceiling_ms
+            != new.censorship.mask_timing_normalization_ceiling_ms
     {
         warned = true;
         warn!("config reload: censorship settings changed; restart required");


@@ -384,6 +384,71 @@ impl ProxyConfig {
             ));
         }

+        if config.censorship.mask_shape_bucket_floor_bytes == 0 {
+            return Err(ProxyError::Config(
+                "censorship.mask_shape_bucket_floor_bytes must be > 0".to_string(),
+            ));
+        }
+        if config.censorship.mask_shape_bucket_cap_bytes
+            < config.censorship.mask_shape_bucket_floor_bytes
+        {
+            return Err(ProxyError::Config(
+                "censorship.mask_shape_bucket_cap_bytes must be >= censorship.mask_shape_bucket_floor_bytes"
+                    .to_string(),
+            ));
+        }
+        if config.censorship.mask_shape_above_cap_blur
+            && !config.censorship.mask_shape_hardening
+        {
+            return Err(ProxyError::Config(
+                "censorship.mask_shape_above_cap_blur requires censorship.mask_shape_hardening = true"
+                    .to_string(),
+            ));
+        }
+        if config.censorship.mask_shape_above_cap_blur
+            && config.censorship.mask_shape_above_cap_blur_max_bytes == 0
+        {
+            return Err(ProxyError::Config(
+                "censorship.mask_shape_above_cap_blur_max_bytes must be > 0 when censorship.mask_shape_above_cap_blur is enabled"
+                    .to_string(),
+            ));
+        }
+        if config.censorship.mask_shape_above_cap_blur_max_bytes > 1_048_576 {
+            return Err(ProxyError::Config(
+                "censorship.mask_shape_above_cap_blur_max_bytes must be <= 1048576"
+                    .to_string(),
+            ));
+        }
+        if config.censorship.mask_timing_normalization_ceiling_ms
+            < config.censorship.mask_timing_normalization_floor_ms
+        {
+            return Err(ProxyError::Config(
+                "censorship.mask_timing_normalization_ceiling_ms must be >= censorship.mask_timing_normalization_floor_ms"
+                    .to_string(),
+            ));
+        }
+        if config.censorship.mask_timing_normalization_enabled
+            && config.censorship.mask_timing_normalization_floor_ms == 0
+        {
+            return Err(ProxyError::Config(
+                "censorship.mask_timing_normalization_floor_ms must be > 0 when censorship.mask_timing_normalization_enabled is true"
+                    .to_string(),
+            ));
+        }
+        if config.censorship.mask_timing_normalization_ceiling_ms > 60_000 {
+            return Err(ProxyError::Config(
+                "censorship.mask_timing_normalization_ceiling_ms must be <= 60000"
+                    .to_string(),
+            ));
+        }
         if config.timeouts.relay_client_idle_soft_secs == 0 {
             return Err(ProxyError::Config(
                 "timeouts.relay_client_idle_soft_secs must be > 0".to_string(),
@@ -1044,6 +1109,10 @@ mod load_idle_policy_tests;
 #[path = "tests/load_security_tests.rs"]
 mod load_security_tests;

+#[cfg(test)]
+#[path = "tests/load_mask_shape_security_tests.rs"]
+mod load_mask_shape_security_tests;
+
 #[cfg(test)]
 mod tests {
     use super::*;


@@ -0,0 +1,195 @@
+use super::*;
+use std::fs;
+use std::path::PathBuf;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+fn write_temp_config(contents: &str) -> PathBuf {
+    let nonce = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("system time must be after unix epoch")
+        .as_nanos();
+    let path = std::env::temp_dir().join(format!("telemt-load-mask-shape-security-{nonce}.toml"));
+    fs::write(&path, contents).expect("temp config write must succeed");
+    path
+}
+
+fn remove_temp_config(path: &PathBuf) {
+    let _ = fs::remove_file(path);
+}
+
+#[test]
+fn load_rejects_zero_mask_shape_bucket_floor_bytes() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_shape_bucket_floor_bytes = 0
+mask_shape_bucket_cap_bytes = 4096
+"#,
+    );
+    let err =
+        ProxyConfig::load(&path).expect_err("zero mask_shape_bucket_floor_bytes must be rejected");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("censorship.mask_shape_bucket_floor_bytes must be > 0"),
+        "error must explain floor>0 invariant, got: {msg}"
+    );
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_rejects_mask_shape_bucket_cap_less_than_floor() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_shape_bucket_floor_bytes = 1024
+mask_shape_bucket_cap_bytes = 512
+"#,
+    );
+    let err =
+        ProxyConfig::load(&path).expect_err("mask_shape_bucket_cap_bytes < floor must be rejected");
+    let msg = err.to_string();
+    assert!(
+        msg.contains(
+            "censorship.mask_shape_bucket_cap_bytes must be >= censorship.mask_shape_bucket_floor_bytes"
+        ),
+        "error must explain cap>=floor invariant, got: {msg}"
+    );
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_accepts_mask_shape_bucket_cap_equal_to_floor() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_shape_hardening = true
+mask_shape_bucket_floor_bytes = 1024
+mask_shape_bucket_cap_bytes = 1024
+"#,
+    );
+    let cfg = ProxyConfig::load(&path).expect("equal cap and floor must be accepted");
+    assert!(cfg.censorship.mask_shape_hardening);
+    assert_eq!(cfg.censorship.mask_shape_bucket_floor_bytes, 1024);
+    assert_eq!(cfg.censorship.mask_shape_bucket_cap_bytes, 1024);
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_rejects_above_cap_blur_when_shape_hardening_disabled() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_shape_hardening = false
+mask_shape_above_cap_blur = true
+mask_shape_above_cap_blur_max_bytes = 64
+"#,
+    );
+    let err = ProxyConfig::load(&path)
+        .expect_err("above-cap blur must require shape hardening enabled");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("censorship.mask_shape_above_cap_blur requires censorship.mask_shape_hardening = true"),
+        "error must explain blur prerequisite, got: {msg}"
+    );
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_rejects_above_cap_blur_with_zero_max_bytes() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_shape_hardening = true
+mask_shape_above_cap_blur = true
+mask_shape_above_cap_blur_max_bytes = 0
+"#,
+    );
+    let err = ProxyConfig::load(&path)
+        .expect_err("above-cap blur max bytes must be > 0 when enabled");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("censorship.mask_shape_above_cap_blur_max_bytes must be > 0 when censorship.mask_shape_above_cap_blur is enabled"),
+        "error must explain blur max bytes invariant, got: {msg}"
+    );
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_rejects_timing_normalization_floor_zero_when_enabled() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_timing_normalization_enabled = true
+mask_timing_normalization_floor_ms = 0
+mask_timing_normalization_ceiling_ms = 200
+"#,
+    );
+    let err = ProxyConfig::load(&path)
+        .expect_err("timing normalization floor must be > 0 when enabled");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("censorship.mask_timing_normalization_floor_ms must be > 0 when censorship.mask_timing_normalization_enabled is true"),
+        "error must explain timing floor invariant, got: {msg}"
+    );
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_rejects_timing_normalization_ceiling_below_floor() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_timing_normalization_enabled = true
+mask_timing_normalization_floor_ms = 220
+mask_timing_normalization_ceiling_ms = 200
+"#,
+    );
+    let err = ProxyConfig::load(&path)
+        .expect_err("timing normalization ceiling must be >= floor");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("censorship.mask_timing_normalization_ceiling_ms must be >= censorship.mask_timing_normalization_floor_ms"),
+        "error must explain timing ceiling/floor invariant, got: {msg}"
+    );
+    remove_temp_config(&path);
+}
+
+#[test]
+fn load_accepts_valid_timing_normalization_and_above_cap_blur_config() {
+    let path = write_temp_config(
+        r#"
+[censorship]
+mask_shape_hardening = true
+mask_shape_above_cap_blur = true
+mask_shape_above_cap_blur_max_bytes = 128
+mask_timing_normalization_enabled = true
+mask_timing_normalization_floor_ms = 150
+mask_timing_normalization_ceiling_ms = 240
+"#,
+    );
+    let cfg = ProxyConfig::load(&path)
+        .expect("valid blur and timing normalization settings must be accepted");
+    assert!(cfg.censorship.mask_shape_hardening);
+    assert!(cfg.censorship.mask_shape_above_cap_blur);
+    assert_eq!(cfg.censorship.mask_shape_above_cap_blur_max_bytes, 128);
+    assert!(cfg.censorship.mask_timing_normalization_enabled);
+    assert_eq!(cfg.censorship.mask_timing_normalization_floor_ms, 150);
+    assert_eq!(cfg.censorship.mask_timing_normalization_ceiling_ms, 240);
+    remove_temp_config(&path);
+}


@@ -1425,6 +1425,27 @@ pub struct AntiCensorshipConfig {
     /// Maximum bucket size for mask shape hardening padding.
     #[serde(default = "default_mask_shape_bucket_cap_bytes")]
     pub mask_shape_bucket_cap_bytes: usize,
+    /// Add bounded random tail bytes even when total bytes already exceed
+    /// mask_shape_bucket_cap_bytes.
+    #[serde(default = "default_mask_shape_above_cap_blur")]
+    pub mask_shape_above_cap_blur: bool,
+    /// Maximum random bytes appended above cap when above-cap blur is enabled.
+    #[serde(default = "default_mask_shape_above_cap_blur_max_bytes")]
+    pub mask_shape_above_cap_blur_max_bytes: usize,
+    /// Enable outcome-time normalization envelope for masking fallback.
+    #[serde(default = "default_mask_timing_normalization_enabled")]
+    pub mask_timing_normalization_enabled: bool,
+    /// Lower bound (ms) for masking outcome timing envelope.
+    #[serde(default = "default_mask_timing_normalization_floor_ms")]
+    pub mask_timing_normalization_floor_ms: u64,
+    /// Upper bound (ms) for masking outcome timing envelope.
+    #[serde(default = "default_mask_timing_normalization_ceiling_ms")]
+    pub mask_timing_normalization_ceiling_ms: u64,
 }

 impl Default for AntiCensorshipConfig {
@@ -1449,6 +1470,11 @@ impl Default for AntiCensorshipConfig {
             mask_shape_hardening: default_mask_shape_hardening(),
             mask_shape_bucket_floor_bytes: default_mask_shape_bucket_floor_bytes(),
             mask_shape_bucket_cap_bytes: default_mask_shape_bucket_cap_bytes(),
+            mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
+            mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
+            mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
+            mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
+            mask_timing_normalization_ceiling_ms: default_mask_timing_normalization_ceiling_ms(),
         }
     }
 }


@@ -1261,3 +1261,15 @@ mod masking_diagnostics_security_tests;
 #[cfg(test)]
 #[path = "tests/client_masking_shape_hardening_security_tests.rs"]
 mod masking_shape_hardening_security_tests;
+
+#[cfg(test)]
+#[path = "tests/client_masking_shape_hardening_adversarial_tests.rs"]
+mod masking_shape_hardening_adversarial_tests;
+
+#[cfg(test)]
+#[path = "tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs"]
+mod masking_shape_hardening_redteam_expected_fail_tests;
+
+#[cfg(test)]
+#[path = "tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs"]
+mod masking_shape_classifier_fuzz_redteam_expected_fail_tests;


@@ -3,6 +3,7 @@
 use std::str;
 use std::net::SocketAddr;
 use std::time::Duration;
+use rand::{Rng, RngCore};
 use tokio::net::TcpStream;
 #[cfg(unix)]
 use tokio::net::UnixStream;
@@ -73,7 +74,7 @@ fn next_mask_shape_bucket(total: usize, floor: usize, cap: usize) -> usize {
             None => return total,
         }
         if bucket > cap {
-            return total;
+            return cap;
         }
     }
     bucket
@@ -85,6 +86,8 @@ async fn maybe_write_shape_padding<W>(
     enabled: bool,
     floor: usize,
     cap: usize,
+    above_cap_blur: bool,
+    above_cap_blur_max_bytes: usize,
 )
 where
     W: AsyncWrite + Unpin,
@@ -93,15 +96,47 @@ where
         return;
     }
-    let bucket = next_mask_shape_bucket(total_sent, floor, cap);
-    if bucket <= total_sent {
+    let target_total = if total_sent >= cap && above_cap_blur && above_cap_blur_max_bytes > 0 {
+        let mut rng = rand::rng();
+        let extra = rng.random_range(0..=above_cap_blur_max_bytes);
+        total_sent.saturating_add(extra)
+    } else {
+        next_mask_shape_bucket(total_sent, floor, cap)
+    };
+    if target_total <= total_sent {
         return;
     }
-    let pad_len = bucket - total_sent;
-    let pad = vec![0u8; pad_len];
-    let _ = timeout(MASK_TIMEOUT, mask_write.write_all(&pad)).await;
-    let _ = timeout(MASK_TIMEOUT, mask_write.flush()).await;
+    let mut remaining = target_total - total_sent;
+    let mut pad_chunk = [0u8; 1024];
+    let deadline = Instant::now() + MASK_TIMEOUT;
+    while remaining > 0 {
+        let now = Instant::now();
+        if now >= deadline {
+            return;
+        }
+        let write_len = remaining.min(pad_chunk.len());
+        {
+            let mut rng = rand::rng();
+            rng.fill_bytes(&mut pad_chunk[..write_len]);
+        }
+        let write_budget = deadline.saturating_duration_since(now);
+        match timeout(write_budget, mask_write.write_all(&pad_chunk[..write_len])).await {
+            Ok(Ok(())) => {}
+            Ok(Err(_)) | Err(_) => return,
+        }
+        remaining -= write_len;
+    }
+    let now = Instant::now();
+    if now >= deadline {
+        return;
+    }
+    let flush_budget = deadline.saturating_duration_since(now);
+    let _ = timeout(flush_budget, mask_write.flush()).await;
 }
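The padding target computed above (bucket rounding below the cap, bounded random blur above it) can be modeled as a small pure function. This is a standalone sketch, not the project's code: `next_bucket` and `padded_target` are hypothetical names, and the doubling growth from `floor` is an assumption consistent with the overflow and cap checks visible in the hunk (note the fix: an overshooting bucket now clamps to `cap` instead of skipping padding entirely).

```rust
// Round a probe size up to the next bucket, doubling from `floor` and
// clamping at `cap` (assumed growth rule; overflow gives up on padding).
fn next_bucket(total: usize, floor: usize, cap: usize) -> usize {
    let mut bucket = floor;
    while bucket < total {
        match bucket.checked_mul(2) {
            Some(b) => bucket = b,
            None => return total, // overflow: leave the size unpadded
        }
        if bucket > cap {
            return cap; // fixed behavior: clamp to cap rather than skip padding
        }
    }
    bucket
}

// Total bytes the relay would aim for; `blur_extra` stands in for the
// uniform draw from 0..=blur_max that happens at runtime, and the blur
// is assumed enabled when blur_max > 0.
fn padded_target(total: usize, floor: usize, cap: usize, blur_max: usize, blur_extra: usize) -> usize {
    if total >= cap && blur_max > 0 {
        total.saturating_add(blur_extra.min(blur_max)) // bounded random tail above cap
    } else {
        next_bucket(total, floor, cap)
    }
}

fn main() {
    assert_eq!(next_bucket(100, 512, 4096), 512);   // small probe rounds up to floor
    assert_eq!(next_bucket(600, 512, 4096), 1024);  // doubles past floor
    assert_eq!(next_bucket(5000, 512, 4096), 4096); // overshoot clamps at cap
    assert_eq!(padded_target(5000, 512, 4096, 512, 300), 5300); // blur appends a bounded tail
    println!("ok");
}
```

The clamp matters: with the old `return total`, any probe whose bucket overshot the cap was forwarded at its exact size, leaking length above the cap boundary.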
 async fn write_proxy_header_with_timeout<W>(mask_write: &mut W, header: &[u8]) -> bool
@@ -134,10 +169,33 @@ async fn wait_mask_connect_budget(started: Instant) {
     }
 }

-async fn wait_mask_outcome_budget(started: Instant) {
+fn mask_outcome_target_budget(config: &ProxyConfig) -> Duration {
+    if config.censorship.mask_timing_normalization_enabled {
+        let floor = config.censorship.mask_timing_normalization_floor_ms;
+        let ceiling = config.censorship.mask_timing_normalization_ceiling_ms;
+        if ceiling > floor {
+            let mut rng = rand::rng();
+            return Duration::from_millis(rng.random_range(floor..=ceiling));
+        }
+        return Duration::from_millis(floor);
+    }
+    MASK_TIMEOUT
+}
+
+async fn wait_mask_connect_budget_if_needed(started: Instant, config: &ProxyConfig) {
+    if config.censorship.mask_timing_normalization_enabled {
+        return;
+    }
+    wait_mask_connect_budget(started).await;
+}
+
+async fn wait_mask_outcome_budget(started: Instant, config: &ProxyConfig) {
+    let target = mask_outcome_target_budget(config);
     let elapsed = started.elapsed();
-    if elapsed < MASK_TIMEOUT {
-        tokio::time::sleep(MASK_TIMEOUT - elapsed).await;
+    if elapsed < target {
+        tokio::time::sleep(target - elapsed).await;
     }
 }
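The envelope logic above reduces to a small pure computation: pick a target in `[floor, ceiling]`, then sleep for whatever remains of it. A standalone sketch of the remaining-delay step (hypothetical `remaining_delay` helper, not project code):

```rust
use std::time::Duration;

// How much longer to sleep so the observable outcome time reaches `target`.
fn remaining_delay(elapsed: Duration, target: Duration) -> Duration {
    // Saturates to zero when the path already overran the target.
    target.saturating_sub(elapsed)
}

fn main() {
    let target = Duration::from_millis(250);
    // Fast path: pad out to the randomly chosen target.
    assert_eq!(remaining_delay(Duration::from_millis(40), target), Duration::from_millis(210));
    // Slow path: no truncation, just no extra delay (best-effort shaping).
    assert_eq!(remaining_delay(Duration::from_millis(400), target), Duration::ZERO);
    println!("ok");
}
```

This also explains why `wait_mask_connect_budget_if_needed` skips the connect-stage wait when normalization is on: the single outcome envelope already absorbs the connect-time variance.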
@@ -247,7 +305,7 @@ where
         build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr);
     if let Some(header) = proxy_header {
         if !write_proxy_header_with_timeout(&mut mask_write, &header).await {
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
             return;
         }
     }
@@ -262,6 +320,8 @@ where
                 config.censorship.mask_shape_hardening,
                 config.censorship.mask_shape_bucket_floor_bytes,
                 config.censorship.mask_shape_bucket_cap_bytes,
+                config.censorship.mask_shape_above_cap_blur,
+                config.censorship.mask_shape_above_cap_blur_max_bytes,
             ),
         )
         .await
@@ -269,18 +329,18 @@ where
             {
                 debug!("Mask relay timed out (unix socket)");
             }
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
         }
         Ok(Err(e)) => {
-            wait_mask_connect_budget(connect_started).await;
+            wait_mask_connect_budget_if_needed(connect_started, config).await;
             debug!(error = %e, "Failed to connect to mask unix socket");
             consume_client_data_with_timeout(reader).await;
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
         }
         Err(_) => {
             debug!("Timeout connecting to mask unix socket");
             consume_client_data_with_timeout(reader).await;
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
         }
     }
     return;
@@ -313,7 +373,7 @@ where
     let (mask_read, mut mask_write) = stream.into_split();
     if let Some(header) = proxy_header {
         if !write_proxy_header_with_timeout(&mut mask_write, &header).await {
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
             return;
         }
     }
@@ -328,6 +388,8 @@ where
                 config.censorship.mask_shape_hardening,
                 config.censorship.mask_shape_bucket_floor_bytes,
                 config.censorship.mask_shape_bucket_cap_bytes,
+                config.censorship.mask_shape_above_cap_blur,
+                config.censorship.mask_shape_above_cap_blur_max_bytes,
             ),
         )
         .await
@@ -335,18 +397,18 @@ where
             {
                 debug!("Mask relay timed out");
             }
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
         }
         Ok(Err(e)) => {
-            wait_mask_connect_budget(connect_started).await;
+            wait_mask_connect_budget_if_needed(connect_started, config).await;
            debug!(error = %e, "Failed to connect to mask host");
             consume_client_data_with_timeout(reader).await;
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
         }
         Err(_) => {
             debug!("Timeout connecting to mask host");
             consume_client_data_with_timeout(reader).await;
-            wait_mask_outcome_budget(outcome_started).await;
+            wait_mask_outcome_budget(outcome_started, config).await;
         }
     }
 }
@@ -361,6 +423,8 @@ async fn relay_to_mask<R, W, MR, MW>(
     shape_hardening_enabled: bool,
     shape_bucket_floor_bytes: usize,
     shape_bucket_cap_bytes: usize,
+    shape_above_cap_blur: bool,
+    shape_above_cap_blur_max_bytes: usize,
 )
 where
     R: AsyncRead + Unpin + Send + 'static,
@@ -386,6 +450,8 @@ where
         shape_hardening_enabled,
         shape_bucket_floor_bytes,
         shape_bucket_cap_bytes,
+        shape_above_cap_blur,
+        shape_above_cap_blur_max_bytes,
     )
     .await;
     let _ = mask_write.shutdown().await;
@@ -414,3 +480,23 @@ mod security_tests;
 #[cfg(test)]
 #[path = "tests/masking_adversarial_tests.rs"]
 mod adversarial_tests;
+
+#[cfg(test)]
+#[path = "tests/masking_shape_hardening_adversarial_tests.rs"]
+mod masking_shape_hardening_adversarial_tests;
+
+#[cfg(test)]
+#[path = "tests/masking_shape_above_cap_blur_security_tests.rs"]
+mod masking_shape_above_cap_blur_security_tests;
+
+#[cfg(test)]
+#[path = "tests/masking_timing_normalization_security_tests.rs"]
+mod masking_timing_normalization_security_tests;
+
+#[cfg(test)]
+#[path = "tests/masking_ab_envelope_blur_integration_security_tests.rs"]
+mod masking_ab_envelope_blur_integration_security_tests;
+
+#[cfg(test)]
+#[path = "tests/masking_timing_sidechannel_redteam_expected_fail_tests.rs"]
+mod masking_timing_sidechannel_redteam_expected_fail_tests;


@@ -0,0 +1,245 @@
+use super::*;
+use crate::config::{UpstreamConfig, UpstreamType};
+use std::sync::Arc;
+use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpListener;
+use tokio::time::Duration;
+
+fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
+    Arc::new(UpstreamManager::new(
+        vec![UpstreamConfig {
+            upstream_type: UpstreamType::Direct {
+                interface: None,
+                bind_addresses: None,
+            },
+            weight: 1,
+            enabled: true,
+            scopes: String::new(),
+            selected_scope: String::new(),
+        }],
+        1,
+        1,
+        1,
+        1,
+        false,
+        stats,
+    ))
+}
+
+async fn run_probe_capture(
+    body_sent: usize,
+    tls_len: u16,
+    enable_shape_hardening: bool,
+    floor: usize,
+    cap: usize,
+) -> usize {
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let backend_addr = listener.local_addr().unwrap();
+    let mut cfg = ProxyConfig::default();
+    cfg.general.beobachten = false;
+    cfg.censorship.mask = true;
+    cfg.censorship.mask_unix_sock = None;
+    cfg.censorship.mask_host = Some("127.0.0.1".to_string());
+    cfg.censorship.mask_port = backend_addr.port();
+    cfg.censorship.mask_shape_hardening = enable_shape_hardening;
+    cfg.censorship.mask_shape_bucket_floor_bytes = floor;
+    cfg.censorship.mask_shape_bucket_cap_bytes = cap;
+    let accept_task = tokio::spawn(async move {
+        let (mut stream, _) = listener.accept().await.unwrap();
+        let mut got = Vec::new();
+        let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await;
+        got.len()
+    });
+    let (server_side, mut client_side) = duplex(65536);
+    let handler = tokio::spawn(handle_client_stream(
+        server_side,
+        "198.51.100.214:57014".parse().unwrap(),
+        Arc::new(cfg),
+        Arc::new(Stats::new()),
+        new_upstream_manager(Arc::new(Stats::new())),
+        Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
+        Arc::new(BufferPool::new()),
+        Arc::new(SecureRandom::new()),
+        None,
+        Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)),
+        None,
+        Arc::new(UserIpTracker::new()),
+        Arc::new(BeobachtenStore::new()),
+        false,
+    ));
+    let mut probe = vec![0u8; 5 + body_sent];
+    probe[0] = 0x16;
+    probe[1] = 0x03;
+    probe[2] = 0x01;
+    probe[3..5].copy_from_slice(&tls_len.to_be_bytes());
+    probe[5..].fill(0x66);
+    client_side.write_all(&probe).await.unwrap();
+    client_side.shutdown().await.unwrap();
+    let _ = tokio::time::timeout(Duration::from_secs(4), handler)
+        .await
+        .unwrap()
+        .unwrap();
+    tokio::time::timeout(Duration::from_secs(4), accept_task)
+        .await
+        .unwrap()
+        .unwrap()
+}
+
+fn pearson_corr(xs: &[f64], ys: &[f64]) -> f64 {
+    if xs.len() != ys.len() || xs.is_empty() {
+        return 0.0;
+    }
+    let n = xs.len() as f64;
+    let mean_x = xs.iter().sum::<f64>() / n;
+    let mean_y = ys.iter().sum::<f64>() / n;
+    let mut cov = 0.0;
+    let mut var_x = 0.0;
+    let mut var_y = 0.0;
+    for (&x, &y) in xs.iter().zip(ys.iter()) {
+        let dx = x - mean_x;
+        let dy = y - mean_y;
+        cov += dx * dy;
+        var_x += dx * dx;
+        var_y += dy * dy;
+    }
+    if var_x == 0.0 || var_y == 0.0 {
+        return 0.0;
+    }
+    cov / (var_x.sqrt() * var_y.sqrt())
+}
+
+fn lcg_sizes(count: usize, floor: usize, cap: usize) -> Vec<usize> {
+    let mut x = 0x9E3779B97F4A7C15u64;
+    let span = cap.saturating_mul(3);
+    let mut out = Vec::with_capacity(count + 8);
+    for _ in 0..count {
+        x = x
+            .wrapping_mul(6364136223846793005)
+            .wrapping_add(1442695040888963407);
+        let v = (x as usize) % span.max(1);
+        out.push(v);
+    }
+    // Inject edge and boundary-heavy probes.
+    out.extend_from_slice(&[
+        0,
+        floor.saturating_sub(1),
+        floor,
+        floor.saturating_add(1),
+        cap.saturating_sub(1),
+        cap,
+        cap.saturating_add(1),
+        cap.saturating_mul(2),
+    ]);
+    out
+}
+
+async fn collect_distribution(
+    sizes: &[usize],
+    hardening: bool,
+    floor: usize,
+    cap: usize,
+) -> Vec<usize> {
+    let mut out = Vec::with_capacity(sizes.len());
+    for &body in sizes {
+        out.push(run_probe_capture(body, 1200, hardening, floor, cap).await);
+    }
+    out
+}
+
+#[tokio::test]
+#[ignore = "red-team expected-fail: strict decorrelation target for hardened output lengths"]
+async fn redteam_fuzz_01_hardened_output_length_correlation_should_be_below_0_2() {
+    let floor = 512usize;
+    let cap = 4096usize;
+    let sizes = lcg_sizes(24, floor, cap);
+    let hardened = collect_distribution(&sizes, true, floor, cap).await;
+    let x: Vec<f64> = sizes.iter().map(|v| *v as f64).collect();
+    let y_hard: Vec<f64> = hardened.iter().map(|v| *v as f64).collect();
+    let corr_hard = pearson_corr(&x, &y_hard).abs();
+    println!("redteam_fuzz corr_hardened={corr_hard:.4} samples={}", sizes.len());
+    assert!(
+        corr_hard < 0.2,
+        "strict model expects near-zero size correlation; observed corr={corr_hard:.4}"
+    );
+}
+
+#[tokio::test]
+#[ignore = "red-team expected-fail: strict class-collapse ratio target"]
+async fn redteam_fuzz_02_hardened_unique_output_ratio_should_be_below_5pct() {
+    let floor = 512usize;
+    let cap = 4096usize;
+    let sizes = lcg_sizes(24, floor, cap);
+    let hardened = collect_distribution(&sizes, true, floor, cap).await;
+    let in_unique = {
+        let mut s = std::collections::BTreeSet::new();
+        for v in &sizes {
+            s.insert(*v);
+        }
+        s.len()
+    };
+    let out_unique = {
+        let mut s = std::collections::BTreeSet::new();
+        for v in &hardened {
+            s.insert(*v);
+        }
+        s.len()
+    };
+    let ratio = out_unique as f64 / in_unique as f64;
+    println!(
+        "redteam_fuzz unique_ratio_hardened={ratio:.4} out_unique={} in_unique={}",
+        out_unique, in_unique
+    );
+    assert!(
+        ratio <= 0.05,
+        "strict model expects near-total collapse; observed ratio={ratio:.4}"
+    );
+}
+
+#[tokio::test]
+#[ignore = "red-team expected-fail: strict separability improvement target"]
+async fn redteam_fuzz_03_hardened_signal_must_be_10x_lower_than_plain() {
+    let floor = 512usize;
+    let cap = 4096usize;
+    let sizes = lcg_sizes(24, floor, cap);
+    let plain = collect_distribution(&sizes, false, floor, cap).await;
+    let hardened = collect_distribution(&sizes, true, floor, cap).await;
+    let x: Vec<f64> = sizes.iter().map(|v| *v as f64).collect();
+    let y_plain: Vec<f64> = plain.iter().map(|v| *v as f64).collect();
+    let y_hard: Vec<f64> = hardened.iter().map(|v| *v as f64).collect();
+    let corr_plain = pearson_corr(&x, &y_plain).abs();
let corr_hard = pearson_corr(&x, &y_hard).abs();
println!(
"redteam_fuzz corr_plain={corr_plain:.4} corr_hardened={corr_hard:.4}"
);
assert!(
corr_hard <= corr_plain * 0.1,
"strict model expects 10x suppression; plain={corr_plain:.4} hardened={corr_hard:.4}"
);
}

@@ -0,0 +1,179 @@
use super::*;
use crate::config::{UpstreamConfig, UpstreamType};
use std::sync::Arc;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::time::Duration;
fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
Arc::new(UpstreamManager::new(
vec![UpstreamConfig {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
}],
1,
1,
1,
1,
false,
stats,
))
}
fn expected_bucket(total: usize, floor: usize, cap: usize) -> usize {
if total == 0 || floor == 0 || cap < floor {
return total;
}
if total >= cap {
return total;
}
let mut bucket = floor;
while bucket < total {
match bucket.checked_mul(2) {
Some(next) => bucket = next,
None => return total,
}
if bucket > cap {
return cap;
}
}
bucket
}
async fn run_probe_capture(
body_sent: usize,
tls_len: u16,
enable_shape_hardening: bool,
floor: usize,
cap: usize,
) -> Vec<u8> {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let mut cfg = ProxyConfig::default();
cfg.general.beobachten = false;
cfg.censorship.mask = true;
cfg.censorship.mask_unix_sock = None;
cfg.censorship.mask_host = Some("127.0.0.1".to_string());
cfg.censorship.mask_port = backend_addr.port();
cfg.censorship.mask_shape_hardening = enable_shape_hardening;
cfg.censorship.mask_shape_bucket_floor_bytes = floor;
cfg.censorship.mask_shape_bucket_cap_bytes = cap;
let accept_task = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut got = Vec::new();
let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await;
got
});
let (server_side, mut client_side) = duplex(65536);
let handler = tokio::spawn(handle_client_stream(
server_side,
"198.51.100.199:56999".parse().unwrap(),
Arc::new(cfg),
Arc::new(Stats::new()),
new_upstream_manager(Arc::new(Stats::new())),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
None,
Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)),
None,
Arc::new(UserIpTracker::new()),
Arc::new(BeobachtenStore::new()),
false,
));
let mut probe = vec![0u8; 5 + body_sent];
probe[0] = 0x16;
probe[1] = 0x03;
probe[2] = 0x01;
probe[3..5].copy_from_slice(&tls_len.to_be_bytes());
probe[5..].fill(0x66);
client_side.write_all(&probe).await.unwrap();
client_side.shutdown().await.unwrap();
let result = tokio::time::timeout(Duration::from_secs(4), handler)
.await
.unwrap()
.unwrap();
assert!(result.is_ok());
tokio::time::timeout(Duration::from_secs(4), accept_task)
.await
.unwrap()
.unwrap()
}
#[tokio::test]
async fn shape_hardening_non_power_of_two_cap_collapses_probe_classes() {
let floor = 1000usize;
let cap = 1500usize;
let low = run_probe_capture(1195, 700, true, floor, cap).await;
let high = run_probe_capture(1494, 700, true, floor, cap).await;
assert_eq!(low.len(), 1500);
assert_eq!(high.len(), 1500);
}
#[tokio::test]
async fn shape_hardening_disabled_keeps_non_power_of_two_cap_lengths_distinct() {
let floor = 1000usize;
let cap = 1500usize;
let low = run_probe_capture(1195, 700, false, floor, cap).await;
let high = run_probe_capture(1494, 700, false, floor, cap).await;
assert_eq!(low.len(), 1200);
assert_eq!(high.len(), 1499);
}
#[tokio::test]
async fn shape_hardening_parallel_stress_collapses_sub_cap_probes() {
let floor = 1000usize;
let cap = 1500usize;
let mut tasks = Vec::new();
for idx in 0..24usize {
let body = 1001 + (idx * 19 % 480);
tasks.push(tokio::spawn(async move {
run_probe_capture(body, 1200, true, floor, cap).await.len()
}));
}
for task in tasks {
let observed = task.await.unwrap();
assert_eq!(observed, 1500);
}
}
#[tokio::test]
async fn shape_hardening_light_fuzz_matches_bucket_oracle() {
let floor = 512usize;
let cap = 4096usize;
for step in 1usize..=36usize {
let total = 1 + (((step * 313) ^ (step << 7)) % (cap + 300));
let body = total.saturating_sub(5);
let got = run_probe_capture(body, 650, true, floor, cap).await;
let expected = expected_bucket(total, floor, cap);
        assert_eq!(
            got.len(),
            expected,
            "step={step} total={total} expected={expected} got={}",
            got.len()
        );
}
}

@@ -0,0 +1,236 @@
use super::*;
use crate::config::{UpstreamConfig, UpstreamType};
use std::sync::Arc;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::time::{Duration, Instant};
fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
Arc::new(UpstreamManager::new(
vec![UpstreamConfig {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
}],
1,
1,
1,
1,
false,
stats,
))
}
async fn run_probe_capture(
body_sent: usize,
tls_len: u16,
enable_shape_hardening: bool,
floor: usize,
cap: usize,
) -> Vec<u8> {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let mut cfg = ProxyConfig::default();
cfg.general.beobachten = false;
cfg.censorship.mask = true;
cfg.censorship.mask_unix_sock = None;
cfg.censorship.mask_host = Some("127.0.0.1".to_string());
cfg.censorship.mask_port = backend_addr.port();
cfg.censorship.mask_shape_hardening = enable_shape_hardening;
cfg.censorship.mask_shape_bucket_floor_bytes = floor;
cfg.censorship.mask_shape_bucket_cap_bytes = cap;
let accept_task = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut got = Vec::new();
let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await;
got
});
let (server_side, mut client_side) = duplex(65536);
let handler = tokio::spawn(handle_client_stream(
server_side,
"198.51.100.211:57011".parse().unwrap(),
Arc::new(cfg),
Arc::new(Stats::new()),
new_upstream_manager(Arc::new(Stats::new())),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
None,
Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)),
None,
Arc::new(UserIpTracker::new()),
Arc::new(BeobachtenStore::new()),
false,
));
let mut probe = vec![0u8; 5 + body_sent];
probe[0] = 0x16;
probe[1] = 0x03;
probe[2] = 0x01;
probe[3..5].copy_from_slice(&tls_len.to_be_bytes());
probe[5..].fill(0x66);
client_side.write_all(&probe).await.unwrap();
client_side.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(4), handler)
.await
.unwrap()
.unwrap();
tokio::time::timeout(Duration::from_secs(4), accept_task)
.await
.unwrap()
.unwrap()
}
async fn measure_reject_ms(body_sent: usize) -> u128 {
let mut cfg = ProxyConfig::default();
cfg.general.beobachten = false;
cfg.timeouts.client_handshake = 1;
cfg.censorship.mask = true;
cfg.censorship.mask_unix_sock = None;
cfg.censorship.mask_host = Some("127.0.0.1".to_string());
cfg.censorship.mask_port = 1;
cfg.censorship.server_hello_delay_min_ms = 700;
cfg.censorship.server_hello_delay_max_ms = 700;
let (server_side, mut client_side) = duplex(65536);
let started = Instant::now();
let handler = tokio::spawn(handle_client_stream(
server_side,
"198.51.100.212:57012".parse().unwrap(),
Arc::new(cfg),
Arc::new(Stats::new()),
new_upstream_manager(Arc::new(Stats::new())),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
None,
Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)),
None,
Arc::new(UserIpTracker::new()),
Arc::new(BeobachtenStore::new()),
false,
));
let mut probe = vec![0u8; 5 + body_sent];
probe[0] = 0x16;
probe[1] = 0x03;
probe[2] = 0x01;
probe[3..5].copy_from_slice(&600u16.to_be_bytes());
probe[5..].fill(0x44);
client_side.write_all(&probe).await.unwrap();
client_side.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(4), handler)
.await
.unwrap()
.unwrap();
started.elapsed().as_millis()
}
#[tokio::test]
#[ignore = "red-team expected-fail: above-cap exact length still leaks classifier signal"]
async fn redteam_shape_01_above_cap_flows_should_collapse_to_single_class() {
let floor = 512usize;
let cap = 4096usize;
let a = run_probe_capture(5000, 7000, true, floor, cap).await;
let b = run_probe_capture(6000, 7000, true, floor, cap).await;
assert_eq!(
a.len(),
b.len(),
"strict anti-classifier model expects same backend length class above cap"
);
}
#[tokio::test]
#[ignore = "red-team expected-fail: current padding bytes are deterministic zeros"]
async fn redteam_shape_02_padding_tail_must_be_non_deterministic() {
let floor = 512usize;
let cap = 4096usize;
let got = run_probe_capture(17, 600, true, floor, cap).await;
assert!(
got.len() > 22,
"test requires padding tail to exist"
);
let tail = &got[22..];
assert!(
tail.iter().any(|b| *b != 0),
"padding tail is fully zeroed and thus deterministic"
);
}
#[tokio::test]
#[ignore = "red-team expected-fail: exact-floor probes still expose boundary class"]
async fn redteam_shape_03_exact_floor_input_should_not_be_fixed_point() {
let floor = 512usize;
let cap = 4096usize;
let got = run_probe_capture(507, 600, true, floor, cap).await;
assert!(
got.len() > floor,
"strict model expects extra blur even when input lands exactly on floor"
);
}
#[tokio::test]
#[ignore = "red-team expected-fail: strict one-bucket collapse hypothesis"]
async fn redteam_shape_04_all_sub_cap_sizes_should_collapse_to_single_size() {
let floor = 512usize;
let cap = 4096usize;
let classes = [17usize, 63usize, 255usize, 511usize, 1023usize, 2047usize, 3071usize];
let mut observed = Vec::new();
for body in classes {
observed.push(run_probe_capture(body, 1200, true, floor, cap).await.len());
}
let first = observed[0];
for v in observed {
assert_eq!(v, first, "strict model expects one collapsed class across all sub-cap probes");
}
}
#[tokio::test]
#[ignore = "red-team expected-fail: over-strict micro-timing invariance"]
async fn redteam_shape_05_reject_timing_spread_should_be_under_2ms() {
let classes = [17usize, 511usize, 1023usize, 2047usize, 4095usize];
let mut values = Vec::new();
for class in classes {
values.push(measure_reject_ms(class).await);
}
let min = *values.iter().min().unwrap();
let max = *values.iter().max().unwrap();
assert!(
min == 700 && max == 700,
"strict model requires exact 700ms for every malformed class: min={min}ms max={max}ms"
);
}
#[test]
#[ignore = "red-team expected-fail: secure-by-default hypothesis"]
fn redteam_shape_06_shape_hardening_should_be_secure_by_default() {
let cfg = ProxyConfig::default();
assert!(
cfg.censorship.mask_shape_hardening,
"strict model expects shape hardening enabled by default"
);
}

@@ -0,0 +1,241 @@
use super::*;
use std::collections::BTreeSet;
use tokio::io::duplex;
use tokio::net::TcpListener;
use tokio::time::{Duration, Instant};
#[derive(Clone, Copy)]
enum PathClass {
ConnectFail,
ConnectSuccess,
SlowBackend,
}
fn mean_ms(samples: &[u128]) -> f64 {
if samples.is_empty() {
return 0.0;
}
let sum: u128 = samples.iter().copied().sum();
sum as f64 / samples.len() as f64
}
async fn measure_masking_duration_ms(path: PathClass, timing_norm_enabled: bool) -> u128 {
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_unix_sock = None;
config.censorship.mask_timing_normalization_enabled = timing_norm_enabled;
config.censorship.mask_timing_normalization_floor_ms = 220;
config.censorship.mask_timing_normalization_ceiling_ms = 260;
let accept_task = match path {
PathClass::ConnectFail => {
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = 1;
None
}
PathClass::ConnectSuccess => {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
Some(tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
}))
}
PathClass::SlowBackend => {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
Some(tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
tokio::time::sleep(Duration::from_millis(320)).await;
}))
}
};
let (client_reader, _client_writer) = duplex(1024);
let (_client_visible_reader, client_visible_writer) = duplex(1024);
let peer: SocketAddr = "198.51.100.230:57230".parse().unwrap();
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = BeobachtenStore::new();
let started = Instant::now();
handle_bad_client(
client_reader,
client_visible_writer,
b"GET /ab-harness HTTP/1.1\r\nHost: x\r\n\r\n",
peer,
local,
&config,
&beobachten,
)
.await;
if let Some(task) = accept_task {
let _ = tokio::time::timeout(Duration::from_secs(2), task).await;
}
started.elapsed().as_millis()
}
async fn capture_above_cap_forwarded_len(
body_sent: usize,
above_cap_blur_enabled: bool,
above_cap_blur_max_bytes: usize,
) -> usize {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_shape_hardening = true;
config.censorship.mask_shape_bucket_floor_bytes = 512;
config.censorship.mask_shape_bucket_cap_bytes = 4096;
config.censorship.mask_shape_above_cap_blur = above_cap_blur_enabled;
config.censorship.mask_shape_above_cap_blur_max_bytes = above_cap_blur_max_bytes;
let accept_task = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut got = Vec::new();
let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await;
got.len()
});
let (client_reader, mut client_writer) = duplex(64 * 1024);
let (_client_visible_reader, client_visible_writer) = duplex(64 * 1024);
let peer: SocketAddr = "198.51.100.231:57231".parse().unwrap();
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = BeobachtenStore::new();
let mut initial = vec![0u8; 5 + body_sent];
initial[0] = 0x16;
initial[1] = 0x03;
initial[2] = 0x01;
initial[3..5].copy_from_slice(&7000u16.to_be_bytes());
initial[5..].fill(0x5A);
let fallback_task = tokio::spawn(async move {
handle_bad_client(
client_reader,
client_visible_writer,
&initial,
peer,
local,
&config,
&beobachten,
)
.await;
});
client_writer.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(4), fallback_task)
.await
.unwrap()
.unwrap();
tokio::time::timeout(Duration::from_secs(4), accept_task)
.await
.unwrap()
.unwrap()
}
#[tokio::test]
async fn integration_ab_harness_envelope_and_blur_improve_obfuscation_vs_baseline() {
const ITER: usize = 8;
let mut baseline_fail = Vec::with_capacity(ITER);
let mut baseline_success = Vec::with_capacity(ITER);
let mut baseline_slow = Vec::with_capacity(ITER);
let mut hardened_fail = Vec::with_capacity(ITER);
let mut hardened_success = Vec::with_capacity(ITER);
let mut hardened_slow = Vec::with_capacity(ITER);
for _ in 0..ITER {
baseline_fail.push(measure_masking_duration_ms(PathClass::ConnectFail, false).await);
baseline_success.push(measure_masking_duration_ms(PathClass::ConnectSuccess, false).await);
baseline_slow.push(measure_masking_duration_ms(PathClass::SlowBackend, false).await);
hardened_fail.push(measure_masking_duration_ms(PathClass::ConnectFail, true).await);
hardened_success.push(measure_masking_duration_ms(PathClass::ConnectSuccess, true).await);
hardened_slow.push(measure_masking_duration_ms(PathClass::SlowBackend, true).await);
}
let baseline_means = [
mean_ms(&baseline_fail),
mean_ms(&baseline_success),
mean_ms(&baseline_slow),
];
let hardened_means = [
mean_ms(&hardened_fail),
mean_ms(&hardened_success),
mean_ms(&hardened_slow),
];
let baseline_range = baseline_means
.iter()
.copied()
.fold((f64::INFINITY, f64::NEG_INFINITY), |(mn, mx), v| {
(mn.min(v), mx.max(v))
});
let hardened_range = hardened_means
.iter()
.copied()
.fold((f64::INFINITY, f64::NEG_INFINITY), |(mn, mx), v| {
(mn.min(v), mx.max(v))
});
let baseline_spread = baseline_range.1 - baseline_range.0;
let hardened_spread = hardened_range.1 - hardened_range.0;
println!(
"ab_harness_timing baseline_means={:?} hardened_means={:?} baseline_spread={:.2} hardened_spread={:.2}",
baseline_means, hardened_means, baseline_spread, hardened_spread
);
assert!(
hardened_spread < baseline_spread,
"timing envelope should reduce cross-path mean spread: baseline={baseline_spread:.2} hardened={hardened_spread:.2}"
);
let mut baseline_a = BTreeSet::new();
let mut baseline_b = BTreeSet::new();
let mut hardened_a = BTreeSet::new();
let mut hardened_b = BTreeSet::new();
for _ in 0..24 {
baseline_a.insert(capture_above_cap_forwarded_len(5000, false, 0).await);
baseline_b.insert(capture_above_cap_forwarded_len(5040, false, 0).await);
hardened_a.insert(capture_above_cap_forwarded_len(5000, true, 96).await);
hardened_b.insert(capture_above_cap_forwarded_len(5040, true, 96).await);
}
let baseline_overlap = baseline_a.intersection(&baseline_b).count();
let hardened_overlap = hardened_a.intersection(&hardened_b).count();
println!(
"ab_harness_length baseline_overlap={} hardened_overlap={} baseline_a={} baseline_b={} hardened_a={} hardened_b={}",
baseline_overlap,
hardened_overlap,
baseline_a.len(),
baseline_b.len(),
hardened_a.len(),
hardened_b.len()
);
assert_eq!(baseline_overlap, 0, "baseline above-cap classes should be disjoint");
assert!(
hardened_overlap > baseline_overlap,
"above-cap blur should increase cross-class overlap: baseline={} hardened={}",
baseline_overlap,
hardened_overlap
);
}

@@ -1321,6 +1321,8 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall
        false,
        0,
        0,
        false,
        0,
    )
    .await;
});
@@ -1424,7 +1426,18 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() {
    let timed = timeout(
        Duration::from_millis(40),
        relay_to_mask(
            reader,
            writer,
            mask_read,
            mask_write,
            b"",
            false,
            0,
            0,
            false,
            0,
        ),
    )
    .await;

@@ -0,0 +1,102 @@
use super::*;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::time::Duration;
async fn capture_forwarded_len(
body_sent: usize,
shape_hardening: bool,
above_cap_blur: bool,
above_cap_blur_max_bytes: usize,
) -> usize {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_shape_hardening = shape_hardening;
config.censorship.mask_shape_bucket_floor_bytes = 512;
config.censorship.mask_shape_bucket_cap_bytes = 4096;
config.censorship.mask_shape_above_cap_blur = above_cap_blur;
config.censorship.mask_shape_above_cap_blur_max_bytes = above_cap_blur_max_bytes;
let accept_task = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut got = Vec::new();
let _ = tokio::time::timeout(Duration::from_secs(2), stream.read_to_end(&mut got)).await;
got.len()
});
let (server_reader, mut client_writer) = duplex(64 * 1024);
let (_client_visible_reader, client_visible_writer) = duplex(64 * 1024);
let peer: SocketAddr = "198.51.100.220:57120".parse().unwrap();
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = BeobachtenStore::new();
let mut probe = vec![0u8; 5 + body_sent];
probe[0] = 0x16;
probe[1] = 0x03;
probe[2] = 0x01;
probe[3..5].copy_from_slice(&7000u16.to_be_bytes());
probe[5..].fill(0x5A);
let fallback = tokio::spawn(async move {
handle_bad_client(
server_reader,
client_visible_writer,
&probe,
peer,
local,
&config,
&beobachten,
)
.await;
});
client_writer.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(4), fallback)
.await
.unwrap()
.unwrap();
tokio::time::timeout(Duration::from_secs(4), accept_task)
.await
.unwrap()
.unwrap()
}
#[tokio::test]
async fn above_cap_blur_disabled_keeps_exact_above_cap_length() {
let body_sent = 5000usize;
let observed = capture_forwarded_len(body_sent, true, false, 0).await;
assert_eq!(observed, 5 + body_sent);
}
#[tokio::test]
async fn above_cap_blur_enabled_adds_bounded_random_tail() {
let body_sent = 5000usize;
let base = 5 + body_sent;
let max_extra = 64usize;
let mut saw_extra = false;
for _ in 0..20 {
let observed = capture_forwarded_len(body_sent, true, true, max_extra).await;
assert!(observed >= base, "observed={observed} base={base}");
assert!(
observed <= base + max_extra,
"observed={observed} base={} max_extra={max_extra}",
base
);
if observed > base {
saw_extra = true;
}
}
assert!(
saw_extra,
"at least one run should produce above-cap blur bytes under randomization"
);
}

@@ -0,0 +1,129 @@
use super::*;
use tokio::io::{duplex, empty, sink, AsyncReadExt, AsyncWrite};
struct CountingWriter {
written: usize,
}
impl CountingWriter {
fn new() -> Self {
Self { written: 0 }
}
}
impl AsyncWrite for CountingWriter {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
self.written = self.written.saturating_add(buf.len());
std::task::Poll::Ready(Ok(buf.len()))
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::task::Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::task::Poll::Ready(Ok(()))
}
}
#[test]
fn shape_bucket_clamps_to_cap_when_next_power_of_two_exceeds_cap() {
let bucket = next_mask_shape_bucket(1200, 1000, 1500);
assert_eq!(bucket, 1500);
}
#[test]
fn shape_bucket_never_drops_below_total_for_valid_ranges() {
for total in [1usize, 32, 127, 512, 999, 1000, 1001, 1499, 1500, 1501] {
let bucket = next_mask_shape_bucket(total, 1000, 1500);
assert!(bucket >= total || total >= 1500, "bucket={bucket} total={total}");
}
}
#[tokio::test]
async fn maybe_write_shape_padding_writes_exact_delta() {
let mut writer = CountingWriter::new();
maybe_write_shape_padding(&mut writer, 1200, true, 1000, 1500, false, 0).await;
assert_eq!(writer.written, 300);
}
#[tokio::test]
async fn maybe_write_shape_padding_skips_when_disabled() {
let mut writer = CountingWriter::new();
maybe_write_shape_padding(&mut writer, 1200, false, 1000, 1500, false, 0).await;
assert_eq!(writer.written, 0);
}
#[tokio::test]
async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() {
let initial = vec![0x16, 0x03, 0x01, 0x04, 0x00];
let extra = vec![0xAB; 1195];
let (client_reader, mut client_writer) = duplex(4096);
let (mut mask_observer, mask_writer) = duplex(4096);
let relay = tokio::spawn(async move {
relay_to_mask(
client_reader,
sink(),
empty(),
mask_writer,
&initial,
true,
1000,
1500,
false,
0,
)
.await;
});
client_writer.write_all(&extra).await.unwrap();
client_writer.shutdown().await.unwrap();
relay.await.unwrap();
let mut observed = Vec::new();
mask_observer.read_to_end(&mut observed).await.unwrap();
assert_eq!(observed.len(), 1500);
assert_eq!(&observed[..5], &[0x16, 0x03, 0x01, 0x04, 0x00]);
assert!(observed[5..1200].iter().all(|b| *b == 0xAB));
assert_eq!(observed[1200..].len(), 300);
}
#[test]
fn shape_bucket_light_fuzz_monotonicity_and_bounds() {
let floor = 512usize;
let cap = 4096usize;
let mut prev = 0usize;
for step in 1usize..=3000 {
let total = ((step * 37) ^ (step << 3)) % (cap + 512);
let bucket = next_mask_shape_bucket(total, floor, cap);
if total < cap {
assert!(bucket >= total, "bucket={bucket} total={total}");
assert!(bucket <= cap, "bucket={bucket} cap={cap}");
} else {
assert_eq!(bucket, total, "above-cap totals must remain unchanged");
}
if total >= prev {
// For non-decreasing inputs, bucket class must not regress.
let prev_bucket = next_mask_shape_bucket(prev, floor, cap);
assert!(bucket >= prev_bucket || total >= cap);
}
prev = total;
}
}

@@ -0,0 +1,120 @@
use super::*;
use tokio::io::duplex;
use tokio::net::TcpListener;
use tokio::time::{Duration, Instant};
#[derive(Clone, Copy)]
enum MaskPath {
ConnectFail,
ConnectSuccess,
SlowBackend,
}
async fn measure_bad_client_duration_ms(path: MaskPath, floor_ms: u64, ceiling_ms: u64) -> u128 {
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_unix_sock = None;
config.censorship.mask_timing_normalization_enabled = true;
config.censorship.mask_timing_normalization_floor_ms = floor_ms;
config.censorship.mask_timing_normalization_ceiling_ms = ceiling_ms;
let accept_task = match path {
MaskPath::ConnectFail => {
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = 1;
None
}
MaskPath::ConnectSuccess => {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
Some(tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
}))
}
MaskPath::SlowBackend => {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
Some(tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
tokio::time::sleep(Duration::from_millis(320)).await;
}))
}
};
let (client_reader, _client_writer) = duplex(1024);
let (_client_visible_reader, client_visible_writer) = duplex(1024);
let peer: SocketAddr = "198.51.100.221:57121".parse().unwrap();
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = BeobachtenStore::new();
let started = Instant::now();
handle_bad_client(
client_reader,
client_visible_writer,
b"GET /timing-normalize HTTP/1.1\r\nHost: x\r\n\r\n",
peer,
local,
&config,
&beobachten,
)
.await;
if let Some(task) = accept_task {
let _ = tokio::time::timeout(Duration::from_secs(2), task).await;
}
started.elapsed().as_millis()
}
#[tokio::test]
async fn timing_normalization_envelope_applies_to_connect_fail_and_success() {
let floor = 160u64;
let ceiling = 180u64;
let fail = measure_bad_client_duration_ms(MaskPath::ConnectFail, floor, ceiling).await;
let success = measure_bad_client_duration_ms(MaskPath::ConnectSuccess, floor, ceiling).await;
assert!(
fail >= floor as u128,
"connect-fail duration below floor: {fail}ms < {floor}ms"
);
assert!(
fail <= (ceiling + 60) as u128,
"connect-fail duration exceeded relaxed ceiling: {fail}ms > {}ms",
ceiling + 60
);
assert!(
success >= floor as u128,
"connect-success duration below floor: {success}ms < {floor}ms"
);
assert!(
success <= (ceiling + 60) as u128,
"connect-success duration exceeded relaxed ceiling: {success}ms > {}ms",
ceiling + 60
);
let delta = fail.abs_diff(success);
assert!(
delta <= 80,
"timing normalization should reduce path divergence (delta={}ms)",
delta
);
}
#[tokio::test]
async fn timing_normalization_does_not_sleep_if_path_already_exceeds_ceiling() {
let floor = 120u64;
let ceiling = 150u64;
let slow = measure_bad_client_duration_ms(MaskPath::SlowBackend, floor, ceiling).await;
assert!(slow >= 280, "slow backend path should remain slow (got {slow}ms)");
assert!(slow <= 520, "slow backend path should remain bounded in tests (got {slow}ms)");
}

@@ -0,0 +1,200 @@
use super::*;
use tokio::io::duplex;
use tokio::net::TcpListener;
use tokio::time::{Duration, Instant};
#[derive(Clone, Copy)]
enum TimingPath {
ConnectFail,
ConnectSuccess,
SlowBackend,
}
async fn measure_path_duration_ms(path: TimingPath) -> u128 {
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_unix_sock = None;
let maybe_accept = match path {
TimingPath::ConnectFail => {
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = 1;
None
}
TimingPath::ConnectSuccess => {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
Some(tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
}))
}
TimingPath::SlowBackend => {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
Some(tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
tokio::time::sleep(Duration::from_millis(350)).await;
}))
}
};
let peer: SocketAddr = "198.51.100.213:57013".parse().unwrap();
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = BeobachtenStore::new();
let (client_reader, _client_writer) = duplex(1024);
let (_client_visible_reader, client_visible_writer) = duplex(1024);
let started = Instant::now();
handle_bad_client(
client_reader,
client_visible_writer,
b"GET /timing HTTP/1.1\r\nHost: x\r\n\r\n",
peer,
local,
&config,
&beobachten,
)
.await;
if let Some(task) = maybe_accept {
let _ = tokio::time::timeout(Duration::from_secs(2), task).await;
}
started.elapsed().as_millis()
}
/// Returns `(min, max, mean)` over the observed durations.
fn summarize(values: &[u128]) -> (u128, u128, f64) {
let min = *values.iter().min().unwrap_or(&0);
let max = *values.iter().max().unwrap_or(&0);
let sum: u128 = values.iter().copied().sum();
let mean = if values.is_empty() {
0.0
} else {
sum as f64 / values.len() as f64
};
(min, max, mean)
}
#[tokio::test]
#[ignore = "red-team expected-fail: strict path-indistinguishability target"]
async fn redteam_timing_01_connect_fail_success_slow_backend_must_be_within_10ms() {
const ITER: usize = 8;
let mut fail = Vec::with_capacity(ITER);
let mut success = Vec::with_capacity(ITER);
let mut slow = Vec::with_capacity(ITER);
for _ in 0..ITER {
fail.push(measure_path_duration_ms(TimingPath::ConnectFail).await);
success.push(measure_path_duration_ms(TimingPath::ConnectSuccess).await);
slow.push(measure_path_duration_ms(TimingPath::SlowBackend).await);
}
let (_, fail_max, fail_mean) = summarize(&fail);
let (_, success_max, success_mean) = summarize(&success);
let (_, slow_max, slow_mean) = summarize(&slow);
let global_min = *fail
.iter()
.chain(success.iter())
.chain(slow.iter())
.min()
.unwrap();
let global_max = *fail
.iter()
.chain(success.iter())
.chain(slow.iter())
.max()
.unwrap();
println!(
"redteam_timing path=connect_fail mean_ms={:.2} max_ms={}",
fail_mean, fail_max
);
println!(
"redteam_timing path=connect_success mean_ms={:.2} max_ms={}",
success_mean, success_max
);
println!(
"redteam_timing path=slow_backend mean_ms={:.2} max_ms={}",
slow_mean, slow_max
);
assert!(
global_max.saturating_sub(global_min) <= 10,
"strict model expects all masking outcomes in one 10ms bucket: min={global_min} max={global_max}"
);
}
#[tokio::test]
#[ignore = "red-team expected-fail: strict classifier-separability target"]
async fn redteam_timing_02_path_classifier_centroid_accuracy_must_be_below_40pct() {
const ITER: usize = 12;
let mut fail = Vec::with_capacity(ITER);
let mut success = Vec::with_capacity(ITER);
let mut slow = Vec::with_capacity(ITER);
for _ in 0..ITER {
fail.push(measure_path_duration_ms(TimingPath::ConnectFail).await as f64);
success.push(measure_path_duration_ms(TimingPath::ConnectSuccess).await as f64);
slow.push(measure_path_duration_ms(TimingPath::SlowBackend).await as f64);
}
let mean = |v: &[f64]| -> f64 { v.iter().sum::<f64>() / v.len() as f64 };
let c_fail = mean(&fail);
let c_success = mean(&success);
let c_slow = mean(&slow);
let mut correct = 0usize;
let mut total = 0usize;
// Nearest-centroid classifier: label each sample with the closest path mean.
let classify = |x: f64, c0: f64, c1: f64, c2: f64| -> usize {
let d0 = (x - c0).abs();
let d1 = (x - c1).abs();
let d2 = (x - c2).abs();
if d0 <= d1 && d0 <= d2 {
0
} else if d1 <= d0 && d1 <= d2 {
1
} else {
2
}
};
for &x in &fail {
total += 1;
if classify(x, c_fail, c_success, c_slow) == 0 {
correct += 1;
}
}
for &x in &success {
total += 1;
if classify(x, c_fail, c_success, c_slow) == 1 {
correct += 1;
}
}
for &x in &slow {
total += 1;
if classify(x, c_fail, c_success, c_slow) == 2 {
correct += 1;
}
}
let accuracy = correct as f64 / total as f64;
println!(
"redteam_timing_classifier accuracy={:.3} c_fail={:.2} c_success={:.2} c_slow={:.2}",
accuracy, c_fail, c_success, c_slow
);
assert!(
accuracy <= 0.40,
"strict model expects poor classifier; observed accuracy={accuracy:.3}"
);
}
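// Note: both tests above are #[ignore]d because they encode a strict
// path-indistinguishability target that is expected to fail today. To run
// them explicitly (assuming a standard cargo test setup), use e.g.:
//   cargo test redteam_timing -- --ignored --nocapture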