diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index a3f795a..7f7499e 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -228,7 +228,9 @@ impl HotFields { me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us, me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate, me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes, - me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes, + me_d2c_frame_buf_shrink_threshold_bytes: cfg + .general + .me_d2c_frame_buf_shrink_threshold_bytes, direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes, direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes, me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy, diff --git a/src/config/load.rs b/src/config/load.rs index fc54ec2..8f12757 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -444,8 +444,7 @@ impl ProxyConfig { if !(5..=50).contains(&config.censorship.mask_classifier_prefetch_timeout_ms) { return Err(ProxyError::Config( - "censorship.mask_classifier_prefetch_timeout_ms must be within [5, 50]" - .to_string(), + "censorship.mask_classifier_prefetch_timeout_ms must be within [5, 50]".to_string(), )); } @@ -558,7 +557,9 @@ impl ProxyConfig { )); } - if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) { + if !(4096..=16 * 1024 * 1024) + .contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) + { return Err(ProxyError::Config( "general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]" .to_string(), diff --git a/src/config/tests/load_mask_classifier_prefetch_timeout_security_tests.rs b/src/config/tests/load_mask_classifier_prefetch_timeout_security_tests.rs index 49ee953..0b3d543 100644 --- a/src/config/tests/load_mask_classifier_prefetch_timeout_security_tests.rs +++ b/src/config/tests/load_mask_classifier_prefetch_timeout_security_tests.rs @@ -8,8 +8,9 @@ fn write_temp_config(contents: &str) -> PathBuf { .duration_since(UNIX_EPOCH) .expect("system time must be after unix epoch") .as_nanos(); - let path = std::env::temp_dir() - .join(format!("telemt-load-mask-prefetch-timeout-security-{nonce}.toml")); + let path = std::env::temp_dir().join(format!( + "telemt-load-mask-prefetch-timeout-security-{nonce}.toml" + )); fs::write(&path, contents).expect("temp config write must succeed"); path } @@ -67,8 +68,8 @@ mask_classifier_prefetch_timeout_ms = 20 "#, ); - let cfg = ProxyConfig::load(&path) - .expect("prefetch timeout within security bounds must be accepted"); + let cfg = + ProxyConfig::load(&path).expect("prefetch timeout within security bounds must be accepted"); assert_eq!(cfg.censorship.mask_classifier_prefetch_timeout_ms, 20); remove_temp_config(&path); diff --git a/src/config/tests/load_mask_shape_security_tests.rs b/src/config/tests/load_mask_shape_security_tests.rs index 2e4aa41..bccd36f 100644 --- a/src/config/tests/load_mask_shape_security_tests.rs +++ b/src/config/tests/load_mask_shape_security_tests.rs @@ -265,8 +265,8 @@ mask_relay_max_bytes = 67108865 "#, ); - let err = ProxyConfig::load(&path) - .expect_err("mask_relay_max_bytes above hard cap must be rejected"); + let err = + ProxyConfig::load(&path).expect_err("mask_relay_max_bytes above hard cap must be rejected"); let msg = err.to_string(); assert!( msg.contains("censorship.mask_relay_max_bytes must be <= 67108864"), diff --git a/src/config/types.rs b/src/config/types.rs index 5dc9719..240d2f1 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -954,7 +954,8 @@ impl Default for GeneralConfig { me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(), me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(), me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(), - me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(), + me_d2c_frame_buf_shrink_threshold_bytes: + default_me_d2c_frame_buf_shrink_threshold_bytes(), direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(), direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(), me_warmup_stagger_enabled: default_true(), diff --git a/src/main.rs b/src/main.rs index c512e6b..406b321 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,12 +7,12 @@ mod crypto; mod error; mod ip_tracker; #[cfg(test)] -#[path = "tests/ip_tracker_hotpath_adversarial_tests.rs"] -mod ip_tracker_hotpath_adversarial_tests; -#[cfg(test)] #[path = "tests/ip_tracker_encapsulation_adversarial_tests.rs"] mod ip_tracker_encapsulation_adversarial_tests; #[cfg(test)] +#[path = "tests/ip_tracker_hotpath_adversarial_tests.rs"] +mod ip_tracker_hotpath_adversarial_tests; +#[cfg(test)] #[path = "tests/ip_tracker_regression_tests.rs"] mod ip_tracker_regression_tests; mod maestro; diff --git a/src/metrics.rs b/src/metrics.rs index a821d4d..f9475f6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1233,10 +1233,7 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp out, "# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets" ); - let _ = writeln!( - out, - "# TYPE telemt_me_d2c_batch_bytes_bucket_total counter" - ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_bucket_total counter"); let _ = writeln!( out, "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}", diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 1567caf..0190e8e 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -210,7 +210,9 @@ fn should_prefetch_mask_classifier_window(initial_data: &[u8]) -> bool { return false; } - initial_data.iter().all(|b| b.is_ascii_alphabetic() || *b == b' ') + initial_data + .iter() + .all(|b| b.is_ascii_alphabetic() || *b == b' ') } #[cfg(test)] @@ -218,16 +220,19 @@ async fn extend_masking_initial_window(reader: &mut R, initial_data: &mut Vec where R: AsyncRead + Unpin, { - extend_masking_initial_window_with_timeout(reader, initial_data, MASK_CLASSIFIER_PREFETCH_TIMEOUT) - .await; + extend_masking_initial_window_with_timeout( + reader, + initial_data, + MASK_CLASSIFIER_PREFETCH_TIMEOUT, + ) + .await; } async fn extend_masking_initial_window_with_timeout( reader: &mut R, initial_data: &mut Vec, prefetch_timeout: Duration, -) -where +) where R: AsyncRead + Unpin, { if !should_prefetch_mask_classifier_window(initial_data) { diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 241a48f..ba9f20a 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -10,10 +10,10 @@ use rand::rngs::StdRng; use rand::{Rng, RngExt, SeedableRng}; use std::net::{IpAddr, SocketAddr}; use std::str; -#[cfg(unix)] -use std::sync::{Mutex, OnceLock}; #[cfg(test)] use std::sync::atomic::{AtomicUsize, Ordering}; +#[cfg(unix)] +use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant as StdInstant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; @@ -107,15 +107,7 @@ where fn is_http_probe(data: &[u8]) -> bool { // RFC 7540 section 3.5: HTTP/2 client preface starts with "PRI ". const HTTP_METHODS: [&[u8]; 10] = [ - b"GET ", - b"POST", - b"HEAD", - b"PUT ", - b"DELETE", - b"OPTIONS", - b"CONNECT", - b"TRACE", - b"PATCH", + b"GET ", b"POST", b"HEAD", b"PUT ", b"DELETE", b"OPTIONS", b"CONNECT", b"TRACE", b"PATCH", b"PRI ", ]; @@ -328,7 +320,10 @@ fn parse_mask_host_ip_literal(host: &str) -> Option { fn canonical_ip(ip: IpAddr) -> IpAddr { match ip { - IpAddr::V6(v6) => v6.to_ipv4_mapped().map(IpAddr::V4).unwrap_or(IpAddr::V6(v6)), + IpAddr::V6(v6) => v6 + .to_ipv4_mapped() + .map(IpAddr::V4) + .unwrap_or(IpAddr::V6(v6)), IpAddr::V4(v4) => IpAddr::V4(v4), } } @@ -664,12 +659,20 @@ pub async fn handle_bad_client( Ok(Err(e)) => { wait_mask_connect_budget_if_needed(connect_started, config).await; debug!(error = %e, "Failed to connect to mask unix socket"); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + ) + .await; wait_mask_outcome_budget(outcome_started, config).await; } Err(_) => { debug!("Timeout connecting to mask unix socket"); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + ) + .await; wait_mask_outcome_budget(outcome_started, config).await; } } @@ -698,7 +701,8 @@ pub async fn handle_bad_client( local = %local_addr, "Mask target resolves to local listener; refusing self-referential masking fallback" ); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await; + consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes) + .await; wait_mask_outcome_budget(outcome_started, config).await; return; } @@ -758,12 +762,20 @@ pub async fn handle_bad_client( Ok(Err(e)) => { wait_mask_connect_budget_if_needed(connect_started, config).await; debug!(error = %e, "Failed to connect to mask host"); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + ) + .await; wait_mask_outcome_budget(outcome_started, config).await; } Err(_) => { debug!("Timeout connecting to mask host"); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + ) + .await; wait_mask_outcome_budget(outcome_started, config).await; } } diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index d833019..3259597 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -23,7 +23,9 @@ use crate::proxy::route_mode::{ ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay, }; -use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats}; +use crate::stats::{ + MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats, +}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer}; use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; @@ -91,7 +93,8 @@ fn relay_idle_candidate_registry() -> &'static Mutex RELAY_IDLE_CANDIDATE_REGISTRY.get_or_init(|| Mutex::new(RelayIdleCandidateRegistry::default())) } -fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry> { +fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry> +{ let registry = relay_idle_candidate_registry(); match registry.lock() { Ok(guard) => guard, @@ -1520,8 +1523,7 @@ where } if !idle_policy.enabled { - consecutive_zero_len_frames = - consecutive_zero_len_frames.saturating_add(1); + consecutive_zero_len_frames = consecutive_zero_len_frames.saturating_add(1); if consecutive_zero_len_frames > LEGACY_MAX_CONSECUTIVE_ZERO_LEN_FRAMES { stats.increment_relay_protocol_desync_close_total(); return Err(ProxyError::Proxy( @@ -1835,8 +1837,14 @@ where MeD2cWriteMode::Coalesced } else { let header = [first]; - client_writer.write_all(&header).await.map_err(ProxyError::Io)?; - client_writer.write_all(data).await.map_err(ProxyError::Io)?; + client_writer + .write_all(&header) + .await + .map_err(ProxyError::Io)?; + client_writer + .write_all(data) + .await + .map_err(ProxyError::Io)?; MeD2cWriteMode::Split } } else if len_words < (1 << 24) { @@ -1858,8 +1866,14 @@ where MeD2cWriteMode::Coalesced } else { let header = [first, lw[0], lw[1], lw[2]]; - client_writer.write_all(&header).await.map_err(ProxyError::Io)?; - client_writer.write_all(data).await.map_err(ProxyError::Io)?; + client_writer + .write_all(&header) + .await + .map_err(ProxyError::Io)?; + client_writer + .write_all(data) + .await + .map_err(ProxyError::Io)?; MeD2cWriteMode::Split } } else { @@ -1901,8 +1915,14 @@ where MeD2cWriteMode::Coalesced } else { let header = len_val.to_le_bytes(); - client_writer.write_all(&header).await.map_err(ProxyError::Io)?; - client_writer.write_all(data).await.map_err(ProxyError::Io)?; + client_writer + .write_all(&header) + .await + .map_err(ProxyError::Io)?; + client_writer + .write_all(data) + .await + .map_err(ProxyError::Io)?; if padding_len > 0 { frame_buf.clear(); if frame_buf.capacity() < padding_len { diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index eebc188..5880558 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -4,58 +4,58 @@ #![cfg_attr(test, allow(warnings))] #![cfg_attr(not(test), forbid(clippy::undocumented_unsafe_blocks))] #![cfg_attr( - not(test), - deny( - clippy::unwrap_used, - clippy::expect_used, - clippy::panic, - clippy::todo, - clippy::unimplemented, - clippy::correctness, - clippy::option_if_let_else, - clippy::or_fun_call, - clippy::branches_sharing_code, - clippy::single_option_map, - clippy::useless_let_if_seq, - clippy::redundant_locals, - clippy::cloned_ref_to_slice_refs, - unsafe_code, - clippy::await_holding_lock, - clippy::await_holding_refcell_ref, - clippy::debug_assert_with_mut_call, - clippy::macro_use_imports, - clippy::cast_ptr_alignment, - clippy::cast_lossless, - clippy::ptr_as_ptr, - clippy::large_stack_arrays, - clippy::same_functions_in_if_condition, - trivial_casts, - trivial_numeric_casts, - unused_extern_crates, - unused_import_braces, - rust_2018_idioms - ) + not(test), + deny( + clippy::unwrap_used, + clippy::expect_used, + clippy::panic, + clippy::todo, + clippy::unimplemented, + clippy::correctness, + clippy::option_if_let_else, + clippy::or_fun_call, + clippy::branches_sharing_code, + clippy::single_option_map, + clippy::useless_let_if_seq, + clippy::redundant_locals, + clippy::cloned_ref_to_slice_refs, + unsafe_code, + clippy::await_holding_lock, + clippy::await_holding_refcell_ref, + clippy::debug_assert_with_mut_call, + clippy::macro_use_imports, + clippy::cast_ptr_alignment, + clippy::cast_lossless, + clippy::ptr_as_ptr, + clippy::large_stack_arrays, + clippy::same_functions_in_if_condition, + trivial_casts, + trivial_numeric_casts, + unused_extern_crates, + unused_import_braces, + rust_2018_idioms + ) )] #![cfg_attr( - not(test), - allow( - clippy::use_self, - clippy::redundant_closure, - clippy::too_many_arguments, - clippy::doc_markdown, - clippy::missing_const_for_fn, - clippy::unnecessary_operation, - clippy::redundant_pub_crate, - clippy::derive_partial_eq_without_eq, - clippy::type_complexity, - clippy::new_ret_no_self, - clippy::cast_possible_truncation, - clippy::cast_possible_wrap, - clippy::significant_drop_tightening, - clippy::significant_drop_in_scrutinee, - clippy::float_cmp, - clippy::nursery - ) + not(test), + allow( + clippy::use_self, + clippy::redundant_closure, + clippy::too_many_arguments, + clippy::doc_markdown, + clippy::missing_const_for_fn, + clippy::unnecessary_operation, + clippy::redundant_pub_crate, + clippy::derive_partial_eq_without_eq, + clippy::type_complexity, + clippy::new_ret_no_self, + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + clippy::significant_drop_tightening, + clippy::significant_drop_in_scrutinee, + clippy::float_cmp, + clippy::nursery + ) )] pub mod adaptive_buffers; diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index bf4ad43..6000e18 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -56,8 +56,8 @@ use crate::stats::{Stats, UserStats}; use crate::stream::BufferPool; use std::io; use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::task::{Context, Poll}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes}; @@ -272,12 +272,10 @@ const QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES: u64 = 64 * 1024; #[inline] fn quota_adaptive_interval_bytes(remaining_before: u64) -> u64 { - remaining_before - .saturating_div(2) - .clamp( - QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES, - QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES, - ) + remaining_before.saturating_div(2).clamp( + QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES, + QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES, + ) } #[inline] diff --git a/src/proxy/tests/client_clever_advanced_tests.rs b/src/proxy/tests/client_clever_advanced_tests.rs index da2e703..f462ed8 100644 --- a/src/proxy/tests/client_clever_advanced_tests.rs +++ b/src/proxy/tests/client_clever_advanced_tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::config::{UpstreamConfig, UpstreamType, ProxyConfig}; +use crate::config::{ProxyConfig, UpstreamConfig, UpstreamType}; use crate::protocol::constants::{MAX_TLS_PLAINTEXT_SIZE, MIN_TLS_CLIENT_HELLO_SIZE}; use crate::stats::Stats; use crate::transport::UpstreamManager; @@ -41,7 +41,9 @@ fn edge_handshake_timeout_with_mask_grace_saturating_add_prevents_overflow() { #[test] fn edge_tls_clienthello_len_in_bounds_exact_boundaries() { assert!(tls_clienthello_len_in_bounds(MIN_TLS_CLIENT_HELLO_SIZE)); - assert!(!tls_clienthello_len_in_bounds(MIN_TLS_CLIENT_HELLO_SIZE - 1)); + assert!(!tls_clienthello_len_in_bounds( + MIN_TLS_CLIENT_HELLO_SIZE - 1 + )); assert!(tls_clienthello_len_in_bounds(MAX_TLS_PLAINTEXT_SIZE)); assert!(!tls_clienthello_len_in_bounds(MAX_TLS_PLAINTEXT_SIZE + 1)); } @@ -87,7 +89,15 @@ async fn adversarial_tls_handshake_timeout_during_masking_delay() { "198.51.100.1:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -99,7 +109,10 @@ async fn adversarial_tls_handshake_timeout_during_masking_delay() { false, )); - client_side.write_all(&[0x16, 0x03, 0x01, 0xFF, 0xFF]).await.unwrap(); + client_side + .write_all(&[0x16, 0x03, 0x01, 0xFF, 0xFF]) + .await + .unwrap(); let result = tokio::time::timeout(Duration::from_secs(4), handle) .await @@ -123,7 +136,15 @@ async fn blackhat_proxy_protocol_slowloris_timeout() { "198.51.100.2:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -167,7 +188,15 @@ async fn negative_proxy_protocol_enabled_but_client_sends_tls_hello() { "198.51.100.3:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -179,7 +208,10 @@ async fn negative_proxy_protocol_enabled_but_client_sends_tls_hello() { true, )); - client_side.write_all(&[0x16, 0x03, 0x01, 0x02, 0x00]).await.unwrap(); + client_side + .write_all(&[0x16, 0x03, 0x01, 0x02, 0x00]) + .await + .unwrap(); let result = tokio::time::timeout(Duration::from_secs(2), handle) .await @@ -202,7 +234,15 @@ async fn edge_client_stream_exactly_4_bytes_eof() { "198.51.100.4:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -214,7 +254,10 @@ async fn edge_client_stream_exactly_4_bytes_eof() { false, )); - client_side.write_all(&[0x16, 0x03, 0x01, 0x00]).await.unwrap(); + client_side + .write_all(&[0x16, 0x03, 0x01, 0x00]) + .await + .unwrap(); client_side.shutdown().await.unwrap(); let _ = tokio::time::timeout(Duration::from_secs(2), handle).await; @@ -234,7 +277,15 @@ async fn edge_client_stream_tls_header_valid_but_body_1_byte_short_eof() { "198.51.100.5:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -246,7 +297,10 @@ async fn edge_client_stream_tls_header_valid_but_body_1_byte_short_eof() { false, )); - client_side.write_all(&[0x16, 0x03, 0x01, 0x00, 100]).await.unwrap(); + client_side + .write_all(&[0x16, 0x03, 0x01, 0x00, 100]) + .await + .unwrap(); client_side.write_all(&vec![0x41; 99]).await.unwrap(); client_side.shutdown().await.unwrap(); @@ -269,7 +323,15 @@ async fn integration_non_tls_modes_disabled_immediately_masks() { "198.51.100.6:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -372,11 +434,7 @@ async fn stress_user_connection_reservation_concurrent_same_ip_exhaustion() { let ip_tracker = ip_tracker.clone(); tasks.spawn(async move { RunningClientHandler::acquire_user_connection_reservation_static( - user, - &config, - stats, - peer, - ip_tracker, + user, &config, stats, peer, ip_tracker, ) .await }); diff --git a/src/proxy/tests/client_deep_invariants_tests.rs b/src/proxy/tests/client_deep_invariants_tests.rs index 0302300..e57f817 100644 --- a/src/proxy/tests/client_deep_invariants_tests.rs +++ b/src/proxy/tests/client_deep_invariants_tests.rs @@ -42,7 +42,15 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking() "198.51.100.20:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -65,7 +73,9 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking() .unwrap(); client_side.shutdown().await.unwrap(); - let _ = tokio::time::timeout(Duration::from_secs(2), handler).await.unwrap(); + let _ = tokio::time::timeout(Duration::from_secs(2), handler) + .await + .unwrap(); assert_eq!(stats.get_connects_bad(), 1); } @@ -73,7 +83,10 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking() async fn invariant_acquire_reservation_ip_limit_rollback() { let user = "rollback-test-user"; let mut config = ProxyConfig::default(); - config.access.user_max_tcp_conns.insert(user.to_string(), 10); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 10); let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); @@ -159,7 +172,15 @@ async fn invariant_direct_mode_partial_header_eof_is_error_not_bad_connect() { "198.51.100.25:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), diff --git a/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs b/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs index fcf51ab..3036f95 100644 --- a/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs +++ b/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs @@ -100,14 +100,7 @@ async fn run_http2_fragment_case(split_at: usize, delay_ms: u64, peer: SocketAdd #[tokio::test] async fn http2_preface_fragmentation_matrix_is_classified_and_forwarded() { - let cases = [ - (2usize, 0u64), - (3, 0), - (4, 0), - (2, 7), - (3, 7), - (8, 1), - ]; + let cases = [(2usize, 0u64), (3, 0), (4, 0), (2, 7), (3, 7), (8, 1)]; for (i, (split_at, delay_ms)) in cases.into_iter().enumerate() { let peer: SocketAddr = format!("198.51.100.{}:58{}", 140 + i, 100 + i) diff --git a/src/proxy/tests/client_masking_prefetch_config_runtime_security_tests.rs b/src/proxy/tests/client_masking_prefetch_config_runtime_security_tests.rs index cdf2136..64e7a85 100644 --- a/src/proxy/tests/client_masking_prefetch_config_runtime_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_config_runtime_security_tests.rs @@ -29,7 +29,10 @@ async fn configured_prefetch_budget_20ms_recovers_tail_delayed_15ms() { .write_all(b"ONNECT example.org:443 HTTP/1.1\r\n") .await .expect("tail bytes must be writable"); - writer.shutdown().await.expect("writer shutdown must succeed"); + writer + .shutdown() + .await + .expect("writer shutdown must succeed"); }); let mut initial_data = b"C".to_vec(); @@ -60,7 +63,10 @@ async fn configured_prefetch_budget_5ms_misses_tail_delayed_15ms() { .write_all(b"ONNECT example.org:443 HTTP/1.1\r\n") .await .expect("tail bytes must be writable"); - writer.shutdown().await.expect("writer shutdown must succeed"); + writer + .shutdown() + .await + .expect("writer shutdown must succeed"); }); let mut initial_data = b"C".to_vec(); diff --git a/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs b/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs index 2e03ce9..b49db3c 100644 --- a/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs @@ -245,7 +245,10 @@ async fn blackhat_integration_empty_initial_data_path_is_byte_exact_and_eof_clea assert_eq!(head[0], 0x16); read_and_discard_tls_record_body(&mut client_side, head).await; - client_side.write_all(&invalid_mtproto_record).await.unwrap(); + client_side + .write_all(&invalid_mtproto_record) + .await + .unwrap(); client_side.write_all(&trailing_record).await.unwrap(); client_side.shutdown().await.unwrap(); diff --git a/src/proxy/tests/client_masking_prefetch_strict_boundary_security_tests.rs b/src/proxy/tests/client_masking_prefetch_strict_boundary_security_tests.rs index 9ece258..cbb6603 100644 --- a/src/proxy/tests/client_masking_prefetch_strict_boundary_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_strict_boundary_security_tests.rs @@ -7,7 +7,9 @@ async fn run_strict_prefetch_case(prefetch_ms: u64, tail_delay_ms: u64) -> Vec= Duration::from_millis(40) - && replay_elapsed < Duration::from_millis(250), + replay_elapsed >= Duration::from_millis(40) && replay_elapsed < Duration::from_millis(250), "replay rejection path must still satisfy masking timing budget without unbounded DB/CPU delay" ); } diff --git a/src/proxy/tests/client_more_advanced_tests.rs b/src/proxy/tests/client_more_advanced_tests.rs index 36ffcbb..8f9d832 100644 --- a/src/proxy/tests/client_more_advanced_tests.rs +++ b/src/proxy/tests/client_more_advanced_tests.rs @@ -53,11 +53,7 @@ async fn boundary_user_data_quota_exact_match_rejects() { let peer = "198.51.100.10:55000".parse().unwrap(); let result = RunningClientHandler::acquire_user_connection_reservation_static( - user, - &config, - stats, - peer, - ip_tracker, + user, &config, stats, peer, ip_tracker, ) .await; @@ -79,11 +75,7 @@ async fn boundary_user_expiration_in_past_rejects() { let peer = "198.51.100.11:55000".parse().unwrap(); let result = RunningClientHandler::acquire_user_connection_reservation_static( - user, - &config, - stats, - peer, - ip_tracker, + user, &config, stats, peer, ip_tracker, ) .await; @@ -103,7 +95,15 @@ async fn blackhat_proxy_protocol_massive_garbage_rejected_quickly() { "198.51.100.12:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -141,7 +141,15 @@ async fn edge_tls_body_immediate_eof_triggers_masking_and_bad_connect() { "198.51.100.13:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -153,10 +161,15 @@ async fn edge_tls_body_immediate_eof_triggers_masking_and_bad_connect() { false, )); - client_side.write_all(&[0x16, 0x03, 0x01, 0x00, 100]).await.unwrap(); + client_side + .write_all(&[0x16, 0x03, 0x01, 0x00, 100]) + .await + .unwrap(); client_side.shutdown().await.unwrap(); - let _ = tokio::time::timeout(Duration::from_secs(2), handler).await.unwrap(); + let _ = tokio::time::timeout(Duration::from_secs(2), handler) + .await + .unwrap(); assert_eq!(stats.get_connects_bad(), 1); } @@ -177,7 +190,15 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() { "198.51.100.15:55000".parse().unwrap(), config, stats.clone(), - Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())), + Arc::new(UpstreamManager::new( + vec![], + 1, + 1, + 1, + 1, + false, + stats.clone(), + )), Arc::new(ReplayChecker::new(128, Duration::from_secs(60))), Arc::new(BufferPool::new()), Arc::new(SecureRandom::new()), @@ -192,7 +213,9 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() { client_side.write_all(&vec![0xEF; 64]).await.unwrap(); client_side.shutdown().await.unwrap(); - let _ = tokio::time::timeout(Duration::from_secs(2), handler).await.unwrap(); + let _ = tokio::time::timeout(Duration::from_secs(2), handler) + .await + .unwrap(); assert_eq!(stats.get_connects_bad(), 1); } @@ -200,7 +223,10 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() { async fn concurrency_ip_tracker_strict_limit_one_rapid_churn() { let user = "rapid-churn-user"; let mut config = ProxyConfig::default(); - config.access.user_max_tcp_conns.insert(user.to_string(), 10); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 10); let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); diff --git a/src/proxy/tests/client_security_tests.rs b/src/proxy/tests/client_security_tests.rs index bae1ce2..1b46c6d 100644 --- a/src/proxy/tests/client_security_tests.rs +++ b/src/proxy/tests/client_security_tests.rs @@ -7,9 +7,9 @@ use crate::protocol::tls; use crate::proxy::handshake::HandshakeSuccess; use crate::stream::{CryptoReader, CryptoWriter}; use crate::transport::proxy_protocol::ProxyProtocolV1Builder; -use rand::rngs::StdRng; use rand::Rng; use rand::SeedableRng; +use rand::rngs::StdRng; use std::net::Ipv4Addr; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use tokio::net::{TcpListener, TcpStream}; @@ -34,7 +34,10 @@ fn handshake_timeout_with_mask_grace_includes_mask_margin() { config.timeouts.client_handshake = 2; config.censorship.mask = false; - assert_eq!(handshake_timeout_with_mask_grace(&config), Duration::from_secs(2)); + assert_eq!( + handshake_timeout_with_mask_grace(&config), + Duration::from_secs(2) + ); config.censorship.mask = true; assert_eq!( @@ -86,7 +89,10 @@ impl tokio::io::AsyncRead for ErrorReader { _cx: &mut std::task::Context<'_>, _buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - std::task::Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "fake error"))) + std::task::Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "fake error", + ))) } } @@ -124,7 +130,10 @@ fn handshake_timeout_without_mask_is_exact_base() { config.timeouts.client_handshake = 7; config.censorship.mask = false; - assert_eq!(handshake_timeout_with_mask_grace(&config), Duration::from_secs(7)); + assert_eq!( + handshake_timeout_with_mask_grace(&config), + Duration::from_secs(7) + ); } #[test] @@ -133,7 +142,10 @@ fn handshake_timeout_mask_enabled_adds_750ms() { config.timeouts.client_handshake = 3; config.censorship.mask = true; - assert_eq!(handshake_timeout_with_mask_grace(&config), Duration::from_millis(3750)); + assert_eq!( + handshake_timeout_with_mask_grace(&config), + Duration::from_millis(3750) + ); } #[tokio::test] @@ -155,10 +167,12 @@ async fn read_with_progress_fragmented_io_works_over_multiple_calls() { let mut b = vec![0u8; chunk_size]; let n = read_with_progress(&mut cursor, &mut b).await.unwrap(); result.extend_from_slice(&b[..n]); - if n == 0 { break; } + if n == 0 { + break; + } } - assert_eq!(result, vec![1,2,3,4,5]); + assert_eq!(result, vec![1, 2, 3, 4, 5]); } #[tokio::test] @@ -174,7 +188,9 @@ async fn read_with_progress_stress_randomized_chunk_sizes() { let mut b = vec![0u8; chunk]; let read = read_with_progress(&mut cursor, &mut b).await.unwrap(); collected.extend_from_slice(&b[..read]); - if read == 0 { break; } + if read == 0 { + break; + } } assert_eq!(collected, input); @@ -215,10 +231,12 @@ fn wrap_tls_application_record_roundtrip_size_check() { let mut consumed = 0; while idx + 5 <= wrapped.len() { assert_eq!(wrapped[idx], 0x17); - let len = u16::from_be_bytes([wrapped[idx+3], wrapped[idx+4]]) as usize; + let len = u16::from_be_bytes([wrapped[idx + 3], wrapped[idx + 4]]) as usize; consumed += len; idx += 5 + len; - if idx >= wrapped.len() { break; } + if idx >= wrapped.len() { + break; + } } assert_eq!(consumed, payload_len); diff --git a/src/proxy/tests/client_tls_record_wrap_hardening_security_tests.rs b/src/proxy/tests/client_tls_record_wrap_hardening_security_tests.rs index 08f52d1..7964cdd 100644 --- a/src/proxy/tests/client_tls_record_wrap_hardening_security_tests.rs +++ b/src/proxy/tests/client_tls_record_wrap_hardening_security_tests.rs @@ -25,13 +25,26 @@ fn wrap_tls_application_record_oversized_payload_is_chunked_without_truncation() let len = u16::from_be_bytes([record[offset + 3], record[offset + 4]]) as usize; let body_start = offset + 5; let body_end = body_start + len; - assert!(body_end <= record.len(), "declared TLS record length must be in-bounds"); + assert!( + body_end <= record.len(), + "declared TLS record length must be in-bounds" + ); recovered.extend_from_slice(&record[body_start..body_end]); offset = body_end; frames += 1; } - assert_eq!(offset, record.len(), "record parser must consume exact output size"); - assert_eq!(frames, 2, "oversized payload should split into exactly two records"); - assert_eq!(recovered, payload, "chunked records must preserve full payload"); + assert_eq!( + offset, + record.len(), + "record parser must consume exact output size" + ); + assert_eq!( + frames, 2, + "oversized payload should split into exactly two records" + ); + assert_eq!( + recovered, payload, + "chunked records must preserve full payload" + ); } diff --git a/src/proxy/tests/direct_relay_security_tests.rs b/src/proxy/tests/direct_relay_security_tests.rs index 16fe8da..a731830 100644 --- a/src/proxy/tests/direct_relay_security_tests.rs +++ b/src/proxy/tests/direct_relay_security_tests.rs @@ -773,8 +773,7 @@ fn anchored_open_nix_path_writes_expected_lines() { "target/telemt-unknown-dc-anchored-open-ok-{}/unknown-dc.log", std::process::id() ); - let sanitized = - sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); + let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); let _ = fs::remove_file(&sanitized.resolved_path); let mut first = open_unknown_dc_log_append_anchored(&sanitized) @@ -787,7 +786,10 @@ fn anchored_open_nix_path_writes_expected_lines() { let content = fs::read_to_string(&sanitized.resolved_path).expect("anchored log file must be readable"); - let lines: Vec<&str> = content.lines().filter(|line| !line.trim().is_empty()).collect(); + let lines: Vec<&str> = content + .lines() + .filter(|line| !line.trim().is_empty()) + .collect(); assert_eq!(lines.len(), 2, "expected one line per anchored append call"); assert!( lines.contains(&"dc_idx=31200") && lines.contains(&"dc_idx=31201"), @@ -811,8 +813,7 @@ fn anchored_open_parallel_appends_preserve_line_integrity() { "target/telemt-unknown-dc-anchored-open-parallel-{}/unknown-dc.log", std::process::id() ); - let sanitized = - sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); + let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); let _ = fs::remove_file(&sanitized.resolved_path); let mut workers = Vec::new(); @@ -831,8 +832,15 @@ fn anchored_open_parallel_appends_preserve_line_integrity() { let content = fs::read_to_string(&sanitized.resolved_path).expect("parallel log file must be readable"); - let lines: Vec<&str> = content.lines().filter(|line| !line.trim().is_empty()).collect(); - assert_eq!(lines.len(), 64, "expected one complete line per worker append"); + let lines: Vec<&str> = content + .lines() + .filter(|line| !line.trim().is_empty()) + .collect(); + assert_eq!( + lines.len(), + 64, + "expected one complete line per worker append" + ); for line in lines { assert!( line.starts_with("dc_idx="), @@ -867,8 +875,7 @@ fn anchored_open_creates_private_0600_file_permissions() { "target/telemt-unknown-dc-anchored-perms-{}/unknown-dc.log", std::process::id() ); - let sanitized = - sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); + let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); let _ = fs::remove_file(&sanitized.resolved_path); let mut file = open_unknown_dc_log_append_anchored(&sanitized) @@ -905,8 +912,7 @@ fn anchored_open_rejects_existing_symlink_target() { "target/telemt-unknown-dc-anchored-symlink-target-{}/unknown-dc.log", std::process::id() ); - let sanitized = - sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); + let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); let outside = std::env::temp_dir().join(format!( "telemt-unknown-dc-anchored-symlink-outside-{}.log", @@ -943,8 +949,7 @@ fn anchored_open_high_contention_multi_write_preserves_complete_lines() { "target/telemt-unknown-dc-anchored-contention-{}/unknown-dc.log", std::process::id() ); - let sanitized = - sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); + let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); let _ = fs::remove_file(&sanitized.resolved_path); let workers = 24usize; @@ -970,7 +975,10 @@ fn anchored_open_high_contention_multi_write_preserves_complete_lines() { let content = fs::read_to_string(&sanitized.resolved_path) .expect("contention output file must be readable"); - let lines: Vec<&str> = content.lines().filter(|line| !line.trim().is_empty()).collect(); + let lines: Vec<&str> = content + .lines() + .filter(|line| !line.trim().is_empty()) + .collect(); assert_eq!( lines.len(), workers * rounds, @@ -1014,8 +1022,7 @@ fn append_unknown_dc_line_returns_error_for_read_only_descriptor() { "target/telemt-unknown-dc-append-ro-{}/unknown-dc.log", std::process::id() ); - let sanitized = - sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); + let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize"); fs::write(&sanitized.resolved_path, "seed\n").expect("seed file must be writable"); let mut readonly = std::fs::OpenOptions::new() diff --git a/src/proxy/tests/handshake_advanced_clever_tests.rs b/src/proxy/tests/handshake_advanced_clever_tests.rs index 9b12f21..76347c4 100644 --- a/src/proxy/tests/handshake_advanced_clever_tests.rs +++ b/src/proxy/tests/handshake_advanced_clever_tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::crypto::{sha256, sha256_hmac, AesCtr}; +use crate::crypto::{AesCtr, sha256, sha256_hmac}; use crate::protocol::constants::{ProtoTag, RESERVED_NONCE_BEGINNINGS, RESERVED_NONCE_FIRST_BYTES}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -175,7 +175,10 @@ async fn tls_minimum_viable_length_boundary() { None, ) .await; - assert!(matches!(res, HandshakeResult::Success(_)), "Exact minimum length TLS handshake must succeed"); + assert!( + matches!(res, HandshakeResult::Success(_)), + "Exact minimum length TLS handshake must succeed" + ); let short_handshake = vec![0x42u8; min_len - 1]; let res_short = handle_tls_handshake( @@ -189,7 +192,10 @@ async fn tls_minimum_viable_length_boundary() { None, ) .await; - assert!(matches!(res_short, HandshakeResult::BadClient { .. }), "Handshake 1 byte shorter than minimum must fail closed"); + assert!( + matches!(res_short, HandshakeResult::BadClient { .. }), + "Handshake 1 byte shorter than minimum must fail closed" + ); } #[tokio::test] @@ -219,9 +225,16 @@ async fn mtproto_extreme_dc_index_serialization() { match res { HandshakeResult::Success((_, _, success)) => { - assert_eq!(success.dc_idx, extreme_dc, "Extreme DC index {} must serialize/deserialize perfectly", extreme_dc); + assert_eq!( + success.dc_idx, extreme_dc, + "Extreme DC index {} must serialize/deserialize perfectly", + extreme_dc + ); } - _ => panic!("MTProto handshake with extreme DC index {} failed", extreme_dc), + _ => panic!( + "MTProto handshake with extreme DC index {} failed", + extreme_dc + ), } } } @@ -253,7 +266,11 @@ async fn alpn_strict_case_and_padding_rejection() { None, ) .await; - assert!(matches!(res, HandshakeResult::BadClient { .. }), "ALPN strict enforcement must reject {:?}", bad_alpn); + assert!( + matches!(res, HandshakeResult::BadClient { .. }), + "ALPN strict enforcement must reject {:?}", + bad_alpn + ); } } @@ -265,8 +282,15 @@ fn ipv4_mapped_ipv6_bucketing_anomaly() { let norm_1 = normalize_auth_probe_ip(ipv4_mapped_1); let norm_2 = normalize_auth_probe_ip(ipv4_mapped_2); - assert_eq!(norm_1, norm_2, "IPv4-mapped IPv6 addresses must collapse into the same /64 bucket (::0)"); - assert_eq!(norm_1, IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), "The bucket must be exactly ::0"); + assert_eq!( + norm_1, norm_2, + "IPv4-mapped IPv6 addresses must collapse into the same /64 bucket (::0)" + ); + assert_eq!( + norm_1, + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), + "The bucket must be exactly ::0" + ); } // --- Category 2: Adversarial & Black Hat --- @@ -309,7 +333,10 @@ async fn mtproto_invalid_ciphertext_does_not_poison_replay_cache() { None, ) .await; - assert!(matches!(res_valid, HandshakeResult::Success(_)), "Invalid MTProto ciphertext must not poison the replay cache"); + assert!( + matches!(res_valid, HandshakeResult::Success(_)), + "Invalid MTProto ciphertext must not poison the replay cache" + ); } #[tokio::test] @@ -352,7 +379,10 @@ async fn tls_invalid_session_does_not_poison_replay_cache() { None, ) .await; - assert!(matches!(res_valid, HandshakeResult::Success(_)), "Invalid TLS payload must not poison the replay cache"); + assert!( + matches!(res_valid, HandshakeResult::Success(_)), + "Invalid TLS payload must not poison the replay cache" + ); } #[tokio::test] @@ -387,7 +417,10 @@ async fn server_hello_delay_timing_neutrality_on_hmac_failure() { let elapsed = start.elapsed(); assert!(matches!(res, HandshakeResult::BadClient { .. })); - assert!(elapsed >= Duration::from_millis(45), "Invalid HMAC must still incur the configured ServerHello delay to prevent timing side-channels"); + assert!( + elapsed >= Duration::from_millis(45), + "Invalid HMAC must still incur the configured ServerHello delay to prevent timing side-channels" + ); } #[tokio::test] @@ -421,7 +454,10 @@ async fn server_hello_delay_inversion_resilience() { let elapsed = start.elapsed(); assert!(matches!(res, HandshakeResult::Success(_))); - assert!(elapsed >= Duration::from_millis(90), "Delay logic must gracefully handle min > max inversions via max.max(min)"); + assert!( + elapsed >= Duration::from_millis(90), + "Delay logic must gracefully handle min > max inversions via max.max(min)" + ); } #[tokio::test] @@ -436,10 +472,16 @@ async fn mixed_valid_and_invalid_user_secrets_configuration() { for i in 0..9 { let bad_secret = if i % 2 == 0 { "badhex!" } else { "1122" }; - config.access.users.insert(format!("bad_user_{}", i), bad_secret.to_string()); + config + .access + .users + .insert(format!("bad_user_{}", i), bad_secret.to_string()); } let valid_secret_hex = "99999999999999999999999999999999"; - config.access.users.insert("good_user".to_string(), valid_secret_hex.to_string()); + config + .access + .users + .insert("good_user".to_string(), valid_secret_hex.to_string()); config.general.modes.secure = true; config.general.modes.classic = true; config.general.modes.tls = true; @@ -463,7 +505,10 @@ async fn mixed_valid_and_invalid_user_secrets_configuration() { ) .await; - assert!(matches!(res, HandshakeResult::Success(_)), "Proxy must gracefully skip invalid secrets and authenticate the valid one"); + assert!( + matches!(res, HandshakeResult::Success(_)), + "Proxy must gracefully skip invalid secrets and authenticate the valid one" + ); } #[tokio::test] @@ -494,7 +539,10 @@ async fn tls_emulation_fallback_when_cache_missing() { ) .await; - assert!(matches!(res, HandshakeResult::Success(_)), "TLS emulation must gracefully fall back to standard ServerHello if cache is missing"); + assert!( + matches!(res, HandshakeResult::Success(_)), + "TLS emulation must gracefully fall back to standard ServerHello if cache is missing" + ); } #[tokio::test] @@ -524,7 +572,10 @@ async fn classic_mode_over_tls_transport_protocol_confusion() { ) .await; - assert!(matches!(res, HandshakeResult::Success(_)), "Intermediate tag over TLS must succeed if classic mode is enabled, locking in cross-transport behavior"); + assert!( + matches!(res, HandshakeResult::Success(_)), + "Intermediate tag over TLS must succeed if classic mode is enabled, locking in cross-transport behavior" + ); } #[test] @@ -543,9 +594,15 @@ fn generate_tg_nonce_never_emits_reserved_bytes() { false, ); - assert!(!RESERVED_NONCE_FIRST_BYTES.contains(&nonce[0]), "Nonce must never start with reserved bytes"); + assert!( + !RESERVED_NONCE_FIRST_BYTES.contains(&nonce[0]), + "Nonce must never start with reserved bytes" + ); let first_four: [u8; 4] = [nonce[0], nonce[1], nonce[2], nonce[3]]; - assert!(!RESERVED_NONCE_BEGINNINGS.contains(&first_four), "Nonce must never match reserved 4-byte beginnings"); + assert!( + !RESERVED_NONCE_BEGINNINGS.contains(&first_four), + "Nonce must never match reserved 4-byte beginnings" + ); } } @@ -568,11 +625,18 @@ async fn dashmap_concurrent_saturation_stress() { } for task in tasks { - task.await.expect("Task panicked during concurrent DashMap stress"); + task.await + .expect("Task panicked during concurrent DashMap stress"); } - assert!(auth_probe_is_throttled_for_testing(ip_a), "IP A must be throttled after concurrent stress"); - assert!(auth_probe_is_throttled_for_testing(ip_b), "IP B must be throttled after concurrent stress"); + assert!( + auth_probe_is_throttled_for_testing(ip_a), + "IP A must be throttled after concurrent stress" + ); + assert!( + auth_probe_is_throttled_for_testing(ip_b), + "IP B must be throttled after concurrent stress" + ); } #[test] @@ -586,7 +650,12 @@ fn prototag_invalid_bytes_fail_closed() { ]; for tag in invalid_tags { - assert_eq!(ProtoTag::from_bytes(tag), None, "Invalid ProtoTag bytes {:?} must fail closed", tag); + assert_eq!( + ProtoTag::from_bytes(tag), + None, + "Invalid ProtoTag bytes {:?} must fail closed", + tag + ); } } @@ -603,7 +672,10 @@ fn auth_probe_eviction_hash_collision_stress() { auth_probe_record_failure_with_state(state, ip, now); } - assert!(state.len() <= AUTH_PROBE_TRACK_MAX_ENTRIES, "Eviction logic must successfully bound the map size under heavy insertion stress"); + assert!( + state.len() <= AUTH_PROBE_TRACK_MAX_ENTRIES, + "Eviction logic must successfully bound the map size under heavy insertion stress" + ); } #[test] diff --git a/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs b/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs index 6c48cc1..77cea19 100644 --- a/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs @@ -88,6 +88,9 @@ fn light_fuzz_offset_always_stays_inside_state_len() { let now = base + Duration::from_nanos(seed & 0x0fff); let start = auth_probe_scan_start_offset(ip, now, state_len, scan_limit); - assert!(start < state_len, "scan offset must stay inside state length"); + assert!( + start < state_len, + "scan offset must stay inside state length" + ); } -} \ No newline at end of file +} diff --git a/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs b/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs index ece6ff5..c91a215 100644 --- a/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs @@ -96,4 +96,4 @@ fn light_fuzz_scan_offset_budget_never_exceeds_effective_window() { "scan offset must stay inside state length" ); } -} \ No newline at end of file +} diff --git a/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs b/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs index 260a1b9..bf97990 100644 --- a/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs @@ -113,4 +113,4 @@ fn light_fuzz_scan_offset_stays_within_window_for_randomized_inputs() { "scan offset must always remain inside state length" ); } -} \ No newline at end of file +} diff --git a/src/proxy/tests/handshake_more_clever_tests.rs b/src/proxy/tests/handshake_more_clever_tests.rs index 77df442..9782469 100644 --- a/src/proxy/tests/handshake_more_clever_tests.rs +++ b/src/proxy/tests/handshake_more_clever_tests.rs @@ -1,8 +1,8 @@ use super::*; -use crate::crypto::{sha256, sha256_hmac, AesCtr}; +use crate::crypto::{AesCtr, sha256, sha256_hmac}; use crate::protocol::constants::{ProtoTag, RESERVED_NONCE_BEGINNINGS, RESERVED_NONCE_FIRST_BYTES}; -use rand::{Rng, SeedableRng}; use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -223,7 +223,10 @@ fn auth_probe_backoff_extreme_fail_streak_clamps_safely() { assert_eq!(updated.fail_streak, u32::MAX); let expected_blocked_until = now + Duration::from_millis(AUTH_PROBE_BACKOFF_MAX_MS); - assert_eq!(updated.blocked_until, expected_blocked_until, "Extreme fail streak must clamp cleanly to AUTH_PROBE_BACKOFF_MAX_MS"); + assert_eq!( + updated.blocked_until, expected_blocked_until, + "Extreme fail streak must clamp cleanly to AUTH_PROBE_BACKOFF_MAX_MS" + ); } #[test] @@ -250,12 +253,19 @@ fn generate_tg_nonce_cryptographic_uniqueness_and_entropy() { total_set_bits += byte.count_ones() as usize; } - assert!(nonces.insert(nonce), "generate_tg_nonce emitted a duplicate nonce! RNG is stuck."); + assert!( + nonces.insert(nonce), + "generate_tg_nonce emitted a duplicate nonce! RNG is stuck." + ); } let total_bits = iterations * HANDSHAKE_LEN * 8; let ratio = (total_set_bits as f64) / (total_bits as f64); - assert!(ratio > 0.48 && ratio < 0.52, "Nonce entropy is degraded. Set bit ratio: {}", ratio); + assert!( + ratio > 0.48 && ratio < 0.52, + "Nonce entropy is degraded. Set bit ratio: {}", + ratio + ); } #[tokio::test] @@ -267,10 +277,19 @@ async fn mtproto_multi_user_decryption_isolation() { config.general.modes.secure = true; config.access.ignore_time_skew = true; - config.access.users.insert("user_a".to_string(), "11111111111111111111111111111111".to_string()); - config.access.users.insert("user_b".to_string(), "22222222222222222222222222222222".to_string()); + config.access.users.insert( + "user_a".to_string(), + "11111111111111111111111111111111".to_string(), + ); + config.access.users.insert( + "user_b".to_string(), + "22222222222222222222222222222222".to_string(), + ); let good_secret_hex = "33333333333333333333333333333333"; - config.access.users.insert("user_c".to_string(), good_secret_hex.to_string()); + config + .access + .users + .insert("user_c".to_string(), good_secret_hex.to_string()); let replay_checker = ReplayChecker::new(128, Duration::from_secs(60)); let peer: SocketAddr = "192.0.2.104:12345".parse().unwrap(); @@ -291,9 +310,14 @@ async fn mtproto_multi_user_decryption_isolation() { match res { HandshakeResult::Success((_, _, success)) => { - assert_eq!(success.user, "user_c", "Decryption attempts on previous users must not corrupt the handshake buffer for the valid user"); + assert_eq!( + success.user, "user_c", + "Decryption attempts on previous users must not corrupt the handshake buffer for the valid user" + ); } - _ => panic!("Multi-user MTProto handshake failed. Decryption buffer might be mutating in place."), + _ => panic!( + "Multi-user MTProto handshake failed. Decryption buffer might be mutating in place." + ), } } @@ -325,7 +349,9 @@ async fn invalid_secret_warning_lock_contention_and_bound() { } let warned = INVALID_SECRET_WARNED.get().unwrap(); - let guard = warned.lock().unwrap_or_else(|poisoned| poisoned.into_inner()); + let guard = warned + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); assert_eq!( guard.len(), @@ -342,7 +368,11 @@ async fn mtproto_strict_concurrent_replay_race_condition() { let secret_hex = "4A4A4A4A4A4A4A4A4A4A4A4A4A4A4A4A"; let config = Arc::new(test_config_with_secret_hex(secret_hex)); let replay_checker = Arc::new(ReplayChecker::new(4096, Duration::from_secs(60))); - let valid_handshake = Arc::new(make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 1)); + let valid_handshake = Arc::new(make_valid_mtproto_handshake( + secret_hex, + ProtoTag::Secure, + 1, + )); let tasks = 100; let barrier = Arc::new(Barrier::new(tasks)); @@ -355,7 +385,10 @@ async fn mtproto_strict_concurrent_replay_race_condition() { let hs = valid_handshake.clone(); handles.push(tokio::spawn(async move { - let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, (i % 250) as u8)), 10000 + i as u16); + let peer = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(10, 0, 0, (i % 250) as u8)), + 10000 + i as u16, + ); b.wait().await; handle_mtproto_handshake( &hs, @@ -382,8 +415,15 @@ async fn mtproto_strict_concurrent_replay_race_condition() { } } - assert_eq!(successes, 1, "Replay cache race condition allowed multiple identical MTProto handshakes to succeed"); - assert_eq!(failures, tasks - 1, "Replay cache failed to forcefully reject concurrent duplicates"); + assert_eq!( + successes, 1, + "Replay cache race condition allowed multiple identical MTProto handshakes to succeed" + ); + assert_eq!( + failures, + tasks - 1, + "Replay cache failed to forcefully reject concurrent duplicates" + ); } #[tokio::test] @@ -398,7 +438,8 @@ async fn tls_alpn_zero_length_protocol_handled_safely() { let rng = SecureRandom::new(); let peer: SocketAddr = "192.0.2.107:12345".parse().unwrap(); - let handshake = make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b""]); + let handshake = + make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b""]); let res = handle_tls_handshake( &handshake, @@ -412,7 +453,10 @@ async fn tls_alpn_zero_length_protocol_handled_safely() { ) .await; - assert!(matches!(res, HandshakeResult::BadClient { .. }), "0-length ALPN must be safely rejected without panicking"); + assert!( + matches!(res, HandshakeResult::BadClient { .. }), + "0-length ALPN must be safely rejected without panicking" + ); } #[tokio::test] @@ -427,7 +471,8 @@ async fn tls_sni_massive_hostname_does_not_panic() { let peer: SocketAddr = "192.0.2.108:12345".parse().unwrap(); let massive_hostname = String::from_utf8(vec![b'a'; 65000]).unwrap(); - let handshake = make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, &massive_hostname, &[]); + let handshake = + make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, &massive_hostname, &[]); let res = handle_tls_handshake( &handshake, @@ -441,7 +486,13 @@ async fn tls_sni_massive_hostname_does_not_panic() { ) .await; - assert!(matches!(res, HandshakeResult::Success(_) | HandshakeResult::BadClient { .. }), "Massive SNI hostname must be processed or ignored without stack overflow or panic"); + assert!( + matches!( + res, + HandshakeResult::Success(_) | HandshakeResult::BadClient { .. } + ), + "Massive SNI hostname must be processed or ignored without stack overflow or panic" + ); } #[tokio::test] @@ -455,7 +506,8 @@ async fn tls_progressive_truncation_fuzzing_no_panics() { let rng = SecureRandom::new(); let peer: SocketAddr = "192.0.2.109:12345".parse().unwrap(); - let valid_handshake = make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b"h2"]); + let valid_handshake = + make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b"h2"]); let full_len = valid_handshake.len(); // Truncated corpus only: full_len is a valid baseline and should not be @@ -473,7 +525,11 @@ async fn tls_progressive_truncation_fuzzing_no_panics() { None, ) .await; - assert!(matches!(res, HandshakeResult::BadClient { .. }), "Truncated TLS handshake at len {} must fail safely without panicking", i); + assert!( + matches!(res, HandshakeResult::BadClient { .. }), + "Truncated TLS handshake at len {} must fail safely without panicking", + i + ); } } @@ -504,7 +560,10 @@ async fn mtproto_pure_entropy_fuzzing_no_panics() { ) .await; - assert!(matches!(res, HandshakeResult::BadClient { .. }), "Pure entropy MTProto payload must fail closed and never panic"); + assert!( + matches!(res, HandshakeResult::BadClient { .. }), + "Pure entropy MTProto payload must fail closed and never panic" + ); } } @@ -517,10 +576,16 @@ fn decode_user_secret_odd_length_hex_rejection() { let mut config = ProxyConfig::default(); config.access.users.clear(); - config.access.users.insert("odd_user".to_string(), "1234567890123456789012345678901".to_string()); + config.access.users.insert( + "odd_user".to_string(), + "1234567890123456789012345678901".to_string(), + ); let decoded = decode_user_secrets(&config, None); - assert!(decoded.is_empty(), "Odd-length hex string must be gracefully rejected by hex::decode without unwrapping"); + assert!( + decoded.is_empty(), + "Odd-length hex string must be gracefully rejected by hex::decode without unwrapping" + ); } #[test] @@ -552,7 +617,10 @@ fn saturation_grace_pre_existing_high_fail_streak_immediate_throttle() { } let is_throttled = auth_probe_should_apply_preauth_throttle(peer_ip, now); - assert!(is_throttled, "A peer with a pre-existing high fail streak must be immediately throttled when saturation begins, receiving no unearned grace period"); + assert!( + is_throttled, + "A peer with a pre-existing high fail streak must be immediately throttled when saturation begins, receiving no unearned grace period" + ); } #[test] @@ -586,7 +654,11 @@ fn mtproto_classic_tags_rejected_when_only_secure_mode_enabled() { config.general.modes.tls = false; assert!(!mode_enabled_for_proto(&config, ProtoTag::Abridged, false)); - assert!(!mode_enabled_for_proto(&config, ProtoTag::Intermediate, false)); + assert!(!mode_enabled_for_proto( + &config, + ProtoTag::Intermediate, + false + )); } #[test] diff --git a/src/proxy/tests/handshake_real_bug_stress_tests.rs b/src/proxy/tests/handshake_real_bug_stress_tests.rs index d7234ff..1e27ed5 100644 --- a/src/proxy/tests/handshake_real_bug_stress_tests.rs +++ b/src/proxy/tests/handshake_real_bug_stress_tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::crypto::{sha256, sha256_hmac, AesCtr, SecureRandom}; +use crate::crypto::{AesCtr, SecureRandom, sha256, sha256_hmac}; use crate::protocol::constants::{ProtoTag, TLS_RECORD_HANDSHAKE, TLS_VERSION}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; @@ -80,8 +80,7 @@ fn make_valid_tls_client_hello_with_alpn( digest[28 + i] ^= ts[i]; } - record[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN] - .copy_from_slice(&digest); + record[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN].copy_from_slice(&digest); record } @@ -331,7 +330,11 @@ async fn saturation_grace_exhaustion_under_concurrency_keeps_peer_throttled() { let final_state = state.get(&peer_ip).expect("state must exist"); assert!( - final_state.fail_streak >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS + final_state.fail_streak + >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS ); - assert!(auth_probe_should_apply_preauth_throttle(peer_ip, Instant::now())); + assert!(auth_probe_should_apply_preauth_throttle( + peer_ip, + Instant::now() + )); } diff --git a/src/proxy/tests/handshake_timing_manual_bench_tests.rs b/src/proxy/tests/handshake_timing_manual_bench_tests.rs index 95e9f49..13d112c 100644 --- a/src/proxy/tests/handshake_timing_manual_bench_tests.rs +++ b/src/proxy/tests/handshake_timing_manual_bench_tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::crypto::{sha256, sha256_hmac, AesCtr, SecureRandom}; +use crate::crypto::{AesCtr, SecureRandom, sha256, sha256_hmac}; use crate::protocol::constants::{ProtoTag, TLS_RECORD_HANDSHAKE, TLS_VERSION}; use std::net::SocketAddr; use std::time::{Duration, Instant}; @@ -169,10 +169,10 @@ async fn mtproto_user_scan_timing_manual_benchmark() { ); } - config.access.users.insert( - preferred_user.to_string(), - target_secret_hex.to_string(), - ); + config + .access + .users + .insert(preferred_user.to_string(), target_secret_hex.to_string()); let replay_checker_preferred = ReplayChecker::new(65_536, Duration::from_secs(60)); let replay_checker_full_scan = ReplayChecker::new(65_536, Duration::from_secs(60)); diff --git a/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs b/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs index 84c904f..a977409 100644 --- a/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs +++ b/src/proxy/tests/masking_ab_envelope_blur_integration_security_tests.rs @@ -544,7 +544,6 @@ async fn timing_classifier_light_fuzz_pairwise_bucketed_accuracy_stays_bounded_u if hardened_acc + 0.05 <= baseline_acc { meaningful_improvement_seen = true; } - } assert!( diff --git a/src/proxy/tests/masking_aggressive_mode_security_tests.rs b/src/proxy/tests/masking_aggressive_mode_security_tests.rs index a77fc14..7356dc0 100644 --- a/src/proxy/tests/masking_aggressive_mode_security_tests.rs +++ b/src/proxy/tests/masking_aggressive_mode_security_tests.rs @@ -85,7 +85,10 @@ async fn aggressive_mode_shapes_backend_silent_non_eof_path() { let legacy = capture_forwarded_len_with_mode(body_sent, false, false, false, 0).await; let aggressive = capture_forwarded_len_with_mode(body_sent, false, true, false, 0).await; - assert!(legacy < floor, "legacy mode should keep timeout path unshaped"); + assert!( + legacy < floor, + "legacy mode should keep timeout path unshaped" + ); assert!( aggressive >= floor, "aggressive mode must shape backend-silent non-EOF paths (aggressive={aggressive}, floor={floor})" diff --git a/src/proxy/tests/masking_connect_failure_close_matrix_security_tests.rs b/src/proxy/tests/masking_connect_failure_close_matrix_security_tests.rs index 614af9b..718189c 100644 --- a/src/proxy/tests/masking_connect_failure_close_matrix_security_tests.rs +++ b/src/proxy/tests/masking_connect_failure_close_matrix_security_tests.rs @@ -52,7 +52,10 @@ async fn run_connect_failure_case( .await .unwrap() .unwrap(); - assert_eq!(n, 0, "connect-failure path must close client-visible writer"); + assert_eq!( + n, 0, + "connect-failure path must close client-visible writer" + ); started.elapsed() } @@ -67,13 +70,9 @@ async fn connect_failure_refusal_close_behavior_matrix() { let peer: SocketAddr = format!("203.0.113.210:{}", 54100 + idx as u16) .parse() .unwrap(); - let elapsed = run_connect_failure_case( - "127.0.0.1", - unused_port, - timing_normalization_enabled, - peer, - ) - .await; + let elapsed = + run_connect_failure_case("127.0.0.1", unused_port, timing_normalization_enabled, peer) + .await; if timing_normalization_enabled { assert!( diff --git a/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs b/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs index b52af35..f2c39a2 100644 --- a/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs +++ b/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs @@ -79,7 +79,10 @@ async fn io_error_terminates_cleanly() { } } - tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(ErrReader, usize::MAX)) - .await - .expect("consume_client_data did not return on I/O error"); + tokio::time::timeout( + MASK_RELAY_TIMEOUT, + consume_client_data(ErrReader, usize::MAX), + ) + .await + .expect("consume_client_data did not return on I/O error"); } diff --git a/src/proxy/tests/masking_extended_attack_surface_security_tests.rs b/src/proxy/tests/masking_extended_attack_surface_security_tests.rs index 040f567..650731c 100644 --- a/src/proxy/tests/masking_extended_attack_surface_security_tests.rs +++ b/src/proxy/tests/masking_extended_attack_surface_security_tests.rs @@ -32,8 +32,16 @@ async fn run_self_target_refusal( let (mut client, server) = duplex(1024); let started = Instant::now(); let task = tokio::spawn(async move { - handle_bad_client(server, tokio::io::sink(), initial, peer, local_addr, &config, &beobachten) - .await; + handle_bad_client( + server, + tokio::io::sink(), + initial, + peer, + local_addr, + &config, + &beobachten, + ) + .await; }); client @@ -214,4 +222,4 @@ async fn stress_high_fanout_self_target_refusal_no_deadlock_or_timeout() { }) .await .expect("high-fanout refusal workload must complete without deadlock"); -} \ No newline at end of file +} diff --git a/src/proxy/tests/masking_http_probe_boundary_security_tests.rs b/src/proxy/tests/masking_http_probe_boundary_security_tests.rs index 47b6dc6..c8f3ec0 100644 --- a/src/proxy/tests/masking_http_probe_boundary_security_tests.rs +++ b/src/proxy/tests/masking_http_probe_boundary_security_tests.rs @@ -2,7 +2,13 @@ use super::*; #[test] fn exact_four_byte_http_tokens_are_classified() { - for token in [b"GET ".as_ref(), b"POST".as_ref(), b"HEAD".as_ref(), b"PUT ".as_ref(), b"PRI ".as_ref()] { + for token in [ + b"GET ".as_ref(), + b"POST".as_ref(), + b"HEAD".as_ref(), + b"PUT ".as_ref(), + b"PRI ".as_ref(), + ] { assert!( is_http_probe(token), "exact 4-byte token must be classified as HTTP probe: {:?}", @@ -76,4 +82,4 @@ fn light_fuzz_four_byte_ascii_noise_not_misclassified() { token ); } -} \ No newline at end of file +} diff --git a/src/proxy/tests/masking_interface_cache_concurrency_security_tests.rs b/src/proxy/tests/masking_interface_cache_concurrency_security_tests.rs index 8d99b8f..ed6d1ab 100644 --- a/src/proxy/tests/masking_interface_cache_concurrency_security_tests.rs +++ b/src/proxy/tests/masking_interface_cache_concurrency_security_tests.rs @@ -38,4 +38,4 @@ async fn adversarial_parallel_cold_miss_performs_single_interface_refresh() { 1, "parallel cold misses must coalesce into a single interface enumeration" ); -} \ No newline at end of file +} diff --git a/src/proxy/tests/masking_interface_cache_security_tests.rs b/src/proxy/tests/masking_interface_cache_security_tests.rs index 6be99d0..17debb0 100644 --- a/src/proxy/tests/masking_interface_cache_security_tests.rs +++ b/src/proxy/tests/masking_interface_cache_security_tests.rs @@ -37,7 +37,10 @@ async fn tdd_non_local_port_short_circuit_does_not_enumerate_interfaces() { let local_addr: SocketAddr = "0.0.0.0:443".parse().expect("valid local addr"); let is_local = is_mask_target_local_listener_async("127.0.0.1", 8443, local_addr, None).await; - assert!(!is_local, "different port must not be treated as local listener"); + assert!( + !is_local, + "different port must not be treated as local listener" + ); assert_eq!( local_interface_enumerations_for_tests(), 0, diff --git a/src/proxy/tests/masking_production_cap_regression_security_tests.rs b/src/proxy/tests/masking_production_cap_regression_security_tests.rs index f2368a1..9ff51ba 100644 --- a/src/proxy/tests/masking_production_cap_regression_security_tests.rs +++ b/src/proxy/tests/masking_production_cap_regression_security_tests.rs @@ -63,17 +63,11 @@ impl AsyncWrite for CountingWriter { Poll::Ready(Ok(buf.len())) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/src/proxy/tests/masking_self_target_loop_security_tests.rs b/src/proxy/tests/masking_self_target_loop_security_tests.rs index 18cb0d7..7f6cb29 100644 --- a/src/proxy/tests/masking_self_target_loop_security_tests.rs +++ b/src/proxy/tests/masking_self_target_loop_security_tests.rs @@ -1,6 +1,6 @@ use super::*; -use std::net::TcpListener as StdTcpListener; use std::net::SocketAddr; +use std::net::TcpListener as StdTcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use tokio::net::TcpListener; use tokio::time::{Duration, Instant, timeout}; @@ -15,74 +15,38 @@ fn closed_local_port() -> u16 { #[tokio::test] async fn self_target_detection_matches_literal_ipv4_listener() { let local: SocketAddr = "198.51.100.40:443".parse().unwrap(); - assert!(is_mask_target_local_listener_async( - "198.51.100.40", - 443, - local, - None, - ) - .await); + assert!(is_mask_target_local_listener_async("198.51.100.40", 443, local, None,).await); } #[tokio::test] async fn self_target_detection_matches_bracketed_ipv6_listener() { let local: SocketAddr = "[2001:db8::44]:8443".parse().unwrap(); - assert!(is_mask_target_local_listener_async( - "[2001:db8::44]", - 8443, - local, - None, - ) - .await); + assert!(is_mask_target_local_listener_async("[2001:db8::44]", 8443, local, None,).await); } #[tokio::test] async fn self_target_detection_keeps_same_ip_different_port_forwardable() { let local: SocketAddr = "203.0.113.44:443".parse().unwrap(); - assert!(!is_mask_target_local_listener_async( - "203.0.113.44", - 8443, - local, - None, - ) - .await); + assert!(!is_mask_target_local_listener_async("203.0.113.44", 8443, local, None,).await); } #[tokio::test] async fn self_target_detection_normalizes_ipv4_mapped_ipv6_literal() { let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); - assert!(is_mask_target_local_listener_async( - "::ffff:127.0.0.1", - 443, - local, - None, - ) - .await); + assert!(is_mask_target_local_listener_async("::ffff:127.0.0.1", 443, local, None,).await); } #[tokio::test] async fn self_target_detection_unspecified_bind_blocks_loopback_target() { let local: SocketAddr = "0.0.0.0:443".parse().unwrap(); - assert!(is_mask_target_local_listener_async( - "127.0.0.1", - 443, - local, - None, - ) - .await); + assert!(is_mask_target_local_listener_async("127.0.0.1", 443, local, None,).await); } #[tokio::test] async fn self_target_detection_unspecified_bind_keeps_remote_target_forwardable() { let local: SocketAddr = "0.0.0.0:443".parse().unwrap(); let remote: SocketAddr = "198.51.100.44:443".parse().unwrap(); - assert!(!is_mask_target_local_listener_async( - "mask.example", - 443, - local, - Some(remote), - ) - .await); + assert!(!is_mask_target_local_listener_async("mask.example", 443, local, Some(remote),).await); } #[tokio::test] @@ -306,7 +270,10 @@ async fn offline_mask_target_refusal_respects_timing_normalization_budget() { }); client.shutdown().await.unwrap(); - timeout(Duration::from_secs(2), task).await.unwrap().unwrap(); + timeout(Duration::from_secs(2), task) + .await + .unwrap() + .unwrap(); let elapsed = started.elapsed(); assert!( @@ -350,7 +317,10 @@ async fn offline_mask_target_refusal_with_idle_client_is_bounded_by_consume_time .await .expect("connection should still be open before consume timeout expires"); - timeout(Duration::from_secs(2), task).await.unwrap().unwrap(); + timeout(Duration::from_secs(2), task) + .await + .unwrap() + .unwrap(); let elapsed = started.elapsed(); assert!( diff --git a/src/proxy/tests/masking_timing_budget_coupling_security_tests.rs b/src/proxy/tests/masking_timing_budget_coupling_security_tests.rs index 1c342ea..fda6de7 100644 --- a/src/proxy/tests/masking_timing_budget_coupling_security_tests.rs +++ b/src/proxy/tests/masking_timing_budget_coupling_security_tests.rs @@ -40,7 +40,10 @@ async fn adversarial_delayed_interface_lookup_does_not_consume_outcome_floor_bud tokio::time::sleep(Duration::from_millis(80)).await; drop(held_refresh_guard); - client.shutdown().await.expect("client shutdown must succeed"); + client + .shutdown() + .await + .expect("client shutdown must succeed"); timeout(Duration::from_secs(2), task) .await @@ -52,4 +55,4 @@ async fn adversarial_delayed_interface_lookup_does_not_consume_outcome_floor_bud elapsed >= Duration::from_millis(180) && elapsed < Duration::from_millis(350), "timing normalization floor must start after pre-outcome self-target checks" ); -} \ No newline at end of file +} diff --git a/src/proxy/tests/middle_relay_idle_policy_security_tests.rs b/src/proxy/tests/middle_relay_idle_policy_security_tests.rs index 6ea182b..fd3243d 100644 --- a/src/proxy/tests/middle_relay_idle_policy_security_tests.rs +++ b/src/proxy/tests/middle_relay_idle_policy_security_tests.rs @@ -2,8 +2,8 @@ use super::*; use crate::crypto::AesCtr; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader}; -use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use tokio::io::AsyncWriteExt; use tokio::io::duplex; use tokio::time::{Duration as TokioDuration, Instant as TokioInstant, timeout}; diff --git a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs index 112d926..b43825c 100644 --- a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs +++ b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs @@ -29,7 +29,10 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account let before = relay_pressure_event_seq(); note_relay_pressure_event(); let after = relay_pressure_event_seq(); - assert!(after > before, "pressure accounting must still advance after poison"); + assert!( + after > before, + "pressure accounting must still advance after poison" + ); clear_relay_idle_pressure_state_for_testing(); } diff --git a/src/proxy/tests/middle_relay_tiny_frame_debt_concurrency_security_tests.rs b/src/proxy/tests/middle_relay_tiny_frame_debt_concurrency_security_tests.rs index 34fc454..6b1d511 100644 --- a/src/proxy/tests/middle_relay_tiny_frame_debt_concurrency_security_tests.rs +++ b/src/proxy/tests/middle_relay_tiny_frame_debt_concurrency_security_tests.rs @@ -217,7 +217,9 @@ async fn adversarial_lockstep_alternating_attack_under_jitter_closes() { } } - writer_task.await.expect("writer jitter task must not panic"); + writer_task + .await + .expect("writer jitter task must not panic"); assert!(closed, "alternating attack must close before EOF"); }); } @@ -247,7 +249,10 @@ async fn integration_mixed_population_attackers_close_benign_survive() { plaintext.push(0x01); plaintext.extend_from_slice(&[n, n, n, n]); } - writer.write_all(&encrypt_for_reader(&plaintext)).await.unwrap(); + writer + .write_all(&encrypt_for_reader(&plaintext)) + .await + .unwrap(); drop(writer); let mut closed = false; @@ -279,7 +284,10 @@ async fn integration_mixed_population_attackers_close_benign_survive() { } plaintext.push(0x01); plaintext.extend_from_slice(&payload); - writer.write_all(&encrypt_for_reader(&plaintext)).await.unwrap(); + writer + .write_all(&encrypt_for_reader(&plaintext)) + .await + .unwrap(); let got = read_once( &mut crypto_reader, @@ -329,7 +337,10 @@ async fn light_fuzz_parallel_patterns_no_hang_or_panic() { } } - writer.write_all(&encrypt_for_reader(&plaintext)).await.unwrap(); + writer + .write_all(&encrypt_for_reader(&plaintext)) + .await + .unwrap(); drop(writer); for _ in 0..320 { diff --git a/src/proxy/tests/middle_relay_tiny_frame_debt_proto_chunking_security_tests.rs b/src/proxy/tests/middle_relay_tiny_frame_debt_proto_chunking_security_tests.rs index 853b381..cbbc971 100644 --- a/src/proxy/tests/middle_relay_tiny_frame_debt_proto_chunking_security_tests.rs +++ b/src/proxy/tests/middle_relay_tiny_frame_debt_proto_chunking_security_tests.rs @@ -51,7 +51,9 @@ fn make_enabled_idle_policy() -> RelayClientIdlePolicy { fn append_tiny_frame(plaintext: &mut Vec, proto: ProtoTag) { match proto { ProtoTag::Abridged => plaintext.push(0x00), - ProtoTag::Intermediate | ProtoTag::Secure => plaintext.extend_from_slice(&0u32.to_le_bytes()), + ProtoTag::Intermediate | ProtoTag::Secure => { + plaintext.extend_from_slice(&0u32.to_le_bytes()) + } } } @@ -206,7 +208,11 @@ async fn intermediate_chunked_alternating_attack_closes_before_eof() { let mut plaintext = Vec::with_capacity(8 * 200); for n in 0..180u8 { append_tiny_frame(&mut plaintext, ProtoTag::Intermediate); - append_real_frame(&mut plaintext, ProtoTag::Intermediate, [n, n ^ 1, n ^ 2, n ^ 3]); + append_real_frame( + &mut plaintext, + ProtoTag::Intermediate, + [n, n ^ 1, n ^ 2, n ^ 3], + ); } let encrypted = encrypt_for_reader(&plaintext); @@ -240,7 +246,9 @@ async fn intermediate_chunked_alternating_attack_closes_before_eof() { } } - writer_task.await.expect("intermediate writer task must not panic"); + writer_task + .await + .expect("intermediate writer task must not panic"); assert!(closed, "intermediate alternating attack must fail closed"); } @@ -290,7 +298,9 @@ async fn secure_chunked_alternating_attack_closes_before_eof() { } } - writer_task.await.expect("secure writer task must not panic"); + writer_task + .await + .expect("secure writer task must not panic"); assert!(closed, "secure alternating attack must fail closed"); } diff --git a/src/proxy/tests/middle_relay_tiny_frame_debt_security_tests.rs b/src/proxy/tests/middle_relay_tiny_frame_debt_security_tests.rs index dee5dd9..fad87d0 100644 --- a/src/proxy/tests/middle_relay_tiny_frame_debt_security_tests.rs +++ b/src/proxy/tests/middle_relay_tiny_frame_debt_security_tests.rs @@ -2,8 +2,8 @@ use super::*; use crate::crypto::AesCtr; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader}; -use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::time::Instant; use tokio::io::{AsyncRead, AsyncWriteExt, duplex}; @@ -156,7 +156,10 @@ fn alternating_one_to_one_closes_with_bounded_real_frame_count() { } let (closed_at, _, reals) = simulate_tiny_debt_pattern(&pattern, pattern.len()); assert!(closed_at.is_some()); - assert!(reals <= 80, "expected bounded real frames before close, got {reals}"); + assert!( + reals <= 80, + "expected bounded real frames before close, got {reals}" + ); } #[test] @@ -183,7 +186,10 @@ fn alternating_one_to_seven_eventually_closes() { } } let (closed_at, _, _) = simulate_tiny_debt_pattern(&pattern, pattern.len()); - assert!(closed_at.is_some(), "1:7 tiny-to-real must eventually close"); + assert!( + closed_at.is_some(), + "1:7 tiny-to-real must eventually close" + ); } #[test] diff --git a/src/proxy/tests/middle_relay_zero_length_frame_security_tests.rs b/src/proxy/tests/middle_relay_zero_length_frame_security_tests.rs index 765c253..dbf6c4c 100644 --- a/src/proxy/tests/middle_relay_zero_length_frame_security_tests.rs +++ b/src/proxy/tests/middle_relay_zero_length_frame_security_tests.rs @@ -2,10 +2,10 @@ use super::*; use crate::crypto::AesCtr; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader}; -use std::sync::atomic::AtomicU64; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWriteExt, duplex}; +use std::sync::atomic::AtomicU64; use std::time::Instant; +use tokio::io::{AsyncRead, AsyncWriteExt, duplex}; fn make_crypto_reader(reader: T) -> CryptoReader where diff --git a/src/proxy/tests/relay_quota_extended_attack_surface_security_tests.rs b/src/proxy/tests/relay_quota_extended_attack_surface_security_tests.rs index e80690b..8ce1c26 100644 --- a/src/proxy/tests/relay_quota_extended_attack_surface_security_tests.rs +++ b/src/proxy/tests/relay_quota_extended_attack_surface_security_tests.rs @@ -5,10 +5,13 @@ use crate::stream::BufferPool; use rand::rngs::StdRng; use rand::{RngExt, SeedableRng}; use std::sync::Arc; -use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use tokio::time::{Duration, timeout}; -async fn read_available(reader: &mut R, budget: Duration) -> usize { +async fn read_available( + reader: &mut R, + budget: Duration, +) -> usize { let start = tokio::time::Instant::now(); let mut total = 0usize; let mut buf = [0u8; 128]; @@ -57,16 +60,25 @@ async fn positive_quota_path_forwards_both_directions_within_limit() { Arc::new(BufferPool::new()), )); - client_peer.write_all(&[0xAA, 0xBB, 0xCC, 0xDD]).await.unwrap(); + client_peer + .write_all(&[0xAA, 0xBB, 0xCC, 0xDD]) + .await + .unwrap(); server_peer.read_exact(&mut [0u8; 4]).await.unwrap(); - server_peer.write_all(&[0x11, 0x22, 0x33, 0x44]).await.unwrap(); + server_peer + .write_all(&[0x11, 0x22, 0x33, 0x44]) + .await + .unwrap(); client_peer.read_exact(&mut [0u8; 4]).await.unwrap(); drop(client_peer); drop(server_peer); - let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap(); + let relay_result = timeout(Duration::from_secs(2), relay) + .await + .unwrap() + .unwrap(); assert!(relay_result.is_ok()); assert!(stats.get_user_quota_used(user) <= 16); } @@ -98,11 +110,23 @@ async fn negative_preloaded_quota_forbids_any_forwarding() { client_peer.write_all(&[0xAA]).await.unwrap(); server_peer.write_all(&[0xBB]).await.unwrap(); - assert_eq!(read_available(&mut server_peer, Duration::from_millis(120)).await, 0); - assert_eq!(read_available(&mut client_peer, Duration::from_millis(120)).await, 0); + assert_eq!( + read_available(&mut server_peer, Duration::from_millis(120)).await, + 0 + ); + assert_eq!( + read_available(&mut client_peer, Duration::from_millis(120)).await, + 0 + ); - let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap(); - assert!(matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))); + let relay_result = timeout(Duration::from_secs(2), relay) + .await + .unwrap() + .unwrap(); + assert!(matches!( + relay_result, + Err(ProxyError::DataQuotaExceeded { .. }) + )); assert!(stats.get_user_quota_used(user) <= 8); } @@ -135,13 +159,25 @@ async fn edge_quota_one_ensures_at_most_one_byte_across_directions() { ); let mut buf = [0u8; 1]; - let delivered_s2c = timeout(Duration::from_millis(120), client_peer.read(&mut buf)).await.unwrap().unwrap_or(0); - let delivered_c2s = timeout(Duration::from_millis(120), server_peer.read(&mut buf)).await.unwrap().unwrap_or(0); + let delivered_s2c = timeout(Duration::from_millis(120), client_peer.read(&mut buf)) + .await + .unwrap() + .unwrap_or(0); + let delivered_c2s = timeout(Duration::from_millis(120), server_peer.read(&mut buf)) + .await + .unwrap() + .unwrap_or(0); assert!(delivered_s2c + delivered_c2s <= 1); - let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap(); - assert!(matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))); + let relay_result = timeout(Duration::from_secs(2), relay) + .await + .unwrap() + .unwrap(); + assert!(matches!( + relay_result, + Err(ProxyError::DataQuotaExceeded { .. }) + )); } #[tokio::test] @@ -191,8 +227,14 @@ async fn adversarial_blackhat_alternating_jitter_does_not_overshoot_quota() { tokio::time::sleep(Duration::from_millis(((i % 3) + 1) as u64)).await; } - let relay_result = timeout(Duration::from_secs(3), relay).await.unwrap().unwrap(); - assert!(matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))); + let relay_result = timeout(Duration::from_secs(3), relay) + .await + .unwrap() + .unwrap(); + assert!(matches!( + relay_result, + Err(ProxyError::DataQuotaExceeded { .. }) + )); assert!(total_forwarded <= quota as usize); assert!(stats.get_user_quota_used(user) <= quota); } @@ -239,13 +281,17 @@ async fn light_fuzz_random_quota_schedule_preserves_quota_invariants() { if rng.random::() { let _ = client_peer.write_all(&[rng.random::()]).await; let mut one = [0u8; 1]; - if let Ok(Ok(n)) = timeout(Duration::from_millis(4), server_peer.read(&mut one)).await { + if let Ok(Ok(n)) = + timeout(Duration::from_millis(4), server_peer.read(&mut one)).await + { total_forwarded += n; } } else { let _ = server_peer.write_all(&[rng.random::()]).await; let mut one = [0u8; 1]; - if let Ok(Ok(n)) = timeout(Duration::from_millis(4), client_peer.read(&mut one)).await { + if let Ok(Ok(n)) = + timeout(Duration::from_millis(4), client_peer.read(&mut one)).await + { total_forwarded += n; } } @@ -254,8 +300,14 @@ async fn light_fuzz_random_quota_schedule_preserves_quota_invariants() { drop(client_peer); drop(server_peer); - let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap(); - assert!(relay_result.is_ok() || matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))); + let relay_result = timeout(Duration::from_secs(2), relay) + .await + .unwrap() + .unwrap(); + assert!( + relay_result.is_ok() + || matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })) + ); assert!(total_forwarded <= quota as usize); assert!(stats.get_user_quota_used(&user) <= quota); } @@ -305,13 +357,17 @@ async fn stress_parallel_relays_for_one_user_obey_global_quota() { if (step as usize + worker as usize) % 2 == 0 { let _ = client_peer.write_all(&[(step ^ 0x5A)]).await; let mut one = [0u8; 1]; - if let Ok(Ok(n)) = timeout(Duration::from_millis(6), server_peer.read(&mut one)).await { + if let Ok(Ok(n)) = + timeout(Duration::from_millis(6), server_peer.read(&mut one)).await + { total += n; } } else { let _ = server_peer.write_all(&[(step ^ 0xA5)]).await; let mut one = [0u8; 1]; - if let Ok(Ok(n)) = timeout(Duration::from_millis(6), client_peer.read(&mut one)).await { + if let Ok(Ok(n)) = + timeout(Duration::from_millis(6), client_peer.read(&mut one)).await + { total += n; } } @@ -321,8 +377,14 @@ async fn stress_parallel_relays_for_one_user_obey_global_quota() { drop(client_peer); drop(server_peer); - let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap(); - assert!(relay_result.is_ok() || matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))); + let relay_result = timeout(Duration::from_secs(2), relay) + .await + .unwrap() + .unwrap(); + assert!( + relay_result.is_ok() + || matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })) + ); total })); } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 297ff28..ff15d4f 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -381,7 +381,9 @@ impl Stats { return; } Self::touch_user_stats(user_stats); - user_stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed); + user_stats + .octets_from_client + .fetch_add(bytes, Ordering::Relaxed); } #[inline] @@ -390,7 +392,9 @@ impl Stats { return; } Self::touch_user_stats(user_stats); - user_stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed); + user_stats + .octets_to_client + .fetch_add(bytes, Ordering::Relaxed); } #[inline] @@ -812,7 +816,8 @@ impl Stats { } pub fn increment_me_d2c_data_frames_total(&self) { if self.telemetry_me_allows_normal() { - self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed); + self.me_d2c_data_frames_total + .fetch_add(1, Ordering::Relaxed); } } pub fn increment_me_d2c_ack_frames_total(&self) { @@ -1708,7 +1713,8 @@ impl Stats { self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed) } pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 { - self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed) + self.me_d2c_batch_bytes_bucket_4k_16k + .load(Ordering::Relaxed) } pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 { self.me_d2c_batch_bytes_bucket_16k_64k @@ -2371,8 +2377,8 @@ impl ReplayStats { mod tests { use super::*; use crate::config::MeTelemetryLevel; - use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; #[test] fn test_stats_shared_counters() { diff --git a/src/stream/frame_stream_padding_security_tests.rs b/src/stream/frame_stream_padding_security_tests.rs index 83b30f9..1ec787e 100644 --- a/src/stream/frame_stream_padding_security_tests.rs +++ b/src/stream/frame_stream_padding_security_tests.rs @@ -14,7 +14,10 @@ fn padding_rounding_equivalent_for_extensive_safe_domain() { let old = old_padding_round_up_to_4(len).expect("old expression must be safe"); let new = new_padding_round_up_to_4(len).expect("new expression must be safe"); assert_eq!(old, new, "mismatch for len={len}"); - assert!(new >= len, "rounded length must not shrink: len={len}, out={new}"); + assert!( + new >= len, + "rounded length must not shrink: len={len}, out={new}" + ); assert_eq!(new % 4, 0, "rounded length must stay 4-byte aligned"); } } diff --git a/src/tests/ip_tracker_encapsulation_adversarial_tests.rs b/src/tests/ip_tracker_encapsulation_adversarial_tests.rs index cf42e75..3fc9727 100644 --- a/src/tests/ip_tracker_encapsulation_adversarial_tests.rs +++ b/src/tests/ip_tracker_encapsulation_adversarial_tests.rs @@ -44,7 +44,10 @@ async fn encapsulation_repeated_queue_poison_recovery_preserves_forward_progress let ip_primary = ip_from_idx(10_001); let ip_alt = ip_from_idx(10_002); - tracker.check_and_add("encap-poison", ip_primary).await.unwrap(); + tracker + .check_and_add("encap-poison", ip_primary) + .await + .unwrap(); for _ in 0..128 { let queue = tracker.cleanup_queue_mutex_for_tests(); diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs index 4408b5a..2f39707 100644 --- a/src/tls_front/fetcher.rs +++ b/src/tls_front/fetcher.rs @@ -812,8 +812,8 @@ mod tests { #[test] fn test_encode_tls13_certificate_message_single_cert() { let cert = vec![0x30, 0x03, 0x02, 0x01, 0x01]; - let message = encode_tls13_certificate_message(std::slice::from_ref(&cert)) - .expect("message"); + let message = + encode_tls13_certificate_message(std::slice::from_ref(&cert)).expect("message"); assert_eq!(message[0], 0x0b); assert_eq!(read_u24(&message[1..4]), message.len() - 4); diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 918ccd4..1ef59e1 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -293,9 +293,7 @@ impl MePool { WriterContour::Draining => "draining", }; - if !draining - && let Some(dc_idx) = dc - { + if !draining && let Some(dc_idx) = dc { *live_writers_by_dc_endpoint .entry((dc_idx, endpoint)) .or_insert(0) += 1; diff --git a/src/transport/pool.rs b/src/transport/pool.rs index 60f8a01..bb0baac 100644 --- a/src/transport/pool.rs +++ b/src/transport/pool.rs @@ -201,7 +201,10 @@ impl ConnectionPool { pub async fn close_all(&self) { let pools_snapshot: Vec<(SocketAddr, Arc>)> = { let pools = self.pools.read(); - pools.iter().map(|(addr, pool)| (*addr, Arc::clone(pool))).collect() + pools + .iter() + .map(|(addr, pool)| (*addr, Arc::clone(pool))) + .collect() }; for (addr, pool) in pools_snapshot {