diff --git a/src/config/load.rs b/src/config/load.rs index 268db13..0f0990c 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -947,7 +947,11 @@ impl ProxyConfig { } if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid) - && config.server.conntrack_control.hybrid_listener_ips.is_empty() + && config + .server + .conntrack_control + .hybrid_listener_ips + .is_empty() { return Err(ProxyError::Config( "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid" @@ -2503,9 +2507,9 @@ mod tests { let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml"); std::fs::write(&path, toml).unwrap(); let err = ProxyConfig::load(&path).unwrap_err().to_string(); - assert!( - err.contains("server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]") - ); + assert!(err.contains( + "server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]" + )); let _ = std::fs::remove_file(path); } @@ -2570,9 +2574,9 @@ mod tests { let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml"); std::fs::write(&path, toml).unwrap(); let err = ProxyConfig::load(&path).unwrap_err().to_string(); - assert!( - err.contains("server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid") - ); + assert!(err.contains( + "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid" + )); let _ = std::fs::remove_file(path); } diff --git a/src/conntrack_control.rs b/src/conntrack_control.rs index 5083877..306697e 100644 --- a/src/conntrack_control.rs +++ b/src/conntrack_control.rs @@ -56,7 +56,11 @@ pub(crate) fn spawn_conntrack_controller( shared: Arc, ) { if !cfg!(target_os = "linux") { - let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control; + let enabled = config_rx + .borrow() + .server + .conntrack_control + .inline_conntrack_control; stats.set_conntrack_control_enabled(enabled); stats.set_conntrack_control_available(false); stats.set_conntrack_pressure_active(false); @@ -65,7 +69,9 @@ pub(crate) fn spawn_conntrack_controller( shared.disable_conntrack_close_sender(); shared.set_conntrack_pressure_active(false); if enabled { - warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker"); + warn!( + "conntrack control is configured but unsupported on this OS; disabling runtime worker" + ); } return; } @@ -88,7 +94,13 @@ async fn run_conntrack_controller( let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; let mut backend = pick_backend(cfg.server.conntrack_control.backend); - apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false); + apply_runtime_state( + stats.as_ref(), + shared.as_ref(), + &cfg, + backend.is_some(), + false, + ); reconcile_rules(&cfg, backend, stats.as_ref()).await; loop { @@ -315,7 +327,9 @@ fn pick_backend(configured: ConntrackBackend) -> Option { } } ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables), - ConntrackBackend::Iptables => command_exists("iptables").then_some(NetfilterBackend::Iptables), + ConntrackBackend::Iptables => { + command_exists("iptables").then_some(NetfilterBackend::Iptables) + } } } @@ -396,7 +410,12 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec Result<(), String> { - let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await; + let _ = run_command( + "nft", + &["delete", "table", "inet", "telemt_conntrack"], + None, + ) + .await; if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) { return Ok(()); } @@ -446,7 +465,12 @@ async fn apply_iptables_rules_for_binary( return Ok(()); } let chain = "TELEMT_NOTRACK"; - let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await; + let _ = run_command( + binary, + &["-t", "raw", "-D", "PREROUTING", "-j", chain], + None, + ) + .await; let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await; let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await; @@ -456,8 +480,20 @@ async fn apply_iptables_rules_for_binary( run_command(binary, &["-t", "raw", "-N", chain], None).await?; run_command(binary, &["-t", "raw", "-F", chain], None).await?; - if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None).await.is_err() { - run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?; + if run_command( + binary, + &["-t", "raw", "-C", "PREROUTING", "-j", chain], + None, + ) + .await + .is_err() + { + run_command( + binary, + &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], + None, + ) + .await?; } let (v4_targets, v6_targets) = notrack_targets(cfg); @@ -487,11 +523,26 @@ async fn apply_iptables_rules_for_binary( } async fn clear_notrack_rules_all_backends() { - let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await; - let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await; + let _ = run_command( + "nft", + &["delete", "table", "inet", "telemt_conntrack"], + None, + ) + .await; + let _ = run_command( + "iptables", + &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], + None, + ) + .await; let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; - let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await; + let _ = run_command( + "ip6tables", + &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], + None, + ) + .await; let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; } diff --git a/src/logging.rs b/src/logging.rs index ca26196..af9e2f7 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -319,12 +319,18 @@ mod tests { #[cfg(unix)] #[test] fn test_syslog_priority_for_level_mapping() { - assert_eq!(syslog_priority_for_level(&tracing::Level::ERROR), libc::LOG_ERR); + assert_eq!( + syslog_priority_for_level(&tracing::Level::ERROR), + libc::LOG_ERR + ); assert_eq!( syslog_priority_for_level(&tracing::Level::WARN), libc::LOG_WARNING ); - assert_eq!(syslog_priority_for_level(&tracing::Level::INFO), libc::LOG_INFO); + assert_eq!( + syslog_priority_for_level(&tracing::Level::INFO), + libc::LOG_INFO + ); assert_eq!( syslog_priority_for_level(&tracing::Level::DEBUG), libc::LOG_DEBUG diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs index d545b21..49c5347 100644 --- a/src/maestro/helpers.rs +++ b/src/maestro/helpers.rs @@ -272,7 +272,8 @@ mod tests { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos(); - let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_startup_candidates_{nonce}")); + let startup_cwd = + std::env::temp_dir().join(format!("telemt_cfg_startup_candidates_{nonce}")); std::fs::create_dir_all(&startup_cwd).unwrap(); let telemt = startup_cwd.join("telemt.toml"); std::fs::write(&telemt, " ").unwrap(); diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index e610ae8..9211408 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -28,8 +28,8 @@ use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; use crate::api; -use crate::conntrack_control; use crate::config::{LogLevel, ProxyConfig}; +use crate::conntrack_control; use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; @@ -136,18 +136,17 @@ async fn run_inner( } else { let default = ProxyConfig::default(); - let serialized = match toml::to_string_pretty(&default) - .or_else(|_| toml::to_string(&default)) - { - Ok(value) => Some(value), - Err(serialize_error) => { - eprintln!( - "[telemt] Warning: failed to serialize default config: {}", - serialize_error - ); - None - } - }; + let serialized = + match toml::to_string_pretty(&default).or_else(|_| toml::to_string(&default)) { + Ok(value) => Some(value), + Err(serialize_error) => { + eprintln!( + "[telemt] Warning: failed to serialize default config: {}", + serialize_error + ); + None + } + }; if config_path_explicit { if let Some(serialized) = serialized.as_ref() { diff --git a/src/main.rs b/src/main.rs index 05dc058..5c134b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,8 @@ mod api; mod cli; -mod conntrack_control; mod config; +mod conntrack_control; mod crypto; #[cfg(unix)] mod daemon; diff --git a/src/proxy/adaptive_buffers.rs b/src/proxy/adaptive_buffers.rs index 4fcb38c..a04f4e8 100644 --- a/src/proxy/adaptive_buffers.rs +++ b/src/proxy/adaptive_buffers.rs @@ -246,7 +246,9 @@ pub fn seed_tier_for_user(user: &str) -> AdaptiveTier { if now.saturating_duration_since(value.seen_at) <= PROFILE_TTL { return value.tier; } - profiles().remove_if(user, |_, v| now.saturating_duration_since(v.seen_at) > PROFILE_TTL); + profiles().remove_if(user, |_, v| { + now.saturating_duration_since(v.seen_at) > PROFILE_TTL + }); } AdaptiveTier::Base } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 1fb49b0..fb73db2 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -518,15 +518,15 @@ where ); return Err(ProxyError::Io(e)); } - Err(_) => { - debug!( - peer = %real_peer, - idle_secs = first_byte_idle_secs, - "Closing idle pooled connection before first client byte" - ); - return Ok(()); - } + Err(_) => { + debug!( + peer = %real_peer, + idle_secs = first_byte_idle_secs, + "Closing idle pooled connection before first client byte" + ); + return Ok(()); } + } }; let handshake_timeout = handshake_timeout_with_mask_grace(&config); diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 7674f6b..2c4fe45 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -17,13 +17,13 @@ use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce}; -use crate::proxy::shared_state::{ - ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, -}; use crate::proxy::route_mode::{ ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay, }; +use crate::proxy::shared_state::{ + ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, +}; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::UpstreamManager; diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 9c41e91..16d0c5e 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -118,7 +118,11 @@ fn auth_probe_state_expired(state: &AuthProbeState, now: Instant) -> bool { now.duration_since(state.last_seen) > retention } -fn auth_probe_eviction_offset_in(shared: &ProxySharedState, peer_ip: IpAddr, now: Instant) -> usize { +fn auth_probe_eviction_offset_in( + shared: &ProxySharedState, + peer_ip: IpAddr, + now: Instant, +) -> usize { let hasher_state = &shared.handshake.auth_probe_eviction_hasher; let mut hasher = hasher_state.build_hasher(); peer_ip.hash(&mut hasher); diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 9ac376d..70e72a0 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -255,7 +255,11 @@ async fn wait_mask_connect_budget(started: Instant) { // sigma is chosen so ~99% of raw samples land inside [floor, ceiling] before clamp. // When floor > ceiling (misconfiguration), returns ceiling (the smaller value). // When floor == ceiling, returns that value. When both are 0, returns 0. -pub(crate) fn sample_lognormal_percentile_bounded(floor: u64, ceiling: u64, rng: &mut impl Rng) -> u64 { +pub(crate) fn sample_lognormal_percentile_bounded( + floor: u64, + ceiling: u64, + rng: &mut impl Rng, +) -> u64 { if ceiling == 0 && floor == 0 { return 0; } @@ -296,7 +300,9 @@ fn mask_outcome_target_budget(config: &ProxyConfig) -> Duration { } if ceiling > floor { let mut rng = rand::rng(); - return Duration::from_millis(sample_lognormal_percentile_bounded(floor, ceiling, &mut rng)); + return Duration::from_millis(sample_lognormal_percentile_bounded( + floor, ceiling, &mut rng, + )); } // ceiling <= floor: use the larger value (fail-closed: preserve longer delay) return Duration::from_millis(floor.max(ceiling)); diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 7d94bba..665e90e 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -3,12 +3,12 @@ use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeSet, HashMap}; #[cfg(test)] use std::future::Future; -use std::hash::{BuildHasher, Hash}; #[cfg(test)] use std::hash::Hasher; +use std::hash::{BuildHasher, Hash}; use std::net::{IpAddr, SocketAddr}; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -21,13 +21,13 @@ use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; use crate::protocol::constants::{secure_padding_len, *}; use crate::proxy::handshake::HandshakeSuccess; -use crate::proxy::shared_state::{ - ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, -}; use crate::proxy::route_mode::{ ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay, }; +use crate::proxy::shared_state::{ + ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState, +}; use crate::stats::{ MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats, }; @@ -257,9 +257,7 @@ impl RelayClientIdlePolicy { if self.soft_idle > self.hard_idle { self.soft_idle = self.hard_idle; } - self.legacy_frame_read_timeout = self - .legacy_frame_read_timeout - .min(pressure_hard_idle_cap); + self.legacy_frame_read_timeout = self.legacy_frame_read_timeout.min(pressure_hard_idle_cap); if self.grace_after_downstream_activity > self.hard_idle { self.grace_after_downstream_activity = self.hard_idle; } @@ -461,12 +459,15 @@ fn report_desync_frame_too_large_in( .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D')) .unwrap_or(false); let now = Instant::now(); - let dedup_key = hash_value_in(shared, &( - state.user.as_str(), - state.peer_hash, - proto_tag, - DESYNC_ERROR_CLASS, - )); + let dedup_key = hash_value_in( + shared, + &( + state.user.as_str(), + state.peer_hash, + proto_tag, + DESYNC_ERROR_CLASS, + ), + ); let emit_full = should_emit_full_desync_in(shared, dedup_key, state.desync_all_full, now); let duration_ms = state.started_at.elapsed().as_millis() as u64; let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed); @@ -631,7 +632,10 @@ fn observe_me_d2c_flush_event( } #[cfg(test)] -pub(crate) fn mark_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) -> bool { +pub(crate) fn mark_relay_idle_candidate_for_testing( + shared: &ProxySharedState, + conn_id: u64, +) -> bool { let registry = &shared.middle_relay.relay_idle_registry; let mut guard = match registry.lock() { Ok(guard) => guard, @@ -716,7 +720,10 @@ pub(crate) fn relay_pressure_event_seq_for_testing(shared: &ProxySharedState) -> #[cfg(test)] pub(crate) fn relay_idle_mark_seq_for_testing(shared: &ProxySharedState) -> u64 { - shared.middle_relay.relay_idle_mark_seq.load(Ordering::Relaxed) + shared + .middle_relay + .relay_idle_mark_seq + .load(Ordering::Relaxed) } #[cfg(test)] @@ -865,10 +872,7 @@ pub(crate) fn desync_dedup_insert_for_testing(shared: &ProxySharedState, key: u6 } #[cfg(test)] -pub(crate) fn desync_dedup_get_for_testing( - shared: &ProxySharedState, - key: u64, -) -> Option { +pub(crate) fn desync_dedup_get_for_testing(shared: &ProxySharedState, key: u64) -> Option { shared .middle_relay .desync_dedup @@ -877,7 +881,9 @@ pub(crate) fn desync_dedup_get_for_testing( } #[cfg(test)] -pub(crate) fn desync_dedup_keys_for_testing(shared: &ProxySharedState) -> std::collections::HashSet { +pub(crate) fn desync_dedup_keys_for_testing( + shared: &ProxySharedState, +) -> std::collections::HashSet { shared .middle_relay .desync_dedup diff --git a/src/proxy/shared_state.rs b/src/proxy/shared_state.rs index 784d666..dd49806 100644 --- a/src/proxy/shared_state.rs +++ b/src/proxy/shared_state.rs @@ -8,7 +8,7 @@ use std::time::Instant; use dashmap::DashMap; use tokio::sync::mpsc; -use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState}; +use crate::proxy::handshake::{AuthProbeSaturationState, AuthProbeState}; use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -136,7 +136,8 @@ impl ProxySharedState { } pub(crate) fn set_conntrack_pressure_active(&self, active: bool) { - self.conntrack_pressure_active.store(active, Ordering::Relaxed); + self.conntrack_pressure_active + .store(active, Ordering::Relaxed); } pub(crate) fn conntrack_pressure_active(&self) -> bool { diff --git a/src/proxy/tests/adaptive_buffers_record_race_security_tests.rs b/src/proxy/tests/adaptive_buffers_record_race_security_tests.rs index aa7a42e..89bcdf5 100644 --- a/src/proxy/tests/adaptive_buffers_record_race_security_tests.rs +++ b/src/proxy/tests/adaptive_buffers_record_race_security_tests.rs @@ -1,6 +1,6 @@ use super::*; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; static RACE_TEST_KEY_COUNTER: AtomicUsize = AtomicUsize::new(1_000_000); diff --git a/src/proxy/tests/adaptive_buffers_security_tests.rs b/src/proxy/tests/adaptive_buffers_security_tests.rs index 612dafa..d065fb7 100644 --- a/src/proxy/tests/adaptive_buffers_security_tests.rs +++ b/src/proxy/tests/adaptive_buffers_security_tests.rs @@ -65,9 +65,15 @@ fn adaptive_base_tier_buffers_unchanged() { fn adaptive_tier1_buffers_within_caps() { let (c2s, s2c) = direct_copy_buffers_for_tier(AdaptiveTier::Tier1, 65536, 262144); assert!(c2s > 65536, "Tier1 c2s should exceed Base"); - assert!(c2s <= 128 * 1024, "Tier1 c2s should not exceed DIRECT_C2S_CAP_BYTES"); + assert!( + c2s <= 128 * 1024, + "Tier1 c2s should not exceed DIRECT_C2S_CAP_BYTES" + ); assert!(s2c > 262144, "Tier1 s2c should exceed Base"); - assert!(s2c <= 512 * 1024, "Tier1 s2c should not exceed DIRECT_S2C_CAP_BYTES"); + assert!( + s2c <= 512 * 1024, + "Tier1 s2c should not exceed DIRECT_S2C_CAP_BYTES" + ); } #[test] diff --git a/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs b/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs index f6192f3..b87c3a4 100644 --- a/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_eviction_bias_security_tests.rs @@ -19,7 +19,8 @@ fn adversarial_large_state_offsets_escape_first_scan_window() { ((i.wrapping_mul(131)) & 0xff) as u8, )); let now = base + Duration::from_nanos(i); - let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); + let start = + auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); if start >= scan_limit { saw_offset_outside_first_window = true; break; @@ -48,7 +49,8 @@ fn stress_large_state_offsets_cover_many_scan_windows() { ((i.wrapping_mul(17)) & 0xff) as u8, )); let now = base + Duration::from_micros(i); - let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); + let start = + auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); covered_windows.insert(start / scan_limit); } @@ -80,7 +82,8 @@ fn light_fuzz_offset_always_stays_inside_state_len() { let state_len = ((seed >> 16) as usize % 200_000).saturating_add(1); let scan_limit = ((seed >> 40) as usize % 2_048).saturating_add(1); let now = base + Duration::from_nanos(seed & 0x0fff); - let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); + let start = + auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); assert!( start < state_len, diff --git a/src/proxy/tests/handshake_auth_probe_hardening_adversarial_tests.rs b/src/proxy/tests/handshake_auth_probe_hardening_adversarial_tests.rs index 5268b2f..d20f881 100644 --- a/src/proxy/tests/handshake_auth_probe_hardening_adversarial_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_hardening_adversarial_tests.rs @@ -87,7 +87,11 @@ fn adversarial_saturation_grace_requires_extra_failures_before_preauth_throttle( } assert!( - auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), ip, now + Duration::from_millis(1)), + auth_probe_should_apply_preauth_throttle_in( + shared.as_ref(), + ip, + now + Duration::from_millis(1) + ), "after grace failures are exhausted, preauth throttle must activate" ); } @@ -134,7 +138,11 @@ fn light_fuzz_randomized_failures_preserve_cap_and_nonzero_streaks() { (seed >> 8) as u8, seed as u8, )); - auth_probe_record_failure_in(shared.as_ref(), ip, now + Duration::from_millis((seed & 0x3f) as u64)); + auth_probe_record_failure_in( + shared.as_ref(), + ip, + now + Duration::from_millis((seed & 0x3f) as u64), + ); } let state = auth_probe_state_for_testing_in_shared(shared.as_ref()); @@ -162,7 +170,11 @@ async fn stress_parallel_failure_flood_keeps_state_hard_capped() { ((i >> 8) & 0xff) as u8, (i & 0xff) as u8, )); - auth_probe_record_failure_in(shared.as_ref(), ip, start + Duration::from_millis((i % 4) as u64)); + auth_probe_record_failure_in( + shared.as_ref(), + ip, + start + Duration::from_millis((i % 4) as u64), + ); } })); } diff --git a/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs b/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs index 0fb3b68..75a8bbd 100644 --- a/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_scan_budget_security_tests.rs @@ -31,7 +31,8 @@ fn adversarial_large_state_must_allow_start_offset_outside_scan_budget_window() (i & 0xff) as u8, )); let now = base + Duration::from_micros(i as u64); - let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); + let start = + auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); assert!( start < state_len, "start offset must stay within state length; start={start}, len={state_len}" @@ -83,7 +84,8 @@ fn light_fuzz_scan_offset_budget_never_exceeds_effective_window() { let state_len = ((seed >> 8) as usize % 131_072).saturating_add(1); let scan_limit = ((seed >> 32) as usize % 512).saturating_add(1); let now = base + Duration::from_nanos(seed & 0xffff); - let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); + let start = + auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); assert!( start < state_len, diff --git a/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs b/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs index fd08c1b..e604641 100644 --- a/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs +++ b/src/proxy/tests/handshake_auth_probe_scan_offset_stress_tests.rs @@ -36,7 +36,13 @@ fn adversarial_many_ips_same_time_spreads_offsets_without_bias_collapse() { i as u8, (255 - (i as u8)), )); - uniq.insert(auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, 65_536, 16)); + uniq.insert(auth_probe_scan_start_offset_in( + shared.as_ref(), + ip, + now, + 65_536, + 16, + )); } assert!( @@ -63,7 +69,11 @@ async fn stress_parallel_failure_churn_under_saturation_remains_capped_and_live( ((i >> 8) & 0xff) as u8, (i & 0xff) as u8, )); - auth_probe_record_failure_in(shared.as_ref(), ip, start + Duration::from_micros((i % 128) as u64)); + auth_probe_record_failure_in( + shared.as_ref(), + ip, + start + Duration::from_micros((i % 128) as u64), + ); } })); } @@ -73,12 +83,17 @@ async fn stress_parallel_failure_churn_under_saturation_remains_capped_and_live( } assert!( - auth_probe_state_for_testing_in_shared(shared.as_ref()).len() <= AUTH_PROBE_TRACK_MAX_ENTRIES, + auth_probe_state_for_testing_in_shared(shared.as_ref()).len() + <= AUTH_PROBE_TRACK_MAX_ENTRIES, "state must remain hard-capped under parallel saturation churn" ); let probe = IpAddr::V4(Ipv4Addr::new(10, 4, 1, 1)); - let _ = auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), probe, start + Duration::from_millis(1)); + let _ = auth_probe_should_apply_preauth_throttle_in( + shared.as_ref(), + probe, + start + Duration::from_millis(1), + ); } #[test] @@ -102,7 +117,8 @@ fn light_fuzz_scan_offset_stays_within_window_for_randomized_inputs() { let scan_limit = ((seed >> 40) as usize % 1024).saturating_add(1); let now = base + Duration::from_nanos(seed & 0x1fff); - let offset = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); + let offset = + auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); assert!( offset < state_len, "scan offset must always remain inside state length" diff --git a/src/proxy/tests/handshake_baseline_invariant_tests.rs b/src/proxy/tests/handshake_baseline_invariant_tests.rs index 0cab662..5f938d8 100644 --- a/src/proxy/tests/handshake_baseline_invariant_tests.rs +++ b/src/proxy/tests/handshake_baseline_invariant_tests.rs @@ -116,8 +116,14 @@ async fn handshake_baseline_auth_probe_streak_increments_per_ip() { ) .await; assert!(matches!(res, HandshakeResult::BadClient { .. })); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), Some(expected)); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), untouched_ip), None); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), + Some(expected) + ); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), untouched_ip), + None + ); } } @@ -149,7 +155,8 @@ fn handshake_baseline_repeated_probes_streak_monotonic() { for _ in 0..100 { auth_probe_record_failure_in(shared.as_ref(), ip, now); - let current = auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), ip).unwrap_or(0); + let current = + auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), ip).unwrap_or(0); assert!(current >= prev, "streak must be monotonic"); prev = current; } @@ -173,8 +180,16 @@ fn handshake_baseline_throttled_ip_incurs_backoff_delay() { let before_expiry = now + delay.saturating_sub(Duration::from_millis(1)); let after_expiry = now + delay + Duration::from_millis(1); - assert!(auth_probe_is_throttled_in(shared.as_ref(), ip, before_expiry)); - assert!(!auth_probe_is_throttled_in(shared.as_ref(), ip, after_expiry)); + assert!(auth_probe_is_throttled_in( + shared.as_ref(), + ip, + before_expiry + )); + assert!(!auth_probe_is_throttled_in( + shared.as_ref(), + ip, + after_expiry + )); } #[tokio::test] @@ -212,7 +227,10 @@ async fn handshake_baseline_malformed_probe_frames_fail_closed_to_masking() { .expect("malformed probe handling must complete in bounded time"); assert!( - matches!(res, HandshakeResult::BadClient { .. } | HandshakeResult::Error(_)), + matches!( + res, + HandshakeResult::BadClient { .. } | HandshakeResult::Error(_) + ), "malformed probe must fail closed" ); } diff --git a/src/proxy/tests/handshake_more_clever_tests.rs b/src/proxy/tests/handshake_more_clever_tests.rs index b8e5ae8..f570424 100644 --- a/src/proxy/tests/handshake_more_clever_tests.rs +++ b/src/proxy/tests/handshake_more_clever_tests.rs @@ -332,7 +332,13 @@ async fn invalid_secret_warning_lock_contention_and_bound() { b.wait().await; for i in 0..iterations_per_task { let user_name = format!("contention_user_{}_{}", t, i); - warn_invalid_secret_once_in(shared.as_ref(), &user_name, "invalid_hex", ACCESS_SECRET_BYTES, None); + warn_invalid_secret_once_in( + shared.as_ref(), + &user_name, + "invalid_hex", + ACCESS_SECRET_BYTES, + None, + ); } })); } @@ -629,7 +635,8 @@ fn auth_probe_saturation_note_resets_retention_window() { // This call may return false if backoff has elapsed, but it must not clear // the saturation state because `later` refreshed last_seen. - let _ = auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), check_time); + let _ = + auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), check_time); let guard = auth_probe_saturation_state_lock_for_testing_in_shared(shared.as_ref()); assert!( guard.is_some(), diff --git a/src/proxy/tests/handshake_real_bug_stress_tests.rs b/src/proxy/tests/handshake_real_bug_stress_tests.rs index 8c81061..9705853 100644 --- a/src/proxy/tests/handshake_real_bug_stress_tests.rs +++ b/src/proxy/tests/handshake_real_bug_stress_tests.rs @@ -206,7 +206,12 @@ fn auth_probe_eviction_identical_timestamps_keeps_map_bounded() { } let new_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 21, 21)); - auth_probe_record_failure_with_state_in(shared.as_ref(), state, new_ip, same + Duration::from_millis(1)); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + state, + new_ip, + same + Duration::from_millis(1), + ); assert_eq!(state.len(), AUTH_PROBE_TRACK_MAX_ENTRIES); assert!(state.contains_key(&new_ip)); @@ -325,7 +330,8 @@ async fn saturation_grace_exhaustion_under_concurrency_keeps_peer_throttled() { final_state.fail_streak >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS ); - assert!(auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), + assert!(auth_probe_should_apply_preauth_throttle_in( + shared.as_ref(), peer_ip, Instant::now() )); diff --git a/src/proxy/tests/handshake_saturation_poison_security_tests.rs b/src/proxy/tests/handshake_saturation_poison_security_tests.rs index d7e1106..ebec667 100644 --- a/src/proxy/tests/handshake_saturation_poison_security_tests.rs +++ b/src/proxy/tests/handshake_saturation_poison_security_tests.rs @@ -54,7 +54,9 @@ fn clear_auth_probe_state_clears_saturation_even_if_poisoned() { poison_saturation_mutex(shared.as_ref()); auth_probe_note_saturation_in(shared.as_ref(), Instant::now()); - assert!(auth_probe_saturation_is_throttled_for_testing_in_shared(shared.as_ref())); + assert!(auth_probe_saturation_is_throttled_for_testing_in_shared( + shared.as_ref() + )); clear_auth_probe_state_for_testing_in_shared(shared.as_ref()); assert!( diff --git a/src/proxy/tests/handshake_security_tests.rs b/src/proxy/tests/handshake_security_tests.rs index 7479772..d8396b5 100644 --- a/src/proxy/tests/handshake_security_tests.rs +++ b/src/proxy/tests/handshake_security_tests.rs @@ -1427,7 +1427,13 @@ fn invalid_secret_warning_cache_is_bounded() { for idx in 0..(WARNED_SECRET_MAX_ENTRIES + 32) { let user = format!("warned_user_{idx}"); - warn_invalid_secret_once_in(shared.as_ref(), &user, "invalid_length", ACCESS_SECRET_BYTES, Some(idx)); + warn_invalid_secret_once_in( + shared.as_ref(), + &user, + "invalid_length", + ACCESS_SECRET_BYTES, + Some(idx), + ); } let warned = warned_secrets_for_testing_in_shared(shared.as_ref()); @@ -1640,11 +1646,15 @@ fn unknown_sni_warn_cooldown_first_event_is_warn_and_repeated_events_are_info_un "first unknown SNI event must be eligible for WARN emission" ); assert!( - !should_emit_unknown_sni_warn_for_testing_in_shared(shared.as_ref(), now + Duration::from_secs(1)), + !should_emit_unknown_sni_warn_for_testing_in_shared( + shared.as_ref(), + now + Duration::from_secs(1) + ), "events inside cooldown window must be demoted from WARN to INFO" ); assert!( - should_emit_unknown_sni_warn_for_testing_in_shared(shared.as_ref(), + should_emit_unknown_sni_warn_for_testing_in_shared( + shared.as_ref(), now + Duration::from_secs(UNKNOWN_SNI_WARN_COOLDOWN_SECS) ), "once cooldown expires, next unknown SNI event must be WARN-eligible again" @@ -1725,7 +1735,12 @@ fn auth_probe_over_cap_churn_still_tracks_newcomer_after_round_limit() { } let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 114, 77)); - auth_probe_record_failure_with_state_in(shared.as_ref(), &state, newcomer, now + Duration::from_secs(1)); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + &state, + newcomer, + now + Duration::from_secs(1), + ); assert!( state.get(&newcomer).is_some(), @@ -1931,8 +1946,18 @@ fn auth_probe_ipv6_is_bucketed_by_prefix_64() { let ip_a = IpAddr::V6("2001:db8:abcd:1234:1:2:3:4".parse().unwrap()); let ip_b = IpAddr::V6("2001:db8:abcd:1234:ffff:eeee:dddd:cccc".parse().unwrap()); - auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_a), now); - auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_b), now); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + &state, + normalize_auth_probe_ip(ip_a), + now, + ); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + &state, + normalize_auth_probe_ip(ip_b), + now, + ); let normalized = normalize_auth_probe_ip(ip_a); assert_eq!( @@ -1956,8 +1981,18 @@ fn auth_probe_ipv6_different_prefixes_use_distinct_buckets() { let ip_a = IpAddr::V6("2001:db8:1111:2222:1:2:3:4".parse().unwrap()); let ip_b = IpAddr::V6("2001:db8:1111:3333:1:2:3:4".parse().unwrap()); - auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_a), now); - auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_b), now); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + &state, + normalize_auth_probe_ip(ip_a), + now, + ); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + &state, + normalize_auth_probe_ip(ip_b), + now, + ); assert_eq!( state.len(), @@ -2070,7 +2105,12 @@ fn auth_probe_round_limited_overcap_eviction_marks_saturation_and_keeps_newcomer } let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 40)); - auth_probe_record_failure_with_state_in(shared.as_ref(), &state, newcomer, now + Duration::from_millis(1)); + auth_probe_record_failure_with_state_in( + shared.as_ref(), + &state, + newcomer, + now + Duration::from_millis(1), + ); assert!( state.get(&newcomer).is_some(), @@ -2081,7 +2121,10 @@ fn auth_probe_round_limited_overcap_eviction_marks_saturation_and_keeps_newcomer "high fail-streak sentinel must survive round-limited eviction" ); assert!( - auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), now + Duration::from_millis(1)), + auth_probe_saturation_is_throttled_at_for_testing_in_shared( + shared.as_ref(), + now + Duration::from_millis(1) + ), "round-limited over-cap path must activate saturation throttle marker" ); } @@ -2163,7 +2206,8 @@ fn stress_auth_probe_overcap_churn_does_not_starve_high_threat_sentinel_bucket() ((step >> 8) & 0xff) as u8, (step & 0xff) as u8, )); - auth_probe_record_failure_with_state_in(shared.as_ref(), + auth_probe_record_failure_with_state_in( + shared.as_ref(), &state, newcomer, base_now + Duration::from_millis(step as u64 + 1), @@ -2226,7 +2270,8 @@ fn light_fuzz_auth_probe_overcap_eviction_prefers_less_threatening_entries() { ((round >> 8) & 0xff) as u8, (round & 0xff) as u8, )); - auth_probe_record_failure_with_state_in(shared.as_ref(), + auth_probe_record_failure_with_state_in( + shared.as_ref(), &state, newcomer, now + Duration::from_millis(round as u64 + 1), @@ -3105,7 +3150,10 @@ async fn saturation_grace_boundary_still_admits_valid_tls_before_exhaustion() { matches!(result, HandshakeResult::Success(_)), "valid TLS should still pass while peer remains within saturation grace budget" ); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), None); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), + None + ); } #[tokio::test] @@ -3171,7 +3219,10 @@ async fn saturation_grace_exhaustion_blocks_valid_tls_until_backoff_expires() { matches!(allowed, HandshakeResult::Success(_)), "valid TLS should recover after peer-specific pre-auth backoff has elapsed" ); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), None); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), + None + ); } #[tokio::test] diff --git a/src/proxy/tests/masking_lognormal_timing_security_tests.rs b/src/proxy/tests/masking_lognormal_timing_security_tests.rs index 0c0bd1e..5d6c456 100644 --- a/src/proxy/tests/masking_lognormal_timing_security_tests.rs +++ b/src/proxy/tests/masking_lognormal_timing_security_tests.rs @@ -1,6 +1,6 @@ use super::*; -use rand::rngs::StdRng; use rand::SeedableRng; +use rand::rngs::StdRng; fn seeded_rng(seed: u64) -> StdRng { StdRng::seed_from_u64(seed) @@ -57,7 +57,10 @@ fn masking_lognormal_degenerate_floor_eq_ceiling_returns_floor() { let mut rng = seeded_rng(99); for _ in 0..100 { let val = sample_lognormal_percentile_bounded(1000, 1000, &mut rng); - assert_eq!(val, 1000, "floor == ceiling must always return exactly that value"); + assert_eq!( + val, 1000, + "floor == ceiling must always return exactly that value" + ); } } diff --git a/src/proxy/tests/middle_relay_baseline_invariant_tests.rs b/src/proxy/tests/middle_relay_baseline_invariant_tests.rs index 0a7e358..5e9ae2e 100644 --- a/src/proxy/tests/middle_relay_baseline_invariant_tests.rs +++ b/src/proxy/tests/middle_relay_baseline_invariant_tests.rs @@ -7,13 +7,22 @@ fn middle_relay_baseline_public_api_idle_roundtrip_contract() { clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001)); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(7001) + ); clear_relay_idle_candidate_for_testing(shared.as_ref(), 7001); - assert_ne!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001)); + assert_ne!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(7001) + ); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001)); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(7001) + ); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); } @@ -26,7 +35,12 @@ fn middle_relay_baseline_public_api_desync_window_contract() { let key = 0xDEAD_BEEF_0000_0001u64; let t0 = Instant::now(); - assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, false, t0)); + assert!(should_emit_full_desync_for_testing( + shared.as_ref(), + key, + false, + t0 + )); assert!(!should_emit_full_desync_for_testing( shared.as_ref(), key, @@ -35,7 +49,12 @@ fn middle_relay_baseline_public_api_desync_window_contract() { )); let t1 = t0 + DESYNC_DEDUP_WINDOW + Duration::from_millis(10); - assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, false, t1)); + assert!(should_emit_full_desync_for_testing( + shared.as_ref(), + key, + false, + t1 + )); clear_desync_dedup_for_testing_in_shared(shared.as_ref()); } diff --git a/src/proxy/tests/middle_relay_desync_all_full_dedup_security_tests.rs b/src/proxy/tests/middle_relay_desync_all_full_dedup_security_tests.rs index 46521e6..883f390 100644 --- a/src/proxy/tests/middle_relay_desync_all_full_dedup_security_tests.rs +++ b/src/proxy/tests/middle_relay_desync_all_full_dedup_security_tests.rs @@ -13,7 +13,12 @@ fn desync_all_full_bypass_does_not_initialize_or_grow_dedup_cache() { for i in 0..20_000u64 { assert!( - should_emit_full_desync_for_testing(shared.as_ref(), 0xD35E_D000_0000_0000u64 ^ i, true, now), + should_emit_full_desync_for_testing( + shared.as_ref(), + 0xD35E_D000_0000_0000u64 ^ i, + true, + now + ), "desync_all_full path must always emit" ); } @@ -37,7 +42,12 @@ fn desync_all_full_bypass_keeps_existing_dedup_entries_unchanged() { let now = Instant::now(); for i in 0..2048u64 { assert!( - should_emit_full_desync_for_testing(shared.as_ref(), 0xF011_F000_0000_0000u64 ^ i, true, now), + should_emit_full_desync_for_testing( + shared.as_ref(), + 0xF011_F000_0000_0000u64 ^ i, + true, + now + ), "desync_all_full must bypass suppression and dedup refresh" ); } @@ -68,7 +78,8 @@ fn edge_all_full_burst_does_not_poison_later_false_path_tracking() { let now = Instant::now(); for i in 0..8192u64 { - assert!(should_emit_full_desync_for_testing(shared.as_ref(), + assert!(should_emit_full_desync_for_testing( + shared.as_ref(), 0xABCD_0000_0000_0000 ^ i, true, now @@ -102,7 +113,12 @@ fn adversarial_mixed_sequence_true_steps_never_change_cache_len() { let flag_all_full = (seed & 0x1) == 1; let key = 0x7000_0000_0000_0000u64 ^ i ^ seed; let before = desync_dedup_len_for_testing(shared.as_ref()); - let _ = should_emit_full_desync_for_testing(shared.as_ref(), key, flag_all_full, Instant::now()); + let _ = should_emit_full_desync_for_testing( + shared.as_ref(), + key, + flag_all_full, + Instant::now(), + ); let after = desync_dedup_len_for_testing(shared.as_ref()); if flag_all_full { @@ -124,7 +140,12 @@ fn light_fuzz_all_full_mode_always_emits_and_stays_bounded() { seed ^= seed >> 9; seed ^= seed << 8; let key = seed ^ 0x55AA_55AA_55AA_55AAu64; - assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, true, Instant::now())); + assert!(should_emit_full_desync_for_testing( + shared.as_ref(), + key, + true, + Instant::now() + )); } let after = desync_dedup_len_for_testing(shared.as_ref()); diff --git a/src/proxy/tests/middle_relay_idle_policy_security_tests.rs b/src/proxy/tests/middle_relay_idle_policy_security_tests.rs index a246640..8a3d580 100644 --- a/src/proxy/tests/middle_relay_idle_policy_security_tests.rs +++ b/src/proxy/tests/middle_relay_idle_policy_security_tests.rs @@ -366,23 +366,42 @@ fn pressure_evicts_oldest_idle_candidate_with_deterministic_ordering() { assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 10)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 11)); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(10)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(10) + ); note_relay_pressure_event_for_testing(shared.as_ref()); let mut seen_for_newer = 0u64; assert!( - !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 11, &mut seen_for_newer, &stats), + !maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 11, + &mut seen_for_newer, + &stats + ), "newer idle candidate must not be evicted while older candidate exists" ); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(10)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(10) + ); let mut seen_for_oldest = 0u64; assert!( - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 10, &mut seen_for_oldest, &stats), + maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 10, + &mut seen_for_oldest, + &stats + ), "oldest idle candidate must be evicted first under pressure" ); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(11)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(11) + ); assert_eq!(stats.get_relay_pressure_evict_total(), 1); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); @@ -402,7 +421,10 @@ fn pressure_does_not_evict_without_new_pressure_signal() { "without new pressure signal, candidate must stay" ); assert_eq!(stats.get_relay_pressure_evict_total(), 0); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(21)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(21) + ); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); } @@ -415,7 +437,10 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() { let mut seen_per_conn = std::collections::HashMap::new(); for conn_id in 1000u64..1064u64 { - assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), conn_id)); + assert!(mark_relay_idle_candidate_for_testing( + shared.as_ref(), + conn_id + )); seen_per_conn.insert(conn_id, 0u64); } @@ -426,7 +451,12 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() { .get(&expected) .expect("per-conn pressure cursor must exist"); assert!( - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), expected, &mut seen, &stats), + maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + expected, + &mut seen, + &stats + ), "expected conn_id {expected} must be evicted next by deterministic FIFO ordering" ); seen_per_conn.insert(expected, seen); @@ -436,7 +466,10 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() { } else { Some(expected + 1) }; - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), next); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + next + ); } assert_eq!(stats.get_relay_pressure_evict_total(), 64); @@ -460,9 +493,24 @@ fn blackhat_single_pressure_event_must_not_evict_more_than_one_candidate() { // Single pressure event should authorize at most one eviction globally. note_relay_pressure_event_for_testing(shared.as_ref()); - let evicted_301 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 301, &mut seen_301, &stats); - let evicted_302 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 302, &mut seen_302, &stats); - let evicted_303 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 303, &mut seen_303, &stats); + let evicted_301 = maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 301, + &mut seen_301, + &stats, + ); + let evicted_302 = maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 302, + &mut seen_302, + &stats, + ); + let evicted_303 = maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 303, + &mut seen_303, + &stats, + ); let evicted_total = [evicted_301, evicted_302, evicted_303] .iter() @@ -492,12 +540,22 @@ fn blackhat_pressure_counter_must_track_global_budget_not_per_session_cursor() { note_relay_pressure_event_for_testing(shared.as_ref()); assert!( - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 401, &mut seen_oldest, &stats), + maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 401, + &mut seen_oldest, + &stats + ), "oldest candidate must consume pressure budget first" ); assert!( - !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 402, &mut seen_next, &stats), + !maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 402, + &mut seen_next, + &stats + ), "next candidate must not consume the same pressure budget" ); @@ -522,7 +580,12 @@ fn blackhat_stale_pressure_before_idle_mark_must_not_trigger_eviction() { let mut seen = 0u64; assert!( - !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 501, &mut seen, &stats), + !maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 501, + &mut seen, + &stats + ), "stale pressure (before soft-idle mark) must not evict newly marked candidate" ); @@ -545,9 +608,24 @@ fn blackhat_stale_pressure_must_not_evict_any_of_newly_marked_batch() { let mut seen_513 = 0u64; let evicted = [ - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 511, &mut seen_511, &stats), - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 512, &mut seen_512, &stats), - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 513, &mut seen_513, &stats), + maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 511, + &mut seen_511, + &stats, + ), + maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 512, + &mut seen_512, + &stats, + ), + maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 513, + &mut seen_513, + &stats, + ), ] .iter() .filter(|value| **value) @@ -572,7 +650,12 @@ fn blackhat_stale_pressure_seen_without_candidates_must_be_globally_invalidated( // Session A observed pressure while there were no candidates. let mut seen_a = 0u64; assert!( - !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 999_001, &mut seen_a, &stats), + !maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 999_001, + &mut seen_a, + &stats + ), "no candidate existed, so no eviction is possible" ); @@ -580,7 +663,12 @@ fn blackhat_stale_pressure_seen_without_candidates_must_be_globally_invalidated( assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 521)); let mut seen_b = 0u64; assert!( - !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 521, &mut seen_b, &stats), + !maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 521, + &mut seen_b, + &stats + ), "once pressure is observed with empty candidate set, it must not be replayed later" ); @@ -600,7 +688,12 @@ fn blackhat_stale_pressure_must_not_survive_candidate_churn() { let mut seen = 0u64; assert!( - !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 532, &mut seen, &stats), + !maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 532, + &mut seen, + &stats + ), "stale pressure must not survive clear+remark churn cycles" ); @@ -663,7 +756,10 @@ async fn integration_race_single_pressure_event_allows_at_most_one_eviction_unde let mut seen_per_session = vec![0u64; sessions]; for conn_id in &conn_ids { - assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id)); + assert!(mark_relay_idle_candidate_for_testing( + shared.as_ref(), + *conn_id + )); } for round in 0..rounds { @@ -676,8 +772,12 @@ async fn integration_race_single_pressure_event_allows_at_most_one_eviction_unde let stats = stats.clone(); let shared = shared.clone(); joins.push(tokio::spawn(async move { - let evicted = - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), conn_id, &mut seen, stats.as_ref()); + let evicted = maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + conn_id, + &mut seen, + stats.as_ref(), + ); (idx, conn_id, seen, evicted) })); } @@ -729,7 +829,10 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida let mut seen_per_session = vec![0u64; sessions]; for conn_id in &conn_ids { - assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id)); + assert!(mark_relay_idle_candidate_for_testing( + shared.as_ref(), + *conn_id + )); } let mut expected_total_evictions = 0u64; @@ -751,8 +854,12 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida let stats = stats.clone(); let shared = shared.clone(); joins.push(tokio::spawn(async move { - let evicted = - maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), conn_id, &mut seen, stats.as_ref()); + let evicted = maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + conn_id, + &mut seen, + stats.as_ref(), + ); (idx, conn_id, seen, evicted) })); } @@ -774,7 +881,10 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida "round {round}: empty candidate phase must not allow stale-pressure eviction" ); for conn_id in &conn_ids { - assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id)); + assert!(mark_relay_idle_candidate_for_testing( + shared.as_ref(), + *conn_id + )); } } else { assert!( @@ -783,7 +893,10 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida ); if let Some(conn_id) = evicted_conn { expected_total_evictions = expected_total_evictions.saturating_add(1); - assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), conn_id)); + assert!(mark_relay_idle_candidate_for_testing( + shared.as_ref(), + conn_id + )); } } } diff --git a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs index ce908da..4f57f56 100644 --- a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs +++ b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs @@ -25,7 +25,10 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account // Helper lock must recover from poison, reset stale state, and continue. assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42)); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(42)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(42) + ); let before = relay_pressure_event_seq_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref()); @@ -54,11 +57,17 @@ fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests( clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), None); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + None + ); assert_eq!(relay_pressure_event_seq_for_testing(shared.as_ref()), 0); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7)); - assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + Some(7) + ); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); } diff --git a/src/proxy/tests/proxy_shared_state_isolation_tests.rs b/src/proxy/tests/proxy_shared_state_isolation_tests.rs index 3e26000..7887ef8 100644 --- a/src/proxy/tests/proxy_shared_state_isolation_tests.rs +++ b/src/proxy/tests/proxy_shared_state_isolation_tests.rs @@ -1,10 +1,10 @@ +use crate::proxy::client::handle_client_stream_with_shared; use crate::proxy::handshake::{ auth_probe_fail_streak_for_testing_in_shared, auth_probe_is_throttled_for_testing_in_shared, auth_probe_record_failure_for_testing, clear_auth_probe_state_for_testing_in_shared, clear_unknown_sni_warn_state_for_testing_in_shared, clear_warned_secrets_for_testing_in_shared, should_emit_unknown_sni_warn_for_testing_in_shared, warned_secrets_for_testing_in_shared, }; -use crate::proxy::client::handle_client_stream_with_shared; use crate::proxy::middle_relay::{ clear_desync_dedup_for_testing_in_shared, clear_relay_idle_candidate_for_testing, clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing, @@ -81,7 +81,10 @@ fn new_client_harness() -> ClientHarness { } } -async fn drive_invalid_mtproto_handshake(shared: Arc, peer: std::net::SocketAddr) { +async fn drive_invalid_mtproto_handshake( + shared: Arc, + peer: std::net::SocketAddr, +) { let harness = new_client_harness(); let (server_side, mut client_side) = duplex(4096); let invalid = [0u8; 64]; @@ -108,7 +111,10 @@ async fn drive_invalid_mtproto_handshake(shared: Arc, peer: st .write_all(&invalid) .await .expect("failed to write invalid handshake"); - client_side.shutdown().await.expect("failed to shutdown client"); + client_side + .shutdown() + .await + .expect("failed to shutdown client"); let _ = tokio::time::timeout(Duration::from_secs(3), task) .await .expect("client task timed out") @@ -128,7 +134,10 @@ fn proxy_shared_state_two_instances_do_not_share_auth_probe_state() { auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(1) ); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), None); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), + None + ); } #[test] @@ -139,8 +148,18 @@ fn proxy_shared_state_two_instances_do_not_share_desync_dedup() { let now = Instant::now(); let key = 0xA5A5_u64; - assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now)); - assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now)); + assert!(should_emit_full_desync_for_testing( + a.as_ref(), + key, + false, + now + )); + assert!(should_emit_full_desync_for_testing( + b.as_ref(), + key, + false, + now + )); } #[test] @@ -150,7 +169,10 @@ fn proxy_shared_state_two_instances_do_not_share_idle_registry() { clear_relay_idle_pressure_state_for_testing_in_shared(a.as_ref()); assert!(mark_relay_idle_candidate_for_testing(a.as_ref(), 111)); - assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), Some(111)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(a.as_ref()), + Some(111) + ); assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), None); } @@ -168,7 +190,10 @@ fn proxy_shared_state_reset_in_one_instance_does_not_affect_another() { auth_probe_record_failure_for_testing(b.as_ref(), ip_b, now); clear_auth_probe_state_for_testing_in_shared(a.as_ref()); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), + None + ); assert_eq!( auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b), Some(1) @@ -191,8 +216,14 @@ fn proxy_shared_state_parallel_auth_probe_updates_stay_per_instance() { auth_probe_record_failure_for_testing(b.as_ref(), ip, now + Duration::from_millis(1)); } - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(5)); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(3)); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), + Some(5) + ); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), + Some(3) + ); } #[tokio::test] @@ -317,8 +348,14 @@ fn proxy_shared_state_auth_saturation_does_not_bleed_across_instances() { auth_probe_record_failure_for_testing(a.as_ref(), ip, future_now); } - assert!(auth_probe_is_throttled_for_testing_in_shared(a.as_ref(), ip)); - assert!(!auth_probe_is_throttled_for_testing_in_shared(b.as_ref(), ip)); + assert!(auth_probe_is_throttled_for_testing_in_shared( + a.as_ref(), + ip + )); + assert!(!auth_probe_is_throttled_for_testing_in_shared( + b.as_ref(), + ip + )); } #[test] @@ -348,7 +385,10 @@ fn proxy_shared_state_poison_clear_in_one_instance_does_not_affect_other_instanc clear_auth_probe_state_for_testing_in_shared(a.as_ref()); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), + None + ); assert_eq!( auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b), Some(1), @@ -463,7 +503,10 @@ fn proxy_shared_state_warned_secret_clear_in_one_instance_does_not_clear_other() clear_warned_secrets_for_testing_in_shared(a.as_ref()); clear_warned_secrets_for_testing_in_shared(b.as_ref()); - let key = ("clear-isolation-user".to_string(), "invalid_length".to_string()); + let key = ( + "clear-isolation-user".to_string(), + "invalid_length".to_string(), + ); { let warned_a = warned_secrets_for_testing_in_shared(a.as_ref()); let mut guard_a = warned_a @@ -508,14 +551,24 @@ fn proxy_shared_state_desync_duplicate_suppression_is_instance_scoped() { let now = Instant::now(); let key = 0xBEEF_0000_0000_0001u64; - assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now)); + assert!(should_emit_full_desync_for_testing( + a.as_ref(), + key, + false, + now + )); assert!(!should_emit_full_desync_for_testing( a.as_ref(), key, false, now + Duration::from_millis(1) )); - assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now)); + assert!(should_emit_full_desync_for_testing( + b.as_ref(), + key, + false, + now + )); } #[test] @@ -527,8 +580,18 @@ fn proxy_shared_state_desync_clear_in_one_instance_does_not_clear_other() { let now = Instant::now(); let key = 0xCAFE_0000_0000_0001u64; - assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now)); - assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now)); + assert!(should_emit_full_desync_for_testing( + a.as_ref(), + key, + false, + now + )); + assert!(should_emit_full_desync_for_testing( + b.as_ref(), + key, + false, + now + )); clear_desync_dedup_for_testing_in_shared(a.as_ref()); @@ -558,7 +621,10 @@ fn proxy_shared_state_idle_candidate_clear_in_one_instance_does_not_affect_other clear_relay_idle_candidate_for_testing(a.as_ref(), 1001); assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), None); - assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), Some(2002)); + assert_eq!( + oldest_relay_idle_candidate_for_testing(b.as_ref()), + Some(2002) + ); } #[test] diff --git a/src/proxy/tests/proxy_shared_state_parallel_execution_tests.rs b/src/proxy/tests/proxy_shared_state_parallel_execution_tests.rs index 45da59a..1330df4 100644 --- a/src/proxy/tests/proxy_shared_state_parallel_execution_tests.rs +++ b/src/proxy/tests/proxy_shared_state_parallel_execution_tests.rs @@ -1,16 +1,17 @@ use crate::proxy::handshake::{ auth_probe_fail_streak_for_testing_in_shared, auth_probe_record_failure_for_testing, - clear_auth_probe_state_for_testing_in_shared, clear_unknown_sni_warn_state_for_testing_in_shared, + clear_auth_probe_state_for_testing_in_shared, + clear_unknown_sni_warn_state_for_testing_in_shared, should_emit_unknown_sni_warn_for_testing_in_shared, }; use crate::proxy::middle_relay::{ - clear_desync_dedup_for_testing_in_shared, clear_relay_idle_pressure_state_for_testing_in_shared, - mark_relay_idle_candidate_for_testing, oldest_relay_idle_candidate_for_testing, - should_emit_full_desync_for_testing, + clear_desync_dedup_for_testing_in_shared, + clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing, + oldest_relay_idle_candidate_for_testing, should_emit_full_desync_for_testing, }; use crate::proxy::shared_state::ProxySharedState; -use rand::SeedableRng; use rand::RngExt; +use rand::SeedableRng; use rand::rngs::StdRng; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; @@ -99,8 +100,14 @@ async fn proxy_shared_state_dual_instance_same_ip_high_contention_no_counter_ble handle.await.expect("task join failed"); } - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(64)); - assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(64)); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), + Some(64) + ); + assert_eq!( + auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), + Some(64) + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -183,12 +190,7 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed() clear_auth_probe_state_for_testing_in_shared(shared_a.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared_b.as_ref()); - let ip = IpAddr::V4(Ipv4Addr::new( - 198, - 51, - 100, - rng.random_range(1_u8..=250_u8), - )); + let ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, rng.random_range(1_u8..=250_u8))); let workers = rng.random_range(16_usize..=48_usize); let rounds = rng.random_range(4_usize..=10_usize); @@ -210,7 +212,11 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed() handles.push(tokio::spawn(async move { start_a.wait().await; for _ in 0..a_ops { - auth_probe_record_failure_for_testing(shared_a.as_ref(), ip, Instant::now()); + auth_probe_record_failure_for_testing( + shared_a.as_ref(), + ip, + Instant::now(), + ); } })); @@ -219,7 +225,11 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed() handles.push(tokio::spawn(async move { start_b.wait().await; for _ in 0..b_ops { - auth_probe_record_failure_for_testing(shared_b.as_ref(), ip, Instant::now()); + auth_probe_record_failure_for_testing( + shared_b.as_ref(), + ip, + Instant::now(), + ); } })); } diff --git a/src/proxy/tests/relay_baseline_invariant_tests.rs b/src/proxy/tests/relay_baseline_invariant_tests.rs index 67e911a..998be2d 100644 --- a/src/proxy/tests/relay_baseline_invariant_tests.rs +++ b/src/proxy/tests/relay_baseline_invariant_tests.rs @@ -69,7 +69,10 @@ async fn relay_baseline_activity_timeout_fires_after_inactivity() { .expect("relay must complete after inactivity timeout") .expect("relay task must not panic"); - assert!(done.is_ok(), "relay must return Ok(()) after inactivity timeout"); + assert!( + done.is_ok(), + "relay must return Ok(()) after inactivity timeout" + ); } #[tokio::test] @@ -155,7 +158,10 @@ async fn relay_baseline_bidirectional_bytes_counted_symmetrically() { .expect("relay task must not panic"); assert!(done.is_ok()); - assert_eq!(stats.get_user_total_octets(user), (c2s.len() + s2c.len()) as u64); + assert_eq!( + stats.get_user_total_octets(user), + (c2s.len() + s2c.len()) as u64 + ); } #[tokio::test] @@ -222,7 +228,10 @@ async fn relay_baseline_broken_pipe_midtransfer_returns_error() { match done { Err(ProxyError::Io(err)) => { assert!( - matches!(err.kind(), io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset), + matches!( + err.kind(), + io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset + ), "expected BrokenPipe/ConnectionReset, got {:?}", err.kind() ); diff --git a/src/proxy/tests/test_harness_common.rs b/src/proxy/tests/test_harness_common.rs index 52e90b1..4ebb419 100644 --- a/src/proxy/tests/test_harness_common.rs +++ b/src/proxy/tests/test_harness_common.rs @@ -1,6 +1,6 @@ use crate::config::ProxyConfig; -use rand::rngs::StdRng; use rand::SeedableRng; +use rand::rngs::StdRng; use std::io; use std::pin::Pin; use std::sync::Arc; @@ -18,7 +18,10 @@ mod tests { let arc = Arc::::from_raw(data.cast::()); let cloned = Arc::clone(&arc); let _ = Arc::into_raw(arc); - RawWaker::new(Arc::into_raw(cloned).cast::<()>(), &WAKE_COUNTER_WAKER_VTABLE) + RawWaker::new( + Arc::into_raw(cloned).cast::<()>(), + &WAKE_COUNTER_WAKER_VTABLE, + ) } unsafe fn wake_counter_wake(data: *const ()) { diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 42c42ff..38b22bb 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1593,13 +1593,15 @@ impl Stats { self.conntrack_delete_success_total.load(Ordering::Relaxed) } pub fn get_conntrack_delete_not_found_total(&self) -> u64 { - self.conntrack_delete_not_found_total.load(Ordering::Relaxed) + self.conntrack_delete_not_found_total + .load(Ordering::Relaxed) } pub fn get_conntrack_delete_error_total(&self) -> u64 { self.conntrack_delete_error_total.load(Ordering::Relaxed) } pub fn get_conntrack_close_event_drop_total(&self) -> u64 { - self.conntrack_close_event_drop_total.load(Ordering::Relaxed) + self.conntrack_close_event_drop_total + .load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed)