mirror of https://github.com/telemt/telemt.git
Add adversarial tests for MTProto handshake and enhance masking functionality
- Introduced multiple adversarial tests for MTProto handshake to ensure robustness against replay attacks, invalid mutations, and concurrent flooding. - Implemented a function to build proxy headers based on the specified version, improving the handling of masking protocols. - Added tests to validate the behavior of the masking functionality under various conditions, including unknown proxy protocol versions and oversized payloads. - Enhanced relay tests to ensure stability and performance under high load and half-close scenarios.
This commit is contained in:
parent
9dce748679
commit
babd902d95
|
|
@ -334,6 +334,24 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let handshake_timeout_ms = config
|
||||||
|
.timeouts
|
||||||
|
.client_handshake
|
||||||
|
.checked_mul(1000)
|
||||||
|
.ok_or_else(|| {
|
||||||
|
ProxyError::Config(
|
||||||
|
"timeouts.client_handshake is too large to validate milliseconds budget"
|
||||||
|
.to_string(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if config.censorship.server_hello_delay_max_ms >= handshake_timeout_ms {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"censorship.server_hello_delay_max_ms must be < timeouts.client_handshake * 1000"
|
||||||
|
.to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.timeouts.relay_client_idle_soft_secs == 0 {
|
if config.timeouts.relay_client_idle_soft_secs == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"timeouts.relay_client_idle_soft_secs must be > 0".to_string(),
|
"timeouts.relay_client_idle_soft_secs must be > 0".to_string(),
|
||||||
|
|
@ -977,6 +995,10 @@ impl ProxyConfig {
|
||||||
#[path = "load_idle_policy_tests.rs"]
|
#[path = "load_idle_policy_tests.rs"]
|
||||||
mod load_idle_policy_tests;
|
mod load_idle_policy_tests;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[path = "load_security_tests.rs"]
|
||||||
|
mod load_security_tests;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
use super::*;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
fn write_temp_config(contents: &str) -> PathBuf {
|
||||||
|
let nonce = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.expect("system time must be after unix epoch")
|
||||||
|
.as_nanos();
|
||||||
|
let path = std::env::temp_dir().join(format!("telemt-load-security-{nonce}.toml"));
|
||||||
|
fs::write(&path, contents).expect("temp config write must succeed");
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_temp_config(path: &PathBuf) {
|
||||||
|
let _ = fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn load_rejects_server_hello_delay_equal_to_handshake_timeout_budget() {
|
||||||
|
let path = write_temp_config(
|
||||||
|
r#"
|
||||||
|
[timeouts]
|
||||||
|
client_handshake = 1
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
server_hello_delay_max_ms = 1000
|
||||||
|
"#,
|
||||||
|
);
|
||||||
|
|
||||||
|
let err = ProxyConfig::load(&path)
|
||||||
|
.expect_err("delay equal to handshake timeout must be rejected");
|
||||||
|
let msg = err.to_string();
|
||||||
|
assert!(
|
||||||
|
msg.contains("censorship.server_hello_delay_max_ms must be < timeouts.client_handshake * 1000"),
|
||||||
|
"error must explain delay<timeout invariant, got: {msg}"
|
||||||
|
);
|
||||||
|
|
||||||
|
remove_temp_config(&path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn load_rejects_server_hello_delay_larger_than_handshake_timeout_budget() {
|
||||||
|
let path = write_temp_config(
|
||||||
|
r#"
|
||||||
|
[timeouts]
|
||||||
|
client_handshake = 1
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
server_hello_delay_max_ms = 1500
|
||||||
|
"#,
|
||||||
|
);
|
||||||
|
|
||||||
|
let err = ProxyConfig::load(&path)
|
||||||
|
.expect_err("delay larger than handshake timeout must be rejected");
|
||||||
|
let msg = err.to_string();
|
||||||
|
assert!(
|
||||||
|
msg.contains("censorship.server_hello_delay_max_ms must be < timeouts.client_handshake * 1000"),
|
||||||
|
"error must explain delay<timeout invariant, got: {msg}"
|
||||||
|
);
|
||||||
|
|
||||||
|
remove_temp_config(&path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn load_accepts_server_hello_delay_strictly_below_handshake_timeout_budget() {
|
||||||
|
let path = write_temp_config(
|
||||||
|
r#"
|
||||||
|
[timeouts]
|
||||||
|
client_handshake = 1
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
server_hello_delay_max_ms = 999
|
||||||
|
"#,
|
||||||
|
);
|
||||||
|
|
||||||
|
let cfg = ProxyConfig::load(&path)
|
||||||
|
.expect("delay below handshake timeout budget must be accepted");
|
||||||
|
assert_eq!(cfg.timeouts.client_handshake, 1);
|
||||||
|
assert_eq!(cfg.censorship.server_hello_delay_max_ms, 999);
|
||||||
|
|
||||||
|
remove_temp_config(&path);
|
||||||
|
}
|
||||||
|
|
@ -462,3 +462,208 @@ async fn client_repeat_acquire_release_cycles_never_accumulate_state() {
|
||||||
assert_eq!(stats.get_user_curr_connects(user), 0);
|
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||||
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn client_multi_user_isolation_under_parallel_limit_exhaustion() {
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.access.user_max_tcp_conns.insert("u1".to_string(), 8);
|
||||||
|
config.access.user_max_tcp_conns.insert("u2".to_string(), 8);
|
||||||
|
|
||||||
|
let mut tasks = Vec::new();
|
||||||
|
for i in 0..128u16 {
|
||||||
|
let stats = Arc::clone(&stats);
|
||||||
|
let ip_tracker = Arc::clone(&ip_tracker);
|
||||||
|
let config = config.clone();
|
||||||
|
tasks.push(tokio::spawn(async move {
|
||||||
|
let user = if i % 2 == 0 { "u1" } else { "u2" };
|
||||||
|
let peer = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(100, 64, (i / 64) as u8, (i % 64 + 1) as u8)),
|
||||||
|
37000 + i,
|
||||||
|
);
|
||||||
|
RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats,
|
||||||
|
peer,
|
||||||
|
ip_tracker,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut u1_success = 0usize;
|
||||||
|
let mut u2_success = 0usize;
|
||||||
|
let mut reservations = Vec::new();
|
||||||
|
for (idx, result) in futures::future::join_all(tasks).await.into_iter().enumerate() {
|
||||||
|
let user = if idx % 2 == 0 { "u1" } else { "u2" };
|
||||||
|
match result.unwrap() {
|
||||||
|
Ok(reservation) => {
|
||||||
|
if user == "u1" {
|
||||||
|
u1_success += 1;
|
||||||
|
} else {
|
||||||
|
u2_success += 1;
|
||||||
|
}
|
||||||
|
reservations.push(reservation);
|
||||||
|
}
|
||||||
|
Err(ProxyError::ConnectionLimitExceeded { .. }) => {}
|
||||||
|
Err(other) => panic!("unexpected error: {other}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(u1_success, 8, "u1 must get exactly its own configured cap");
|
||||||
|
assert_eq!(u2_success, 8, "u2 must get exactly its own configured cap");
|
||||||
|
|
||||||
|
drop(reservations);
|
||||||
|
ip_tracker.drain_cleanup_queue().await;
|
||||||
|
assert_eq!(stats.get_user_curr_connects("u1"), 0);
|
||||||
|
assert_eq!(stats.get_user_curr_connects("u2"), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn client_limit_recovery_after_full_rejection_wave() {
|
||||||
|
let user = "recover-user";
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||||
|
ip_tracker.set_user_limit(user, 1).await;
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.access.user_max_tcp_conns.insert(user.to_string(), 1);
|
||||||
|
|
||||||
|
let first_peer: SocketAddr = "198.51.100.50:38001".parse().unwrap();
|
||||||
|
let reservation = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
first_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
for i in 0..64u16 {
|
||||||
|
let peer = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(198, 51, 100, (i % 60 + 1) as u8)),
|
||||||
|
38002 + i,
|
||||||
|
);
|
||||||
|
let denied = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(matches!(denied, Err(ProxyError::ConnectionLimitExceeded { .. })));
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(reservation);
|
||||||
|
ip_tracker.drain_cleanup_queue().await;
|
||||||
|
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||||
|
|
||||||
|
let recovery_peer: SocketAddr = "198.51.100.200:38999".parse().unwrap();
|
||||||
|
let recovered = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
recovery_peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(recovered.is_ok(), "capacity must recover after prior holder drops");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn client_dual_limit_cross_product_never_leaks_on_reject() {
|
||||||
|
let user = "dual-limit-user";
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||||
|
ip_tracker.set_user_limit(user, 2).await;
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.access.user_max_tcp_conns.insert(user.to_string(), 2);
|
||||||
|
|
||||||
|
let p1: SocketAddr = "203.0.113.10:39001".parse().unwrap();
|
||||||
|
let p2: SocketAddr = "203.0.113.11:39002".parse().unwrap();
|
||||||
|
let r1 = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
p1,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let r2 = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
p2,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
for i in 0..32u16 {
|
||||||
|
let peer = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(203, 0, 113, (50 + i) as u8)),
|
||||||
|
39010 + i,
|
||||||
|
);
|
||||||
|
let denied = RunningClientHandler::acquire_user_connection_reservation_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
stats.clone(),
|
||||||
|
peer,
|
||||||
|
ip_tracker.clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(matches!(denied, Err(ProxyError::ConnectionLimitExceeded { .. })));
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(stats.get_user_curr_connects(user), 2);
|
||||||
|
drop((r1, r2));
|
||||||
|
ip_tracker.drain_cleanup_queue().await;
|
||||||
|
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||||
|
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn client_check_user_limits_concurrent_churn_no_counter_drift() {
|
||||||
|
let user = "check-drift-user";
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||||
|
ip_tracker.set_user_limit(user, 64).await;
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.access.user_max_tcp_conns.insert(user.to_string(), 64);
|
||||||
|
|
||||||
|
let mut tasks = Vec::new();
|
||||||
|
for i in 0..512u16 {
|
||||||
|
let stats = Arc::clone(&stats);
|
||||||
|
let ip_tracker = Arc::clone(&ip_tracker);
|
||||||
|
let config = config.clone();
|
||||||
|
tasks.push(tokio::spawn(async move {
|
||||||
|
let peer = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(172, 20, (i / 255) as u8, (i % 255 + 1) as u8)),
|
||||||
|
40000 + (i % 500),
|
||||||
|
);
|
||||||
|
let _ = RunningClientHandler::check_user_limits_static(
|
||||||
|
user,
|
||||||
|
&config,
|
||||||
|
&stats,
|
||||||
|
peer,
|
||||||
|
&ip_tracker,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
for task in futures::future::join_all(tasks).await {
|
||||||
|
task.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(stats.get_user_curr_connects(user), 0);
|
||||||
|
assert_eq!(ip_tracker.get_active_ip_count(user).await, 0);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -229,3 +229,239 @@ async fn mtproto_handshake_concurrent_flood_stability() {
|
||||||
let _ = task.await.unwrap();
|
let _ = task.await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mtproto_replay_is_rejected_across_distinct_peers() {
|
||||||
|
let _guard = auth_probe_test_guard();
|
||||||
|
clear_auth_probe_state_for_testing();
|
||||||
|
|
||||||
|
let secret_hex = "0123456789abcdeffedcba9876543210";
|
||||||
|
let handshake = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
|
||||||
|
let config = test_config_with_secret_hex(secret_hex);
|
||||||
|
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
|
||||||
|
|
||||||
|
let first_peer: SocketAddr = "198.51.100.10:41001".parse().unwrap();
|
||||||
|
let second_peer: SocketAddr = "198.51.100.11:41002".parse().unwrap();
|
||||||
|
|
||||||
|
let first = handle_mtproto_handshake(
|
||||||
|
&handshake,
|
||||||
|
tokio::io::empty(),
|
||||||
|
tokio::io::sink(),
|
||||||
|
first_peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(matches!(first, HandshakeResult::Success(_)));
|
||||||
|
|
||||||
|
let replay = handle_mtproto_handshake(
|
||||||
|
&handshake,
|
||||||
|
tokio::io::empty(),
|
||||||
|
tokio::io::sink(),
|
||||||
|
second_peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(matches!(replay, HandshakeResult::BadClient { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mtproto_blackhat_mutation_corpus_never_panics_and_stays_fail_closed() {
|
||||||
|
let _guard = auth_probe_test_guard();
|
||||||
|
clear_auth_probe_state_for_testing();
|
||||||
|
|
||||||
|
let secret_hex = "89abcdef012345670123456789abcdef";
|
||||||
|
let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
|
||||||
|
let config = test_config_with_secret_hex(secret_hex);
|
||||||
|
let replay_checker = ReplayChecker::new(8192, Duration::from_secs(60));
|
||||||
|
|
||||||
|
for i in 0..512usize {
|
||||||
|
let mut mutated = base;
|
||||||
|
let pos = (SKIP_LEN + (i * 31) % (HANDSHAKE_LEN - SKIP_LEN)).min(HANDSHAKE_LEN - 1);
|
||||||
|
mutated[pos] ^= ((i as u8) | 1).rotate_left((i % 8) as u32);
|
||||||
|
let peer: SocketAddr = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(198, 18, (i / 254) as u8, (i % 254 + 1) as u8)),
|
||||||
|
42000 + (i % 1000) as u16,
|
||||||
|
);
|
||||||
|
|
||||||
|
let res = tokio::time::timeout(
|
||||||
|
Duration::from_millis(250),
|
||||||
|
handle_mtproto_handshake(
|
||||||
|
&mutated,
|
||||||
|
tokio::io::empty(),
|
||||||
|
tokio::io::sink(),
|
||||||
|
peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fuzzed mutation must complete in bounded time");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
matches!(res, HandshakeResult::BadClient { .. } | HandshakeResult::Success(_)),
|
||||||
|
"mutation corpus must stay within explicit handshake outcomes"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn auth_probe_success_clears_throttled_peer_state() {
|
||||||
|
let _guard = auth_probe_test_guard();
|
||||||
|
clear_auth_probe_state_for_testing();
|
||||||
|
|
||||||
|
let target_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 90));
|
||||||
|
let now = Instant::now();
|
||||||
|
for _ in 0..AUTH_PROBE_BACKOFF_START_FAILS {
|
||||||
|
auth_probe_record_failure(target_ip, now);
|
||||||
|
}
|
||||||
|
assert!(auth_probe_is_throttled(target_ip, now));
|
||||||
|
|
||||||
|
auth_probe_record_success(target_ip);
|
||||||
|
assert!(
|
||||||
|
!auth_probe_is_throttled(target_ip, now + Duration::from_millis(1)),
|
||||||
|
"successful auth must clear per-peer throttle state"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mtproto_invalid_storm_over_cap_keeps_probe_map_hard_bounded() {
|
||||||
|
let _guard = auth_probe_test_guard();
|
||||||
|
clear_auth_probe_state_for_testing();
|
||||||
|
|
||||||
|
let secret_hex = "00112233445566778899aabbccddeeff";
|
||||||
|
let mut invalid = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
|
||||||
|
invalid[SKIP_LEN + 3] ^= 0xff;
|
||||||
|
|
||||||
|
let config = test_config_with_secret_hex(secret_hex);
|
||||||
|
let replay_checker = ReplayChecker::new(64, Duration::from_secs(60));
|
||||||
|
|
||||||
|
for i in 0..(AUTH_PROBE_TRACK_MAX_ENTRIES + 512) {
|
||||||
|
let peer: SocketAddr = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(10, (i / 65535) as u8, ((i / 255) % 255) as u8, (i % 255 + 1) as u8)),
|
||||||
|
43000 + (i % 20000) as u16,
|
||||||
|
);
|
||||||
|
let res = handle_mtproto_handshake(
|
||||||
|
&invalid,
|
||||||
|
tokio::io::empty(),
|
||||||
|
tokio::io::sink(),
|
||||||
|
peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(matches!(res, HandshakeResult::BadClient { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
let tracked = AUTH_PROBE_STATE
|
||||||
|
.get()
|
||||||
|
.map(|state| state.len())
|
||||||
|
.unwrap_or(0);
|
||||||
|
assert!(
|
||||||
|
tracked <= AUTH_PROBE_TRACK_MAX_ENTRIES,
|
||||||
|
"probe map must remain bounded under invalid storm: {tracked}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mtproto_property_style_multi_bit_mutations_fail_closed_or_auth_only() {
|
||||||
|
let _guard = auth_probe_test_guard();
|
||||||
|
clear_auth_probe_state_for_testing();
|
||||||
|
|
||||||
|
let secret_hex = "f0e1d2c3b4a5968778695a4b3c2d1e0f";
|
||||||
|
let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
|
||||||
|
let config = test_config_with_secret_hex(secret_hex);
|
||||||
|
let replay_checker = ReplayChecker::new(10_000, Duration::from_secs(60));
|
||||||
|
|
||||||
|
let mut seed: u64 = 0xC0FF_EE12_3456_789A;
|
||||||
|
for i in 0..2_048usize {
|
||||||
|
let mut mutated = base;
|
||||||
|
for _ in 0..4 {
|
||||||
|
seed ^= seed << 7;
|
||||||
|
seed ^= seed >> 9;
|
||||||
|
seed ^= seed << 8;
|
||||||
|
let idx = SKIP_LEN + (seed as usize % (HANDSHAKE_LEN - SKIP_LEN));
|
||||||
|
mutated[idx] ^= ((seed >> 11) as u8).wrapping_add(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let peer: SocketAddr = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(10, 123, (i / 254) as u8, (i % 254 + 1) as u8)),
|
||||||
|
45000 + (i % 2000) as u16,
|
||||||
|
);
|
||||||
|
|
||||||
|
let outcome = tokio::time::timeout(
|
||||||
|
Duration::from_millis(250),
|
||||||
|
handle_mtproto_handshake(
|
||||||
|
&mutated,
|
||||||
|
tokio::io::empty(),
|
||||||
|
tokio::io::sink(),
|
||||||
|
peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("mutation iteration must complete in bounded time");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
matches!(outcome, HandshakeResult::BadClient { .. } | HandshakeResult::Success(_)),
|
||||||
|
"mutations must remain fail-closed/auth-only"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "heavy soak; run manually"]
|
||||||
|
async fn mtproto_blackhat_20k_mutation_soak_never_panics() {
|
||||||
|
let _guard = auth_probe_test_guard();
|
||||||
|
clear_auth_probe_state_for_testing();
|
||||||
|
|
||||||
|
let secret_hex = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
|
||||||
|
let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
|
||||||
|
let config = test_config_with_secret_hex(secret_hex);
|
||||||
|
let replay_checker = ReplayChecker::new(50_000, Duration::from_secs(120));
|
||||||
|
|
||||||
|
let mut seed: u64 = 0xA5A5_5A5A_DEAD_BEEF;
|
||||||
|
for i in 0..20_000usize {
|
||||||
|
let mut mutated = base;
|
||||||
|
for _ in 0..3 {
|
||||||
|
seed ^= seed << 7;
|
||||||
|
seed ^= seed >> 9;
|
||||||
|
seed ^= seed << 8;
|
||||||
|
let idx = SKIP_LEN + (seed as usize % (HANDSHAKE_LEN - SKIP_LEN));
|
||||||
|
mutated[idx] ^= ((seed >> 19) as u8).wrapping_add(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let peer: SocketAddr = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(172, 31, (i / 254) as u8, (i % 254 + 1) as u8)),
|
||||||
|
47000 + (i % 15000) as u16,
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = tokio::time::timeout(
|
||||||
|
Duration::from_millis(250),
|
||||||
|
handle_mtproto_handshake(
|
||||||
|
&mutated,
|
||||||
|
tokio::io::empty(),
|
||||||
|
tokio::io::sink(),
|
||||||
|
peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("soak mutation must complete in bounded time");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -120,6 +120,37 @@ fn detect_client_type(data: &[u8]) -> &'static str {
|
||||||
"unknown"
|
"unknown"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn build_mask_proxy_header(
|
||||||
|
version: u8,
|
||||||
|
peer: SocketAddr,
|
||||||
|
local_addr: SocketAddr,
|
||||||
|
) -> Option<Vec<u8>> {
|
||||||
|
match version {
|
||||||
|
0 => None,
|
||||||
|
2 => Some(
|
||||||
|
ProxyProtocolV2Builder::new()
|
||||||
|
.with_addrs(peer, local_addr)
|
||||||
|
.build(),
|
||||||
|
),
|
||||||
|
_ => {
|
||||||
|
let header = match (peer, local_addr) {
|
||||||
|
(SocketAddr::V4(src), SocketAddr::V4(dst)) => {
|
||||||
|
ProxyProtocolV1Builder::new()
|
||||||
|
.tcp4(src.into(), dst.into())
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
(SocketAddr::V6(src), SocketAddr::V6(dst)) => {
|
||||||
|
ProxyProtocolV1Builder::new()
|
||||||
|
.tcp6(src.into(), dst.into())
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
_ => ProxyProtocolV1Builder::new().build(),
|
||||||
|
};
|
||||||
|
Some(header)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a bad client by forwarding to mask host
|
/// Handle a bad client by forwarding to mask host
|
||||||
pub async fn handle_bad_client<R, W>(
|
pub async fn handle_bad_client<R, W>(
|
||||||
reader: R,
|
reader: R,
|
||||||
|
|
@ -162,23 +193,8 @@ where
|
||||||
match connect_result {
|
match connect_result {
|
||||||
Ok(Ok(stream)) => {
|
Ok(Ok(stream)) => {
|
||||||
let (mask_read, mut mask_write) = stream.into_split();
|
let (mask_read, mut mask_write) = stream.into_split();
|
||||||
let proxy_header: Option<Vec<u8>> = match config.censorship.mask_proxy_protocol {
|
let proxy_header =
|
||||||
0 => None,
|
build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr);
|
||||||
version => {
|
|
||||||
let header = match version {
|
|
||||||
2 => ProxyProtocolV2Builder::new().with_addrs(peer, local_addr).build(),
|
|
||||||
_ => match (peer, local_addr) {
|
|
||||||
(SocketAddr::V4(src), SocketAddr::V4(dst)) =>
|
|
||||||
ProxyProtocolV1Builder::new().tcp4(src.into(), dst.into()).build(),
|
|
||||||
(SocketAddr::V6(src), SocketAddr::V6(dst)) =>
|
|
||||||
ProxyProtocolV1Builder::new().tcp6(src.into(), dst.into()).build(),
|
|
||||||
_ =>
|
|
||||||
ProxyProtocolV1Builder::new().build(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
Some(header)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Some(header) = proxy_header {
|
if let Some(header) = proxy_header {
|
||||||
if !write_proxy_header_with_timeout(&mut mask_write, &header).await {
|
if !write_proxy_header_with_timeout(&mut mask_write, &header).await {
|
||||||
wait_mask_outcome_budget(outcome_started).await;
|
wait_mask_outcome_budget(outcome_started).await;
|
||||||
|
|
@ -226,23 +242,8 @@ where
|
||||||
let connect_result = timeout(MASK_TIMEOUT, TcpStream::connect(&mask_addr)).await;
|
let connect_result = timeout(MASK_TIMEOUT, TcpStream::connect(&mask_addr)).await;
|
||||||
match connect_result {
|
match connect_result {
|
||||||
Ok(Ok(stream)) => {
|
Ok(Ok(stream)) => {
|
||||||
let proxy_header: Option<Vec<u8>> = match config.censorship.mask_proxy_protocol {
|
let proxy_header =
|
||||||
0 => None,
|
build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr);
|
||||||
version => {
|
|
||||||
let header = match version {
|
|
||||||
2 => ProxyProtocolV2Builder::new().with_addrs(peer, local_addr).build(),
|
|
||||||
_ => match (peer, local_addr) {
|
|
||||||
(SocketAddr::V4(src), SocketAddr::V4(dst)) =>
|
|
||||||
ProxyProtocolV1Builder::new().tcp4(src.into(), dst.into()).build(),
|
|
||||||
(SocketAddr::V6(src), SocketAddr::V6(dst)) =>
|
|
||||||
ProxyProtocolV1Builder::new().tcp6(src.into(), dst.into()).build(),
|
|
||||||
_ =>
|
|
||||||
ProxyProtocolV1Builder::new().build(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
Some(header)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (mask_read, mut mask_write) = stream.into_split();
|
let (mask_read, mut mask_write) = stream.into_split();
|
||||||
if let Some(header) = proxy_header {
|
if let Some(header) = proxy_header {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,10 @@ use tokio::io::duplex;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::time::{Instant, Duration};
|
use tokio::time::{Instant, Duration};
|
||||||
use crate::config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
|
use crate::proxy::relay::relay_bidirectional;
|
||||||
|
use crate::stats::Stats;
|
||||||
use crate::stats::beobachten::BeobachtenStore;
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
|
use crate::stream::BufferPool;
|
||||||
|
|
||||||
// ------------------------------------------------------------------
|
// ------------------------------------------------------------------
|
||||||
// Probing Indistinguishability (OWASP ASVS 5.1.7)
|
// Probing Indistinguishability (OWASP ASVS 5.1.7)
|
||||||
|
|
@ -211,3 +214,549 @@ async fn masking_ssrf_resolve_internal_ranges_blocked() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn masking_unknown_proxy_protocol_version_falls_back_to_v1_unknown_header() {
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let backend_addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let accept_task = tokio::spawn(async move {
|
||||||
|
let (mut stream, _) = listener.accept().await.unwrap();
|
||||||
|
|
||||||
|
let mut header = [0u8; 15];
|
||||||
|
stream.read_exact(&mut header).await.unwrap();
|
||||||
|
assert_eq!(&header, b"PROXY UNKNOWN\r\n");
|
||||||
|
|
||||||
|
let mut payload = [0u8; 5];
|
||||||
|
stream.read_exact(&mut payload).await.unwrap();
|
||||||
|
assert_eq!(&payload, b"probe");
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = backend_addr.port();
|
||||||
|
config.censorship.mask_proxy_protocol = 255;
|
||||||
|
|
||||||
|
let peer: SocketAddr = "198.51.100.77:50001".parse().unwrap();
|
||||||
|
let local_addr: SocketAddr = "[2001:db8::10]:443".parse().unwrap();
|
||||||
|
let beobachten = BeobachtenStore::new();
|
||||||
|
let (client_reader, _client_writer) = duplex(128);
|
||||||
|
let (_client_visible_reader, client_visible_writer) = duplex(128);
|
||||||
|
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
b"probe",
|
||||||
|
peer,
|
||||||
|
local_addr,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
accept_task.await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn masking_zero_length_initial_data_does_not_hang_or_panic() {
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let backend_addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let accept_task = tokio::spawn(async move {
|
||||||
|
let (mut stream, _) = listener.accept().await.unwrap();
|
||||||
|
let mut one = [0u8; 1];
|
||||||
|
let n = tokio::time::timeout(Duration::from_millis(150), stream.read(&mut one))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(n, 0, "backend must observe clean EOF for empty initial payload");
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = backend_addr.port();
|
||||||
|
|
||||||
|
let peer: SocketAddr = "203.0.113.70:50002".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
let beobachten = BeobachtenStore::new();
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = duplex(64);
|
||||||
|
drop(client_writer);
|
||||||
|
let (_client_visible_reader, client_visible_writer) = duplex(64);
|
||||||
|
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
b"",
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
accept_task.await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn masking_oversized_initial_payload_is_forwarded_verbatim() {
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let backend_addr = listener.local_addr().unwrap();
|
||||||
|
let payload = vec![0xA5u8; 32 * 1024];
|
||||||
|
|
||||||
|
let accept_task = tokio::spawn({
|
||||||
|
let payload = payload.clone();
|
||||||
|
async move {
|
||||||
|
let (mut stream, _) = listener.accept().await.unwrap();
|
||||||
|
let mut observed = vec![0u8; payload.len()];
|
||||||
|
stream.read_exact(&mut observed).await.unwrap();
|
||||||
|
assert_eq!(observed, payload, "large initial payload must stay byte-for-byte");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = backend_addr.port();
|
||||||
|
|
||||||
|
let peer: SocketAddr = "203.0.113.71:50003".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
let beobachten = BeobachtenStore::new();
|
||||||
|
let (client_reader, _client_writer) = duplex(64);
|
||||||
|
let (_client_visible_reader, client_visible_writer) = duplex(64);
|
||||||
|
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
&payload,
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
accept_task.await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn masking_refused_backend_keeps_constantish_timing_floor_under_burst() {
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = 1;
|
||||||
|
|
||||||
|
let peer: SocketAddr = "203.0.113.72:50004".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
let beobachten = BeobachtenStore::new();
|
||||||
|
|
||||||
|
for _ in 0..16 {
|
||||||
|
let (client_reader, _client_writer) = duplex(128);
|
||||||
|
let (_client_visible_reader, client_visible_writer) = duplex(128);
|
||||||
|
let started = Instant::now();
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
b"GET / HTTP/1.1\r\n",
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(
|
||||||
|
started.elapsed() >= Duration::from_millis(30),
|
||||||
|
"refused-backend path must keep timing floor to reduce fingerprinting"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn masking_backend_half_close_then_client_half_close_completes_without_hang() {
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let backend_addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let accept_task = tokio::spawn(async move {
|
||||||
|
let (mut stream, _) = listener.accept().await.unwrap();
|
||||||
|
let mut pre = [0u8; 4];
|
||||||
|
stream.read_exact(&mut pre).await.unwrap();
|
||||||
|
assert_eq!(&pre, b"PING");
|
||||||
|
stream.write_all(b"PONG").await.unwrap();
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = backend_addr.port();
|
||||||
|
|
||||||
|
let peer: SocketAddr = "203.0.113.73:50005".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
let beobachten = BeobachtenStore::new();
|
||||||
|
|
||||||
|
let (mut client_writer, client_reader) = duplex(256);
|
||||||
|
let (mut client_visible_reader, client_visible_writer) = duplex(256);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
b"PING",
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
client_writer.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
let mut got = [0u8; 4];
|
||||||
|
client_visible_reader.read_exact(&mut got).await.unwrap();
|
||||||
|
assert_eq!(&got, b"PONG");
|
||||||
|
|
||||||
|
timeout(Duration::from_secs(2), handle)
|
||||||
|
.await
|
||||||
|
.expect("masking task must terminate after bilateral half-close")
|
||||||
|
.unwrap();
|
||||||
|
accept_task.await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn chaos_burst_reconnect_storm_for_masking_and_relay_concurrently() {
|
||||||
|
const MASKING_SESSIONS: usize = 48;
|
||||||
|
const RELAY_SESSIONS: usize = 48;
|
||||||
|
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let backend_addr = listener.local_addr().unwrap();
|
||||||
|
let backend_reply = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec();
|
||||||
|
|
||||||
|
let backend_task = tokio::spawn({
|
||||||
|
let backend_reply = backend_reply.clone();
|
||||||
|
async move {
|
||||||
|
for _ in 0..MASKING_SESSIONS {
|
||||||
|
let (mut stream, _) = listener.accept().await.unwrap();
|
||||||
|
let mut req = [0u8; 32];
|
||||||
|
stream.read_exact(&mut req).await.unwrap();
|
||||||
|
assert!(
|
||||||
|
req.starts_with(b"GET /storm/"),
|
||||||
|
"masking backend must receive storm reconnect probes"
|
||||||
|
);
|
||||||
|
stream.write_all(&backend_reply).await.unwrap();
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = backend_addr.port();
|
||||||
|
config.censorship.mask_proxy_protocol = 0;
|
||||||
|
|
||||||
|
let config = Arc::new(config);
|
||||||
|
let beobachten = Arc::new(BeobachtenStore::new());
|
||||||
|
let peer: SocketAddr = "198.51.100.200:55555".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
|
||||||
|
let mut masking_tasks = Vec::with_capacity(MASKING_SESSIONS);
|
||||||
|
for i in 0..MASKING_SESSIONS {
|
||||||
|
let config = Arc::clone(&config);
|
||||||
|
let beobachten = Arc::clone(&beobachten);
|
||||||
|
let expected_reply = backend_reply.clone();
|
||||||
|
masking_tasks.push(tokio::spawn(async move {
|
||||||
|
let mut probe = [0u8; 32];
|
||||||
|
let template = format!("GET /storm/{i:04} HTTP/1.1\r\n\r\n");
|
||||||
|
let bytes = template.as_bytes();
|
||||||
|
probe[..bytes.len()].copy_from_slice(bytes);
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = duplex(256);
|
||||||
|
drop(client_writer);
|
||||||
|
let (mut client_visible_reader, client_visible_writer) = duplex(1024);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
&probe,
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut observed = vec![0u8; expected_reply.len()];
|
||||||
|
client_visible_reader.read_exact(&mut observed).await.unwrap();
|
||||||
|
assert_eq!(observed, expected_reply);
|
||||||
|
|
||||||
|
timeout(Duration::from_secs(2), handle)
|
||||||
|
.await
|
||||||
|
.expect("masking reconnect task must complete")
|
||||||
|
.unwrap();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut relay_tasks = Vec::with_capacity(RELAY_SESSIONS);
|
||||||
|
for i in 0..RELAY_SESSIONS {
|
||||||
|
relay_tasks.push(tokio::spawn(async move {
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let (mut client_peer, relay_client) = duplex(4096);
|
||||||
|
let (relay_server, mut server_peer) = duplex(4096);
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = tokio::io::split(relay_client);
|
||||||
|
let (server_reader, server_writer) = tokio::io::split(relay_server);
|
||||||
|
|
||||||
|
let relay_task = tokio::spawn(relay_bidirectional(
|
||||||
|
client_reader,
|
||||||
|
client_writer,
|
||||||
|
server_reader,
|
||||||
|
server_writer,
|
||||||
|
1024,
|
||||||
|
1024,
|
||||||
|
"chaos-storm-relay",
|
||||||
|
stats,
|
||||||
|
None,
|
||||||
|
Arc::new(BufferPool::new()),
|
||||||
|
));
|
||||||
|
|
||||||
|
let c2s = vec![(i as u8).wrapping_add(1); 64];
|
||||||
|
client_peer.write_all(&c2s).await.unwrap();
|
||||||
|
let mut c2s_seen = vec![0u8; c2s.len()];
|
||||||
|
server_peer.read_exact(&mut c2s_seen).await.unwrap();
|
||||||
|
assert_eq!(c2s_seen, c2s);
|
||||||
|
|
||||||
|
let s2c = vec![(i as u8).wrapping_add(17); 96];
|
||||||
|
server_peer.write_all(&s2c).await.unwrap();
|
||||||
|
let mut s2c_seen = vec![0u8; s2c.len()];
|
||||||
|
client_peer.read_exact(&mut s2c_seen).await.unwrap();
|
||||||
|
assert_eq!(s2c_seen, s2c);
|
||||||
|
|
||||||
|
drop(client_peer);
|
||||||
|
drop(server_peer);
|
||||||
|
timeout(Duration::from_secs(2), relay_task)
|
||||||
|
.await
|
||||||
|
.expect("relay reconnect task must complete")
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
for task in masking_tasks {
|
||||||
|
timeout(Duration::from_secs(3), task)
|
||||||
|
.await
|
||||||
|
.expect("masking storm join must complete")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
for task in relay_tasks {
|
||||||
|
timeout(Duration::from_secs(3), task)
|
||||||
|
.await
|
||||||
|
.expect("relay storm join must complete")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout(Duration::from_secs(3), backend_task)
|
||||||
|
.await
|
||||||
|
.expect("masking backend accept loop must complete")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_env_usize_or_default(name: &str, default: usize) -> usize {
|
||||||
|
match std::env::var(name) {
|
||||||
|
Ok(raw) => match raw.parse::<usize>() {
|
||||||
|
Ok(parsed) if parsed > 0 => parsed,
|
||||||
|
_ => default,
|
||||||
|
},
|
||||||
|
Err(_) => default,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "heavy soak; run manually"]
|
||||||
|
async fn chaos_burst_reconnect_storm_for_masking_and_relay_multiwave_soak() {
|
||||||
|
let waves = read_env_usize_or_default("CHAOS_WAVES", 4);
|
||||||
|
let masking_per_wave = read_env_usize_or_default("CHAOS_MASKING_PER_WAVE", 160);
|
||||||
|
let relay_per_wave = read_env_usize_or_default("CHAOS_RELAY_PER_WAVE", 160);
|
||||||
|
let total_masking = waves * masking_per_wave;
|
||||||
|
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let backend_addr = listener.local_addr().unwrap();
|
||||||
|
let backend_reply = b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n".to_vec();
|
||||||
|
|
||||||
|
let backend_task = tokio::spawn({
|
||||||
|
let backend_reply = backend_reply.clone();
|
||||||
|
async move {
|
||||||
|
for _ in 0..total_masking {
|
||||||
|
let (mut stream, _) = listener.accept().await.unwrap();
|
||||||
|
let mut req = [0u8; 32];
|
||||||
|
stream.read_exact(&mut req).await.unwrap();
|
||||||
|
assert!(
|
||||||
|
req.starts_with(b"GET /storm/"),
|
||||||
|
"mask backend must only receive storm probes"
|
||||||
|
);
|
||||||
|
stream.write_all(&backend_reply).await.unwrap();
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = backend_addr.port();
|
||||||
|
config.censorship.mask_proxy_protocol = 0;
|
||||||
|
|
||||||
|
let config = Arc::new(config);
|
||||||
|
let beobachten = Arc::new(BeobachtenStore::new());
|
||||||
|
let peer: SocketAddr = "198.51.100.201:56565".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
|
||||||
|
for wave in 0..waves {
|
||||||
|
let mut masking_tasks = Vec::with_capacity(masking_per_wave);
|
||||||
|
for i in 0..masking_per_wave {
|
||||||
|
let config = Arc::clone(&config);
|
||||||
|
let beobachten = Arc::clone(&beobachten);
|
||||||
|
let expected_reply = backend_reply.clone();
|
||||||
|
masking_tasks.push(tokio::spawn(async move {
|
||||||
|
let mut probe = [0u8; 32];
|
||||||
|
let template = format!("GET /storm/{wave:02}-{i:03}\r\n\r\n");
|
||||||
|
let bytes = template.as_bytes();
|
||||||
|
probe[..bytes.len()].copy_from_slice(bytes);
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = duplex(256);
|
||||||
|
drop(client_writer);
|
||||||
|
let (mut client_visible_reader, client_visible_writer) = duplex(1024);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
&probe,
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut observed = vec![0u8; expected_reply.len()];
|
||||||
|
client_visible_reader.read_exact(&mut observed).await.unwrap();
|
||||||
|
assert_eq!(observed, expected_reply);
|
||||||
|
|
||||||
|
timeout(Duration::from_secs(3), handle)
|
||||||
|
.await
|
||||||
|
.expect("masking storm task must complete")
|
||||||
|
.unwrap();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut relay_tasks = Vec::with_capacity(relay_per_wave);
|
||||||
|
for i in 0..relay_per_wave {
|
||||||
|
relay_tasks.push(tokio::spawn(async move {
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let (mut client_peer, relay_client) = duplex(4096);
|
||||||
|
let (relay_server, mut server_peer) = duplex(4096);
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = tokio::io::split(relay_client);
|
||||||
|
let (server_reader, server_writer) = tokio::io::split(relay_server);
|
||||||
|
|
||||||
|
let relay_task = tokio::spawn(relay_bidirectional(
|
||||||
|
client_reader,
|
||||||
|
client_writer,
|
||||||
|
server_reader,
|
||||||
|
server_writer,
|
||||||
|
1024,
|
||||||
|
1024,
|
||||||
|
"chaos-multiwave-relay",
|
||||||
|
stats,
|
||||||
|
None,
|
||||||
|
Arc::new(BufferPool::new()),
|
||||||
|
));
|
||||||
|
|
||||||
|
let c2s = vec![(wave as u8).wrapping_add(i as u8).wrapping_add(1); 32];
|
||||||
|
client_peer.write_all(&c2s).await.unwrap();
|
||||||
|
let mut c2s_seen = vec![0u8; c2s.len()];
|
||||||
|
server_peer.read_exact(&mut c2s_seen).await.unwrap();
|
||||||
|
assert_eq!(c2s_seen, c2s);
|
||||||
|
|
||||||
|
let s2c = vec![(wave as u8).wrapping_add(i as u8).wrapping_add(17); 48];
|
||||||
|
server_peer.write_all(&s2c).await.unwrap();
|
||||||
|
let mut s2c_seen = vec![0u8; s2c.len()];
|
||||||
|
client_peer.read_exact(&mut s2c_seen).await.unwrap();
|
||||||
|
assert_eq!(s2c_seen, s2c);
|
||||||
|
|
||||||
|
drop(client_peer);
|
||||||
|
drop(server_peer);
|
||||||
|
timeout(Duration::from_secs(3), relay_task)
|
||||||
|
.await
|
||||||
|
.expect("relay storm task must complete")
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
for task in masking_tasks {
|
||||||
|
timeout(Duration::from_secs(6), task)
|
||||||
|
.await
|
||||||
|
.expect("masking wave task join must complete")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
for task in relay_tasks {
|
||||||
|
timeout(Duration::from_secs(6), task)
|
||||||
|
.await
|
||||||
|
.expect("relay wave task join must complete")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout(Duration::from_secs(8), backend_task)
|
||||||
|
.await
|
||||||
|
.expect("mask backend must complete all accepted storm sessions")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "heavy soak; run manually"]
|
||||||
|
async fn masking_timing_bucket_soak_refused_backend_stays_within_narrow_band() {
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
|
config.censorship.mask = true;
|
||||||
|
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||||
|
config.censorship.mask_port = 1;
|
||||||
|
|
||||||
|
let peer: SocketAddr = "203.0.113.74:50006".parse().unwrap();
|
||||||
|
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
let beobachten = BeobachtenStore::new();
|
||||||
|
|
||||||
|
let mut samples = Vec::with_capacity(128);
|
||||||
|
for _ in 0..128 {
|
||||||
|
let (client_reader, _client_writer) = duplex(128);
|
||||||
|
let (_client_visible_reader, client_visible_writer) = duplex(128);
|
||||||
|
let started = Instant::now();
|
||||||
|
handle_bad_client(
|
||||||
|
client_reader,
|
||||||
|
client_visible_writer,
|
||||||
|
b"GET / HTTP/1.1\r\n",
|
||||||
|
peer,
|
||||||
|
local,
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
samples.push(started.elapsed().as_millis());
|
||||||
|
}
|
||||||
|
|
||||||
|
samples.sort_unstable();
|
||||||
|
let p10 = samples[samples.len() / 10];
|
||||||
|
let p90 = samples[(samples.len() * 9) / 10];
|
||||||
|
assert!(
|
||||||
|
p90.saturating_sub(p10) <= 40,
|
||||||
|
"timing spread too wide for refused-backend masking path: p10={p10}ms p90={p90}ms"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -130,6 +130,41 @@ fn detect_client_type_len_boundary_9_vs_10_bytes() {
|
||||||
assert_eq!(detect_client_type(b"1234567890"), "unknown");
|
assert_eq!(detect_client_type(b"1234567890"), "unknown");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_mask_proxy_header_version_zero_disables_header() {
|
||||||
|
let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap();
|
||||||
|
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
|
||||||
|
let header = build_mask_proxy_header(0, peer, local_addr);
|
||||||
|
assert!(header.is_none(), "version 0 must disable PROXY header");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_mask_proxy_header_v2_matches_builder_output() {
|
||||||
|
let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap();
|
||||||
|
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||||
|
|
||||||
|
let expected = ProxyProtocolV2Builder::new()
|
||||||
|
.with_addrs(peer, local_addr)
|
||||||
|
.build();
|
||||||
|
let actual = build_mask_proxy_header(2, peer, local_addr)
|
||||||
|
.expect("v2 mode must produce a header");
|
||||||
|
|
||||||
|
assert_eq!(actual, expected, "v2 header bytes must be deterministic");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_mask_proxy_header_v1_mixed_ip_family_uses_generic_unknown_form() {
|
||||||
|
let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap();
|
||||||
|
let local_addr: SocketAddr = "[2001:db8::1]:443".parse().unwrap();
|
||||||
|
|
||||||
|
let expected = ProxyProtocolV1Builder::new().build();
|
||||||
|
let actual = build_mask_proxy_header(1, peer, local_addr)
|
||||||
|
.expect("v1 mode must produce a header");
|
||||||
|
|
||||||
|
assert_eq!(actual, expected, "mixed-family v1 must use UNKNOWN form");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn beobachten_records_scanner_class_when_mask_is_disabled() {
|
async fn beobachten_records_scanner_class_when_mask_is_disabled() {
|
||||||
let mut config = ProxyConfig::default();
|
let mut config = ProxyConfig::default();
|
||||||
|
|
|
||||||
|
|
@ -120,3 +120,91 @@ async fn relay_quota_mid_session_cutoff() {
|
||||||
let n = sp_reader.read(&mut small_buf).await.unwrap();
|
let n = sp_reader.read(&mut small_buf).await.unwrap();
|
||||||
assert_eq!(n, 0, "Server must see EOF after quota reached");
|
assert_eq!(n, 0, "Server must see EOF after quota reached");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn relay_chaos_half_close_crossfire_terminates_without_hang() {
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
|
||||||
|
let (mut client_peer, relay_client) = duplex(8192);
|
||||||
|
let (relay_server, mut server_peer) = duplex(8192);
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = tokio::io::split(relay_client);
|
||||||
|
let (server_reader, server_writer) = tokio::io::split(relay_server);
|
||||||
|
|
||||||
|
let relay_task = tokio::spawn(relay_bidirectional(
|
||||||
|
client_reader,
|
||||||
|
client_writer,
|
||||||
|
server_reader,
|
||||||
|
server_writer,
|
||||||
|
1024,
|
||||||
|
1024,
|
||||||
|
"half-close-crossfire",
|
||||||
|
Arc::clone(&stats),
|
||||||
|
None,
|
||||||
|
Arc::new(BufferPool::new()),
|
||||||
|
));
|
||||||
|
|
||||||
|
client_peer.write_all(b"c2s-pre-half-close").await.unwrap();
|
||||||
|
server_peer.write_all(b"s2c-pre-half-close").await.unwrap();
|
||||||
|
|
||||||
|
client_peer.shutdown().await.unwrap();
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
server_peer.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
let done = timeout(Duration::from_secs(1), relay_task)
|
||||||
|
.await
|
||||||
|
.expect("relay must terminate after bilateral half-close")
|
||||||
|
.expect("relay task must not panic");
|
||||||
|
assert!(done.is_ok(), "relay must terminate cleanly under half-close crossfire");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "heavy soak; run manually"]
|
||||||
|
async fn relay_soak_bidirectional_temporal_jitter_5k_rounds() {
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
|
||||||
|
let (mut client_peer, relay_client) = duplex(65536);
|
||||||
|
let (relay_server, mut server_peer) = duplex(65536);
|
||||||
|
|
||||||
|
let (client_reader, client_writer) = tokio::io::split(relay_client);
|
||||||
|
let (server_reader, server_writer) = tokio::io::split(relay_server);
|
||||||
|
|
||||||
|
let relay_task = tokio::spawn(relay_bidirectional(
|
||||||
|
client_reader,
|
||||||
|
client_writer,
|
||||||
|
server_reader,
|
||||||
|
server_writer,
|
||||||
|
4096,
|
||||||
|
4096,
|
||||||
|
"soak-jitter-user",
|
||||||
|
Arc::clone(&stats),
|
||||||
|
None,
|
||||||
|
Arc::new(BufferPool::new()),
|
||||||
|
));
|
||||||
|
|
||||||
|
for i in 0..5_000u32 {
|
||||||
|
let c = [((i as u8).wrapping_mul(13)).wrapping_add(1); 17];
|
||||||
|
client_peer.write_all(&c).await.unwrap();
|
||||||
|
let mut c_seen = [0u8; 17];
|
||||||
|
server_peer.read_exact(&mut c_seen).await.unwrap();
|
||||||
|
assert_eq!(c_seen, c);
|
||||||
|
|
||||||
|
let s = [((i as u8).wrapping_mul(7)).wrapping_add(3); 23];
|
||||||
|
server_peer.write_all(&s).await.unwrap();
|
||||||
|
let mut s_seen = [0u8; 23];
|
||||||
|
client_peer.read_exact(&mut s_seen).await.unwrap();
|
||||||
|
assert_eq!(s_seen, s);
|
||||||
|
|
||||||
|
if i % 10 == 0 {
|
||||||
|
tokio::time::sleep(Duration::from_millis((i % 3) as u64)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(client_peer);
|
||||||
|
drop(server_peer);
|
||||||
|
let done = timeout(Duration::from_secs(2), relay_task)
|
||||||
|
.await
|
||||||
|
.expect("relay must stop after soak peers close")
|
||||||
|
.expect("relay task must not panic");
|
||||||
|
assert!(done.is_ok());
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue