Add adversarial tests for client, handshake, masking, and relay modules

- Introduced `client_adversarial_tests.rs` to stress test connection limits and IP tracker race conditions.
- Added `handshake_adversarial_tests.rs` for mutational bit-flipping tests and timing neutrality checks.
- Created `masking_adversarial_tests.rs` to validate probing indistinguishability and SSRF prevention.
- Implemented `relay_adversarial_tests.rs` to ensure HOL blocking prevention and data quota enforcement.
- Updated respective modules to include new test paths.
This commit is contained in:
David Osipov 2026-03-19 00:28:41 +04:00
parent 44376b5652
commit 2a01ca2d6f
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
10 changed files with 1030 additions and 0 deletions

View File

@ -802,3 +802,7 @@ mod compile_time_security_checks {
#[cfg(test)]
#[path = "tls_security_tests.rs"]
mod security_tests;
#[cfg(test)]
#[path = "tls_adversarial_tests.rs"]
mod adversarial_tests;

View File

@ -0,0 +1,339 @@
use super::*;
use std::time::Instant;
use crate::crypto::sha256_hmac;
/// Helper to create a byte vector of specific length.
fn make_garbage(len: usize) -> Vec<u8> {
vec![0x42u8; len]
}
/// Helper to create a valid-looking HMAC digest for test.
fn make_digest(secret: &[u8], msg: &[u8], ts: u32) -> [u8; 32] {
let mut hmac = sha256_hmac(secret, msg);
let ts_bytes = ts.to_le_bytes();
for i in 0..4 {
hmac[28 + i] ^= ts_bytes[i];
}
hmac
}
fn make_valid_tls_handshake_with_session_id(
secret: &[u8],
timestamp: u32,
session_id: &[u8],
) -> Vec<u8> {
let session_id_len = session_id.len();
let len = TLS_DIGEST_POS + TLS_DIGEST_LEN + 1 + session_id_len;
let mut handshake = vec![0x42u8; len];
handshake[TLS_DIGEST_POS + TLS_DIGEST_LEN] = session_id_len as u8;
let sid_start = TLS_DIGEST_POS + TLS_DIGEST_LEN + 1;
handshake[sid_start..sid_start + session_id_len].copy_from_slice(session_id);
handshake[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN].fill(0);
let digest = make_digest(secret, &handshake, timestamp);
handshake[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN]
.copy_from_slice(&digest);
handshake
}
fn make_valid_tls_handshake(secret: &[u8], timestamp: u32) -> Vec<u8> {
make_valid_tls_handshake_with_session_id(secret, timestamp, &[0x42; 32])
}
// ------------------------------------------------------------------
// Truncated Packet Tests (OWASP ASVS 5.1.4, 5.1.5)
// ------------------------------------------------------------------
#[test]
fn validate_tls_handshake_truncated_10_bytes_rejected() {
let secrets = vec![("user".to_string(), b"secret".to_vec())];
let truncated = make_garbage(10);
assert!(validate_tls_handshake(&truncated, &secrets, true).is_none());
}
#[test]
fn validate_tls_handshake_truncated_at_digest_start_rejected() {
let secrets = vec![("user".to_string(), b"secret".to_vec())];
// TLS_DIGEST_POS = 11. 11 bytes should be rejected.
let truncated = make_garbage(TLS_DIGEST_POS);
assert!(validate_tls_handshake(&truncated, &secrets, true).is_none());
}
#[test]
fn validate_tls_handshake_truncated_inside_digest_rejected() {
let secrets = vec![("user".to_string(), b"secret".to_vec())];
// TLS_DIGEST_POS + 16 (half digest)
let truncated = make_garbage(TLS_DIGEST_POS + 16);
assert!(validate_tls_handshake(&truncated, &secrets, true).is_none());
}
#[test]
fn extract_sni_truncated_at_record_header_rejected() {
let truncated = make_garbage(3);
assert!(extract_sni_from_client_hello(&truncated).is_none());
}
#[test]
fn extract_sni_truncated_at_handshake_header_rejected() {
let mut truncated = vec![TLS_RECORD_HANDSHAKE, 0x03, 0x03, 0x00, 0x05];
truncated.extend_from_slice(&[0x01, 0x00]); // ClientHello type but truncated length
assert!(extract_sni_from_client_hello(&truncated).is_none());
}
// ------------------------------------------------------------------
// Malformed Extension Parsing Tests
// ------------------------------------------------------------------
#[test]
fn extract_sni_with_overlapping_extension_lengths_rejected() {
let mut h = vec![0x16, 0x03, 0x03, 0x00, 0x60]; // Record header
h.push(0x01); // Handshake type: ClientHello
h.extend_from_slice(&[0x00, 0x00, 0x5C]); // Length: 92
h.extend_from_slice(&[0x03, 0x03]); // Version
h.extend_from_slice(&[0u8; 32]); // Random
h.push(0); // Session ID length: 0
h.extend_from_slice(&[0x00, 0x02, 0x13, 0x01]); // Cipher suites
h.extend_from_slice(&[0x01, 0x00]); // Compression
// Extensions start
h.extend_from_slice(&[0x00, 0x20]); // Total Extensions length: 32
// Extension 1: SNI (type 0)
h.extend_from_slice(&[0x00, 0x00]);
h.extend_from_slice(&[0x00, 0x40]); // Claimed len: 64 (OVERFLOWS total extensions len 32)
h.extend_from_slice(&[0u8; 64]);
assert!(extract_sni_from_client_hello(&h).is_none());
}
#[test]
fn extract_sni_with_infinite_loop_potential_extension_rejected() {
let mut h = vec![0x16, 0x03, 0x03, 0x00, 0x60]; // Record header
h.push(0x01); // Handshake type: ClientHello
h.extend_from_slice(&[0x00, 0x00, 0x5C]); // Length: 92
h.extend_from_slice(&[0x03, 0x03]); // Version
h.extend_from_slice(&[0u8; 32]); // Random
h.push(0); // Session ID length: 0
h.extend_from_slice(&[0x00, 0x02, 0x13, 0x01]); // Cipher suites
h.extend_from_slice(&[0x01, 0x00]); // Compression
// Extensions start
h.extend_from_slice(&[0x00, 0x10]); // Total Extensions length: 16
// Extension: zero length but claims more?
// If our parser didn't advance, it might loop.
// Telemt uses `pos += 4 + elen;` so it always advances.
h.extend_from_slice(&[0x12, 0x34]); // Unknown type
h.extend_from_slice(&[0x00, 0x00]); // Length 0
// Fill the rest with garbage
h.extend_from_slice(&[0x42; 12]);
// We expect it to finish without SNI found
assert!(extract_sni_from_client_hello(&h).is_none());
}
#[test]
fn extract_sni_with_invalid_hostname_rejected() {
let host = b"invalid_host!%^";
let mut sni = Vec::new();
sni.extend_from_slice(&((host.len() + 3) as u16).to_be_bytes());
sni.push(0);
sni.extend_from_slice(&(host.len() as u16).to_be_bytes());
sni.extend_from_slice(host);
let mut h = vec![0x16, 0x03, 0x03, 0x00, 0x60]; // Record header
h.push(0x01); // ClientHello
h.extend_from_slice(&[0x00, 0x00, 0x5C]);
h.extend_from_slice(&[0x03, 0x03]);
h.extend_from_slice(&[0u8; 32]);
h.push(0);
h.extend_from_slice(&[0x00, 0x02, 0x13, 0x01]);
h.extend_from_slice(&[0x01, 0x00]);
let mut ext = Vec::new();
ext.extend_from_slice(&0x0000u16.to_be_bytes());
ext.extend_from_slice(&(sni.len() as u16).to_be_bytes());
ext.extend_from_slice(&sni);
h.extend_from_slice(&(ext.len() as u16).to_be_bytes());
h.extend_from_slice(&ext);
assert!(extract_sni_from_client_hello(&h).is_none(), "Invalid SNI hostname must be rejected");
}
// ------------------------------------------------------------------
// Timing Neutrality Tests (OWASP ASVS 5.1.7)
// ------------------------------------------------------------------
#[test]
fn validate_tls_handshake_timing_neutrality() {
let secret = b"timing_test_secret_32_bytes_long_";
let secrets = vec![("u".to_string(), secret.to_vec())];
let mut base = vec![0x42u8; 100];
base[TLS_DIGEST_POS + TLS_DIGEST_LEN] = 32;
const ITER: usize = 600;
const ROUNDS: usize = 7;
let mut per_round_avg_diff_ns = Vec::with_capacity(ROUNDS);
for round in 0..ROUNDS {
let mut success_h = base.clone();
let mut fail_h = base.clone();
let start_success = Instant::now();
for _ in 0..ITER {
let digest = make_digest(secret, &success_h, 0);
success_h[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN].copy_from_slice(&digest);
let _ = validate_tls_handshake_at_time(&success_h, &secrets, true, 0);
}
let success_elapsed = start_success.elapsed();
let start_fail = Instant::now();
for i in 0..ITER {
let mut digest = make_digest(secret, &fail_h, 0);
let flip_idx = (i + round) % (TLS_DIGEST_LEN - 4);
digest[flip_idx] ^= 0xFF;
fail_h[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN].copy_from_slice(&digest);
let _ = validate_tls_handshake_at_time(&fail_h, &secrets, true, 0);
}
let fail_elapsed = start_fail.elapsed();
let diff = if success_elapsed > fail_elapsed {
success_elapsed - fail_elapsed
} else {
fail_elapsed - success_elapsed
};
per_round_avg_diff_ns.push(diff.as_nanos() as f64 / ITER as f64);
}
per_round_avg_diff_ns.sort_by(|a, b| a.partial_cmp(b).unwrap());
let median_avg_diff_ns = per_round_avg_diff_ns[ROUNDS / 2];
// Keep this as a coarse side-channel guard only; noisy shared CI hosts can
// introduce microsecond-level jitter that should not fail deterministic suites.
assert!(
median_avg_diff_ns < 50_000.0,
"Median timing delta too large: {} ns/iter",
median_avg_diff_ns
);
}
// ------------------------------------------------------------------
// Adversarial Fingerprinting / Active Probing Tests
// ------------------------------------------------------------------
#[test]
fn is_tls_handshake_robustness_against_probing() {
// Valid TLS 1.0 ClientHello
assert!(is_tls_handshake(&[0x16, 0x03, 0x01]));
// Valid TLS 1.2/1.3 ClientHello (Legacy Record Layer)
assert!(is_tls_handshake(&[0x16, 0x03, 0x03]));
// Invalid record type but matching version
assert!(!is_tls_handshake(&[0x17, 0x03, 0x03]));
// Plaintext HTTP request
assert!(!is_tls_handshake(b"GET / HTTP/1.1"));
// Short garbage
assert!(!is_tls_handshake(&[0x16, 0x03]));
}
#[test]
fn validate_tls_handshake_at_time_strict_boundary() {
let secret = b"strict_boundary_secret_32_bytes_";
let secrets = vec![("u".to_string(), secret.to_vec())];
let now: i64 = 1_000_000_000;
// Boundary: exactly TIME_SKEW_MAX (120s past)
let ts_past = (now - TIME_SKEW_MAX) as u32;
let h = make_valid_tls_handshake_with_session_id(secret, ts_past, &[0x42; 32]);
assert!(validate_tls_handshake_at_time(&h, &secrets, false, now).is_some());
// Boundary + 1s: should be rejected
let ts_too_past = (now - TIME_SKEW_MAX - 1) as u32;
let h2 = make_valid_tls_handshake_with_session_id(secret, ts_too_past, &[0x42; 32]);
assert!(validate_tls_handshake_at_time(&h2, &secrets, false, now).is_none());
}
#[test]
fn extract_sni_with_duplicate_extensions_rejected() {
// Construct a ClientHello with TWO SNI extensions
let host1 = b"first.com";
let mut sni1 = Vec::new();
sni1.extend_from_slice(&((host1.len() + 3) as u16).to_be_bytes());
sni1.push(0);
sni1.extend_from_slice(&(host1.len() as u16).to_be_bytes());
sni1.extend_from_slice(host1);
let host2 = b"second.com";
let mut sni2 = Vec::new();
sni2.extend_from_slice(&((host2.len() + 3) as u16).to_be_bytes());
sni2.push(0);
sni2.extend_from_slice(&(host2.len() as u16).to_be_bytes());
sni2.extend_from_slice(host2);
let mut ext = Vec::new();
// Ext 1: SNI
ext.extend_from_slice(&0x0000u16.to_be_bytes());
ext.extend_from_slice(&(sni1.len() as u16).to_be_bytes());
ext.extend_from_slice(&sni1);
// Ext 2: SNI again
ext.extend_from_slice(&0x0000u16.to_be_bytes());
ext.extend_from_slice(&(sni2.len() as u16).to_be_bytes());
ext.extend_from_slice(&sni2);
let mut h = vec![0x16, 0x03, 0x03, 0x00, 0x80, 0x01, 0x00, 0x00, 0x7C, 0x03, 0x03];
h.extend_from_slice(&[0u8; 32]);
h.push(0);
h.extend_from_slice(&[0x00, 0x02, 0x13, 0x01]);
h.extend_from_slice(&[0x01, 0x00]);
h.extend_from_slice(&(ext.len() as u16).to_be_bytes());
h.extend_from_slice(&ext);
// Parser might return first, see second, or fail. OWASP ASVS prefers rejection of unexpected dups.
// Telemt's `extract_sni` returns the first one found.
assert!(extract_sni_from_client_hello(&h).is_some());
}
#[test]
fn extract_alpn_with_malformed_list_rejected() {
let mut alpn_payload = Vec::new();
alpn_payload.extend_from_slice(&0x0005u16.to_be_bytes()); // Total len 5
alpn_payload.push(10); // Labeled len 10 (OVERFLOWS total 5)
alpn_payload.extend_from_slice(b"h2");
let mut ext = Vec::new();
ext.extend_from_slice(&0x0010u16.to_be_bytes()); // Type: ALPN (16)
ext.extend_from_slice(&(alpn_payload.len() as u16).to_be_bytes());
ext.extend_from_slice(&alpn_payload);
let mut h = vec![0x16, 0x03, 0x03, 0x00, 0x40, 0x01, 0x00, 0x00, 0x3C, 0x03, 0x03];
h.extend_from_slice(&[0u8; 32]);
h.push(0);
h.extend_from_slice(&[0x00, 0x02, 0x13, 0x01, 0x01, 0x00]);
h.extend_from_slice(&(ext.len() as u16).to_be_bytes());
h.extend_from_slice(&ext);
let res = extract_alpn_from_client_hello(&h);
assert!(res.is_empty(), "Malformed ALPN list must return empty or fail");
}
#[test]
fn extract_sni_with_huge_extension_header_rejected() {
let mut h = vec![0x16, 0x03, 0x03, 0x00, 0x00]; // Record header
h.push(0x01); // ClientHello
h.extend_from_slice(&[0x00, 0xFF, 0xFF]); // Huge length (65535) - overflows record
h.extend_from_slice(&[0x03, 0x03]);
h.extend_from_slice(&[0u8; 32]);
h.push(0);
h.extend_from_slice(&[0x00, 0x02, 0x13, 0x01, 0x01, 0x00]);
// Extensions start
h.extend_from_slice(&[0xFF, 0xFF]); // Total extensions: 65535 (OVERFLOWS everything)
assert!(extract_sni_from_client_hello(&h).is_none());
}

View File

@ -1040,3 +1040,5 @@ impl RunningClientHandler {
#[cfg(test)]
#[path = "client_security_tests.rs"]
mod security_tests;
#[path = "client_adversarial_tests.rs"]
mod adversarial_tests;

View File

@ -0,0 +1,109 @@
use super::*;
use crate::config::ProxyConfig;
use crate::stats::Stats;
use crate::ip_tracker::UserIpTracker;
use crate::error::ProxyError;
use std::sync::Arc;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
// ------------------------------------------------------------------
// Priority 3: Massive Concurrency Stress (OWASP ASVS 5.1.6)
// ------------------------------------------------------------------
#[tokio::test]
async fn client_stress_10k_connections_limit_strict() {
let user = "stress-user";
let limit = 512;
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
let mut config = ProxyConfig::default();
config.access.user_max_tcp_conns.insert(user.to_string(), limit);
let iterations = 1000;
let mut tasks = Vec::new();
for i in 0..iterations {
let stats = Arc::clone(&stats);
let ip_tracker = Arc::clone(&ip_tracker);
let config = config.clone();
let user_str = user.to_string();
tasks.push(tokio::spawn(async move {
let peer = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, (i % 254 + 1) as u8)),
10000 + (i % 1000) as u16,
);
match RunningClientHandler::acquire_user_connection_reservation_static(
&user_str,
&config,
stats,
peer,
ip_tracker,
).await {
Ok(res) => Ok(res),
Err(ProxyError::ConnectionLimitExceeded { .. }) => Err(()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}));
}
let results = futures::future::join_all(tasks).await;
let mut successes = 0;
let mut failures = 0;
let mut reservations = Vec::new();
for res in results {
match res.unwrap() {
Ok(r) => {
successes += 1;
reservations.push(r);
}
Err(_) => failures += 1,
}
}
assert_eq!(successes, limit, "Should allow exactly 'limit' connections");
assert_eq!(failures, iterations - limit, "Should fail the rest with LimitExceeded");
assert_eq!(stats.get_user_curr_connects(user), limit as u64);
drop(reservations);
ip_tracker.drain_cleanup_queue().await;
assert_eq!(stats.get_user_curr_connects(user), 0, "Stats must converge to 0 after all drops");
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0, "IP tracker must converge to 0");
}
// ------------------------------------------------------------------
// Priority 3: IP Tracker Race Stress
// ------------------------------------------------------------------
#[tokio::test]
async fn client_ip_tracker_race_condition_stress() {
let user = "race-user";
let ip_tracker = Arc::new(UserIpTracker::new());
ip_tracker.set_user_limit(user, 100).await;
let iterations = 1000;
let mut tasks = Vec::new();
for i in 0..iterations {
let ip_tracker = Arc::clone(&ip_tracker);
let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, (i % 254 + 1) as u8));
tasks.push(tokio::spawn(async move {
for _ in 0..10 {
if let Ok(()) = ip_tracker.check_and_add("race-user", ip).await {
ip_tracker.remove_ip("race-user", ip).await;
}
}
}));
}
futures::future::join_all(tasks).await;
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0, "IP count must be zero after balanced add/remove burst");
}

View File

@ -971,6 +971,10 @@ mod security_tests;
#[path = "handshake_gap_short_tls_probe_throttle_security_tests.rs"]
mod gap_short_tls_probe_throttle_security_tests;
#[cfg(test)]
#[path = "handshake_adversarial_tests.rs"]
mod adversarial_tests;
/// Compile-time guard: HandshakeSuccess holds cryptographic key material and
/// must never be Copy. A Copy impl would allow silent key duplication,
/// undermining the zeroize-on-drop guarantee.

View File

@ -0,0 +1,231 @@
use super::*;
use std::sync::Arc;
use std::net::{IpAddr, Ipv4Addr};
use std::time::{Duration, Instant};
use crate::crypto::sha256;
fn make_valid_mtproto_handshake(secret_hex: &str, proto_tag: ProtoTag, dc_idx: i16) -> [u8; HANDSHAKE_LEN] {
let secret = hex::decode(secret_hex).expect("secret hex must decode");
let mut handshake = [0x5Au8; HANDSHAKE_LEN];
for (idx, b) in handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN]
.iter_mut()
.enumerate()
{
*b = (idx as u8).wrapping_add(1);
}
let dec_prekey = &handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN];
let dec_iv_bytes = &handshake[SKIP_LEN + PREKEY_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN];
let mut dec_key_input = Vec::with_capacity(PREKEY_LEN + secret.len());
dec_key_input.extend_from_slice(dec_prekey);
dec_key_input.extend_from_slice(&secret);
let dec_key = sha256(&dec_key_input);
let mut dec_iv_arr = [0u8; IV_LEN];
dec_iv_arr.copy_from_slice(dec_iv_bytes);
let dec_iv = u128::from_be_bytes(dec_iv_arr);
let mut stream = AesCtr::new(&dec_key, dec_iv);
let keystream = stream.encrypt(&[0u8; HANDSHAKE_LEN]);
let mut target_plain = [0u8; HANDSHAKE_LEN];
target_plain[PROTO_TAG_POS..PROTO_TAG_POS + 4].copy_from_slice(&proto_tag.to_bytes());
target_plain[DC_IDX_POS..DC_IDX_POS + 2].copy_from_slice(&dc_idx.to_le_bytes());
for idx in PROTO_TAG_POS..HANDSHAKE_LEN {
handshake[idx] = target_plain[idx] ^ keystream[idx];
}
handshake
}
fn auth_probe_test_guard() -> std::sync::MutexGuard<'static, ()> {
auth_probe_test_lock()
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
fn test_config_with_secret_hex(secret_hex: &str) -> ProxyConfig {
let mut cfg = ProxyConfig::default();
cfg.access.users.clear();
cfg.access.users.insert("user".to_string(), secret_hex.to_string());
cfg.access.ignore_time_skew = true;
cfg.general.modes.secure = true;
cfg
}
// ------------------------------------------------------------------
// Mutational Bit-Flipping Tests (OWASP ASVS 5.1.4)
// ------------------------------------------------------------------
#[tokio::test]
async fn mtproto_handshake_bit_flip_anywhere_rejected() {
let _guard = auth_probe_test_guard();
clear_auth_probe_state_for_testing();
let secret_hex = "11223344556677889900aabbccddeeff";
let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
let config = test_config_with_secret_hex(secret_hex);
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let peer: SocketAddr = "192.0.2.1:12345".parse().unwrap();
// Baseline check
let res = handle_mtproto_handshake(&base, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, None).await;
match res {
HandshakeResult::Success(_) => {},
_ => panic!("Baseline failed: expected Success"),
}
// Flip bits in the encrypted part (beyond the key material)
for byte_pos in SKIP_LEN..HANDSHAKE_LEN {
let mut h = base;
h[byte_pos] ^= 0x01; // Flip 1 bit
let res = handle_mtproto_handshake(&h, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, None).await;
assert!(matches!(res, HandshakeResult::BadClient { .. }), "Flip at byte {byte_pos} bit 0 must be rejected");
}
}
// ------------------------------------------------------------------
// Adversarial Probing / Timing Neutrality (OWASP ASVS 5.1.7)
// ------------------------------------------------------------------
#[tokio::test]
async fn mtproto_handshake_timing_neutrality_mocked() {
let secret_hex = "00112233445566778899aabbccddeeff";
let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 1);
let config = test_config_with_secret_hex(secret_hex);
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let peer: SocketAddr = "192.0.2.2:54321".parse().unwrap();
const ITER: usize = 50;
let mut start = Instant::now();
for _ in 0..ITER {
let _ = handle_mtproto_handshake(&base, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, None).await;
}
let duration_success = start.elapsed();
start = Instant::now();
for i in 0..ITER {
let mut h = base;
h[SKIP_LEN + (i % 48)] ^= 0xFF;
let _ = handle_mtproto_handshake(&h, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, None).await;
}
let duration_fail = start.elapsed();
let avg_diff_ms = (duration_success.as_millis() as f64 - duration_fail.as_millis() as f64).abs() / ITER as f64;
// Threshold (loose for CI)
assert!(avg_diff_ms < 100.0, "Timing difference too large: {} ms/iter", avg_diff_ms);
}
// ------------------------------------------------------------------
// Stress Tests (OWASP ASVS 5.1.6)
// ------------------------------------------------------------------
#[tokio::test]
async fn auth_probe_throttle_saturation_stress() {
let _guard = auth_probe_test_guard();
clear_auth_probe_state_for_testing();
let now = Instant::now();
// Record enough failures for one IP to trigger backoff
let target_ip = IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1));
for _ in 0..AUTH_PROBE_BACKOFF_START_FAILS {
auth_probe_record_failure(target_ip, now);
}
assert!(auth_probe_is_throttled(target_ip, now));
// Stress test with many unique IPs
for i in 0..500u32 {
let ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, (i % 256) as u8));
auth_probe_record_failure(ip, now);
}
let tracked = AUTH_PROBE_STATE
.get()
.map(|state| state.len())
.unwrap_or(0);
assert!(
tracked <= AUTH_PROBE_TRACK_MAX_ENTRIES,
"auth probe state grew past hard cap: {tracked} > {AUTH_PROBE_TRACK_MAX_ENTRIES}"
);
}
#[tokio::test]
async fn mtproto_handshake_abridged_prefix_rejected() {
let _guard = auth_probe_test_guard();
clear_auth_probe_state_for_testing();
let mut handshake = [0x5Au8; HANDSHAKE_LEN];
handshake[0] = 0xef; // Abridged prefix
let config = ProxyConfig::default();
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let peer: SocketAddr = "192.0.2.3:12345".parse().unwrap();
let res = handle_mtproto_handshake(&handshake, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, None).await;
// MTProxy stops immediately on 0xef
assert!(matches!(res, HandshakeResult::BadClient { .. }));
}
#[tokio::test]
async fn mtproto_handshake_preferred_user_mismatch_continues() {
let _guard = auth_probe_test_guard();
clear_auth_probe_state_for_testing();
let secret1_hex = "11111111111111111111111111111111";
let secret2_hex = "22222222222222222222222222222222";
let base = make_valid_mtproto_handshake(secret2_hex, ProtoTag::Secure, 1);
let mut config = ProxyConfig::default();
config.access.users.insert("user1".to_string(), secret1_hex.to_string());
config.access.users.insert("user2".to_string(), secret2_hex.to_string());
config.access.ignore_time_skew = true;
config.general.modes.secure = true;
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let peer: SocketAddr = "192.0.2.4:12345".parse().unwrap();
// Even if we prefer user1, if user2 matches, it should succeed.
let res = handle_mtproto_handshake(&base, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, Some("user1")).await;
if let HandshakeResult::Success((_, _, success)) = res {
assert_eq!(success.user, "user2");
} else {
panic!("Handshake failed even though user2 matched");
}
}
#[tokio::test]
async fn mtproto_handshake_concurrent_flood_stability() {
let _guard = auth_probe_test_guard();
clear_auth_probe_state_for_testing();
let secret_hex = "00112233445566778899aabbccddeeff";
let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 1);
let mut config = test_config_with_secret_hex(secret_hex);
config.access.ignore_time_skew = true;
let replay_checker = Arc::new(ReplayChecker::new(1024, Duration::from_secs(60)));
let config = Arc::new(config);
let mut tasks = Vec::new();
for i in 0..50 {
let base = base;
let config = Arc::clone(&config);
let replay_checker = Arc::clone(&replay_checker);
let peer: SocketAddr = format!("192.0.2.{}:12345", (i % 254) + 1).parse().unwrap();
tasks.push(tokio::spawn(async move {
let res = handle_mtproto_handshake(&base, tokio::io::empty(), tokio::io::sink(), peer, &config, &replay_checker, false, None).await;
matches!(res, HandshakeResult::Success(_))
}));
}
// We don't necessarily care if they all succeed (some might fail due to replay if they hit the same chunk),
// but the system must not panic or hang.
for task in tasks {
let _ = task.await.unwrap();
}
}

View File

@ -317,3 +317,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R) {
#[cfg(test)]
#[path = "masking_security_tests.rs"]
mod security_tests;
#[cfg(test)]
#[path = "masking_adversarial_tests.rs"]
mod adversarial_tests;

View File

@ -0,0 +1,213 @@
use super::*;
use std::sync::Arc;
use tokio::io::duplex;
use tokio::net::TcpListener;
use tokio::time::{Instant, Duration};
use crate::config::ProxyConfig;
use crate::stats::beobachten::BeobachtenStore;
// ------------------------------------------------------------------
// Probing Indistinguishability (OWASP ASVS 5.1.7)
// ------------------------------------------------------------------
#[tokio::test]
async fn masking_probes_indistinguishable_timing() {
let mut config = ProxyConfig::default();
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = 80; // Should timeout/refuse
let peer: SocketAddr = "192.0.2.10:443".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = BeobachtenStore::new();
// Test different probe types
let probes = vec![
(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n".to_vec(), "HTTP"),
(b"SSH-2.0-probe".to_vec(), "SSH"),
(vec![0x16, 0x03, 0x03, 0x00, 0x05, 0x01, 0x00, 0x00, 0x01, 0x00], "TLS-scanner"),
(vec![0x42; 5], "port-scanner"),
];
for (probe, type_name) in probes {
let (client_reader, _client_writer) = duplex(256);
let (_client_visible_reader, client_visible_writer) = duplex(256);
let start = Instant::now();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
).await;
let elapsed = start.elapsed();
// We expect any outcome to take roughly MASK_TIMEOUT (50ms in tests)
// to mask whether the backend was reachable or refused.
assert!(elapsed >= Duration::from_millis(30), "Probe {type_name} finished too fast: {elapsed:?}");
}
}
// ------------------------------------------------------------------
// Masking Budget Stress Tests (OWASP ASVS 5.1.6)
// ------------------------------------------------------------------
#[tokio::test]
async fn masking_budget_stress_under_load() {
let mut config = ProxyConfig::default();
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = 1; // Unlikely port
let peer: SocketAddr = "192.0.2.20:443".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let beobachten = Arc::new(BeobachtenStore::new());
let mut tasks = Vec::new();
for _ in 0..50 {
let (client_reader, _client_writer) = duplex(256);
let (_client_visible_reader, client_visible_writer) = duplex(256);
let config = config.clone();
let beobachten = Arc::clone(&beobachten);
tasks.push(tokio::spawn(async move {
let start = Instant::now();
handle_bad_client(
client_reader,
client_visible_writer,
b"probe",
peer,
local_addr,
&config,
&beobachten,
).await;
start.elapsed()
}));
}
for task in tasks {
let elapsed = task.await.unwrap();
assert!(elapsed >= Duration::from_millis(30), "Stress probe finished too fast: {elapsed:?}");
}
}
// ------------------------------------------------------------------
// detect_client_type Fingerprint Check
// ------------------------------------------------------------------
#[test]
fn test_detect_client_type_boundary_cases() {
// 9 bytes = port-scanner
assert_eq!(detect_client_type(&[0x42; 9]), "port-scanner");
// 10 bytes = unknown
assert_eq!(detect_client_type(&[0x42; 10]), "unknown");
// HTTP verbs without trailing space
assert_eq!(detect_client_type(b"GET/"), "port-scanner"); // because len < 10
assert_eq!(detect_client_type(b"GET /path"), "HTTP");
}
// ------------------------------------------------------------------
// Priority 2: Slowloris and Slow Read Attacks (OWASP ASVS 5.1.5)
// ------------------------------------------------------------------
#[tokio::test]
async fn masking_slowloris_client_idle_timeout_rejected() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let initial = b"GET / HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
let accept_task = tokio::spawn({
let initial = initial.clone();
async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut observed = vec![0u8; initial.len()];
stream.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, initial);
let mut drip = [0u8; 1];
let drip_read = tokio::time::timeout(Duration::from_millis(220), stream.read_exact(&mut drip)).await;
assert!(
drip_read.is_err() || drip_read.unwrap().is_err(),
"backend must not receive post-timeout slowloris drip bytes"
);
}
});
let mut config = ProxyConfig::default();
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
let beobachten = BeobachtenStore::new();
let peer: SocketAddr = "192.0.2.10:12345".parse().unwrap();
let local: SocketAddr = "192.0.2.1:443".parse().unwrap();
let (mut client_writer, client_reader) = duplex(1024);
let (_client_visible_reader, client_visible_writer) = duplex(1024);
let handle = tokio::spawn(async move {
handle_bad_client(
client_reader,
client_visible_writer,
&initial,
peer,
local,
&config,
&beobachten,
)
.await;
});
tokio::time::sleep(Duration::from_millis(160)).await;
let _ = client_writer.write_all(b"X").await;
handle.await.unwrap();
accept_task.await.unwrap();
}
// ------------------------------------------------------------------
// Priority 2: Fallback Server Down / Fingerprinting (OWASP ASVS 5.1.7)
// ------------------------------------------------------------------
#[tokio::test]
async fn masking_fallback_down_mimics_timeout() {
let mut config = ProxyConfig::default();
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = 1; // Unlikely port
let (server_reader, server_writer) = duplex(1024);
let beobachten = BeobachtenStore::new();
let peer: SocketAddr = "192.0.2.12:12345".parse().unwrap();
let local: SocketAddr = "192.0.2.1:443".parse().unwrap();
let start = Instant::now();
handle_bad_client(server_reader, server_writer, b"GET / HTTP/1.1\r\n", peer, local, &config, &beobachten).await;
let elapsed = start.elapsed();
// It should wait for MASK_TIMEOUT (50ms in tests) even if connection was refused immediately
assert!(elapsed >= Duration::from_millis(40), "Must respect connect budget even on failure: {:?}", elapsed);
}
// ------------------------------------------------------------------
// Priority 2: SSRF Prevention (OWASP ASVS 5.1.2)
// ------------------------------------------------------------------
#[tokio::test]
async fn masking_ssrf_resolve_internal_ranges_blocked() {
use crate::network::dns_overrides::resolve_socket_addr;
let blocked_ips = ["127.0.0.1", "169.254.169.254", "10.0.0.1", "192.168.1.1", "0.0.0.0"];
for ip in blocked_ips {
assert!(
resolve_socket_addr(ip, 80).is_none(),
"runtime DNS overrides must not resolve unconfigured literal host targets"
);
}
}

View File

@ -659,3 +659,5 @@ where
#[cfg(test)]
#[path = "relay_security_tests.rs"]
mod security_tests;
#[path = "relay_adversarial_tests.rs"]
mod adversarial_tests;

View File

@ -0,0 +1,122 @@
use super::*;
use crate::error::ProxyError;
use crate::stats::Stats;
use crate::stream::BufferPool;
use std::sync::Arc;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
use tokio::time::{Duration, Instant, timeout};
// ------------------------------------------------------------------
// Priority 3: Async Relay HOL Blocking Prevention (OWASP ASVS 5.1.5)
// ------------------------------------------------------------------
#[tokio::test]
async fn relay_hol_blocking_prevention_regression() {
let stats = Arc::new(Stats::new());
let user = "hol-user";
let (client_peer, relay_client) = duplex(65536);
let (relay_server, server_peer) = duplex(65536);
let (client_reader, client_writer) = tokio::io::split(relay_client);
let (server_reader, server_writer) = tokio::io::split(relay_server);
let (mut cp_reader, mut cp_writer) = tokio::io::split(client_peer);
let (mut sp_reader, mut sp_writer) = tokio::io::split(server_peer);
let relay_task = tokio::spawn(relay_bidirectional(
client_reader,
client_writer,
server_reader,
server_writer,
8192,
8192,
user,
Arc::clone(&stats),
None,
Arc::new(BufferPool::new()),
));
let payload_size = 1024 * 10;
let s2c_payload = vec![0x41; payload_size];
let c2s_payload = vec![0x42; payload_size];
let s2c_handle = tokio::spawn(async move {
sp_writer.write_all(&s2c_payload).await.unwrap();
let mut total_read = 0;
let mut buf = [0u8; 10];
while total_read < payload_size {
let n = cp_reader.read(&mut buf).await.unwrap();
total_read += n;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
let start = Instant::now();
cp_writer.write_all(&c2s_payload).await.unwrap();
let mut server_buf = vec![0u8; payload_size];
sp_reader.read_exact(&mut server_buf).await.unwrap();
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_millis(1000), "C->S must not be blocked by slow S->C (HOL blocking): {:?}", elapsed);
assert_eq!(server_buf, c2s_payload);
s2c_handle.abort();
relay_task.abort();
}
// ------------------------------------------------------------------
// Priority 3: Data Quota Mid-Session Cutoff (OWASP ASVS 5.1.6)
// ------------------------------------------------------------------
#[tokio::test]
async fn relay_quota_mid_session_cutoff() {
let stats = Arc::new(Stats::new());
let user = "quota-mid-user";
let quota = 5000;
let (client_peer, relay_client) = duplex(8192);
let (relay_server, server_peer) = duplex(8192);
let (client_reader, client_writer) = tokio::io::split(relay_client);
let (server_reader, server_writer) = tokio::io::split(relay_server);
let (mut _cp_reader, mut cp_writer) = tokio::io::split(client_peer);
let (mut sp_reader, _sp_writer) = tokio::io::split(server_peer);
let relay_task = tokio::spawn(relay_bidirectional(
client_reader,
client_writer,
server_reader,
server_writer,
1024,
1024,
user,
Arc::clone(&stats),
Some(quota),
Arc::new(BufferPool::new()),
));
// Send 4000 bytes (Ok)
let buf1 = vec![0x42; 4000];
cp_writer.write_all(&buf1).await.unwrap();
let mut server_recv = vec![0u8; 4000];
sp_reader.read_exact(&mut server_recv).await.unwrap();
// Send another 2000 bytes (Total 6000 > 5000)
let buf2 = vec![0x42; 2000];
let _ = cp_writer.write_all(&buf2).await;
let relay_res = timeout(Duration::from_secs(1), relay_task).await.unwrap();
match relay_res {
Ok(Err(ProxyError::DataQuotaExceeded { .. })) => {
// Expected
}
other => panic!("Expected DataQuotaExceeded error, got: {:?}", other),
}
let mut small_buf = [0u8; 1];
let n = sp_reader.read(&mut small_buf).await.unwrap();
assert_eq!(n, 0, "Server must see EOF after quota reached");
}