diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 89e72bb..fd88796 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -595,6 +595,14 @@ pub(crate) fn default_mask_relay_max_bytes() -> usize { 32 * 1024 } +pub(crate) fn default_mask_relay_timeout_secs() -> u64 { + 60 +} + +pub(crate) fn default_mask_relay_idle_timeout_secs() -> u64 { + 5 +} + pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 { 5 } diff --git a/src/config/types.rs b/src/config/types.rs index 41b0c2e..c43d296 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1568,6 +1568,17 @@ pub struct AntiCensorshipConfig { #[serde(default = "default_mask_relay_max_bytes")] pub mask_relay_max_bytes: usize, + /// Wall-clock cap for the full masking relay on non-MTProto fallback paths. + /// Raise when the mask target is a long-lived service (e.g. WebSocket). + #[serde(default = "default_mask_relay_timeout_secs")] + pub mask_relay_timeout_secs: u64, + + /// Per-read idle timeout on masking relay and drain paths. + /// Limits resource consumption by slow-loris attacks and port scanners. + /// A read call stalling beyond this is treated as an abandoned connection. + #[serde(default = "default_mask_relay_idle_timeout_secs")] + pub mask_relay_idle_timeout_secs: u64, + /// Prefetch timeout (ms) for extending fragmented masking classifier window. #[serde(default = "default_mask_classifier_prefetch_timeout_ms")] pub mask_classifier_prefetch_timeout_ms: u64, @@ -1613,6 +1624,8 @@ impl Default for AntiCensorshipConfig { mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(), mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(), mask_relay_max_bytes: default_mask_relay_max_bytes(), + mask_relay_timeout_secs: default_mask_relay_timeout_secs(), + mask_relay_idle_timeout_secs: default_mask_relay_idle_timeout_secs(), mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(), mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(), mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(), diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index ba9f20a..988060d 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -28,14 +28,10 @@ use tracing::debug; const MASK_TIMEOUT: Duration = Duration::from_secs(5); #[cfg(test)] const MASK_TIMEOUT: Duration = Duration::from_millis(50); -/// Maximum duration for the entire masking relay. -/// Limits resource consumption from slow-loris attacks and port scanners. -#[cfg(not(test))] -const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60); +/// Maximum duration for the entire masking relay under test (replaced by config at runtime). #[cfg(test)] const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200); -#[cfg(not(test))] -const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5); +/// Per-read idle timeout for masking relay and drain paths under test (replaced by config at runtime). #[cfg(test)] const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100); const MASK_BUFFER_SIZE: usize = 8192; @@ -55,6 +51,7 @@ async fn copy_with_idle_timeout( writer: &mut W, byte_cap: usize, shutdown_on_eof: bool, + idle_timeout: Duration, ) -> CopyOutcome where R: AsyncRead + Unpin, @@ -78,7 +75,7 @@ where } let read_len = remaining_budget.min(MASK_BUFFER_SIZE); - let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await; + let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await; let n = match read_res { Ok(Ok(n)) => n, Ok(Err(_)) | Err(_) => break, @@ -86,13 +83,13 @@ where if n == 0 { ended_by_eof = true; if shutdown_on_eof { - let _ = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.shutdown()).await; + let _ = timeout(idle_timeout, writer.shutdown()).await; } break; } total = total.saturating_add(n); - let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await; + let write_res = timeout(idle_timeout, writer.write_all(&buf[..n])).await; match write_res { Ok(Ok(())) => {} Ok(Err(_)) | Err(_) => break, @@ -230,11 +227,15 @@ where } } -async fn consume_client_data_with_timeout_and_cap(reader: R, byte_cap: usize) -where +async fn consume_client_data_with_timeout_and_cap( + reader: R, + byte_cap: usize, + relay_timeout: Duration, + idle_timeout: Duration, +) where R: AsyncRead + Unpin, { - if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, byte_cap)) + if timeout(relay_timeout, consume_client_data(reader, byte_cap, idle_timeout)) .await .is_err() { @@ -598,10 +599,18 @@ pub async fn handle_bad_client( beobachten.record(client_type, peer.ip(), ttl); } + let relay_timeout = Duration::from_secs(config.censorship.mask_relay_timeout_secs); + let idle_timeout = Duration::from_secs(config.censorship.mask_relay_idle_timeout_secs); + if !config.censorship.mask { // Masking disabled, just consume data - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes) - .await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, + ) + .await; return; } @@ -633,7 +642,7 @@ pub async fn handle_bad_client( return; } if timeout( - MASK_RELAY_TIMEOUT, + relay_timeout, relay_to_mask( reader, writer, @@ -647,6 +656,7 @@ pub async fn handle_bad_client( config.censorship.mask_shape_above_cap_blur_max_bytes, config.censorship.mask_shape_hardening_aggressive_mode, config.censorship.mask_relay_max_bytes, + idle_timeout, ), ) .await @@ -662,6 +672,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -671,6 +683,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -701,8 +715,13 @@ pub async fn handle_bad_client( local = %local_addr, "Mask target resolves to local listener; refusing self-referential masking fallback" ); - consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes) - .await; + consume_client_data_with_timeout_and_cap( + reader, + config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, + ) + .await; wait_mask_outcome_budget(outcome_started, config).await; return; } @@ -736,7 +755,7 @@ pub async fn handle_bad_client( return; } if timeout( - MASK_RELAY_TIMEOUT, + relay_timeout, relay_to_mask( reader, writer, @@ -750,6 +769,7 @@ pub async fn handle_bad_client( config.censorship.mask_shape_above_cap_blur_max_bytes, config.censorship.mask_shape_hardening_aggressive_mode, config.censorship.mask_relay_max_bytes, + idle_timeout, ), ) .await @@ -765,6 +785,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -774,6 +796,8 @@ pub async fn handle_bad_client( consume_client_data_with_timeout_and_cap( reader, config.censorship.mask_relay_max_bytes, + relay_timeout, + idle_timeout, ) .await; wait_mask_outcome_budget(outcome_started, config).await; @@ -795,6 +819,7 @@ async fn relay_to_mask( shape_above_cap_blur_max_bytes: usize, shape_hardening_aggressive_mode: bool, mask_relay_max_bytes: usize, + idle_timeout: Duration, ) where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, @@ -816,11 +841,19 @@ async fn relay_to_mask( &mut mask_write, mask_relay_max_bytes, !shape_hardening_enabled, + idle_timeout, ) .await }, async { - copy_with_idle_timeout(&mut mask_read, &mut writer, mask_relay_max_bytes, true).await + copy_with_idle_timeout( + &mut mask_read, + &mut writer, + mask_relay_max_bytes, + true, + idle_timeout, + ) + .await } ); @@ -848,7 +881,11 @@ async fn relay_to_mask( } /// Just consume all data from client without responding. -async fn consume_client_data(mut reader: R, byte_cap: usize) { +async fn consume_client_data( + mut reader: R, + byte_cap: usize, + idle_timeout: Duration, +) { if byte_cap == 0 { return; } @@ -864,7 +901,7 @@ async fn consume_client_data(mut reader: R, byte_cap: usiz } let read_len = remaining_budget.min(MASK_BUFFER_SIZE); - let n = match timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await { + let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await { Ok(Ok(n)) => n, Ok(Err(_)) | Err(_) => break, }; diff --git a/src/proxy/tests/masking_additional_hardening_security_tests.rs b/src/proxy/tests/masking_additional_hardening_security_tests.rs index a6f6386..1b8ca2e 100644 --- a/src/proxy/tests/masking_additional_hardening_security_tests.rs +++ b/src/proxy/tests/masking_additional_hardening_security_tests.rs @@ -47,7 +47,7 @@ async fn consume_client_data_stops_after_byte_cap_without_eof() { }; let cap = 10_000usize; - consume_client_data(reader, cap).await; + consume_client_data(reader, cap, MASK_RELAY_IDLE_TIMEOUT).await; let total = produced.load(Ordering::Relaxed); assert!( diff --git a/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs b/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs index f2c39a2..227db73 100644 --- a/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs +++ b/src/proxy/tests/masking_consume_idle_timeout_security_tests.rs @@ -31,7 +31,7 @@ async fn stalling_client_terminates_at_idle_not_relay_timeout() { let result = tokio::time::timeout( MASK_RELAY_TIMEOUT, - consume_client_data(reader, MASK_BUFFER_SIZE * 4), + consume_client_data(reader, MASK_BUFFER_SIZE * 4, MASK_RELAY_IDLE_TIMEOUT), ) .await; @@ -57,7 +57,7 @@ async fn fast_reader_drains_to_eof() { let data = vec![0xAAu8; 32 * 1024]; let reader = std::io::Cursor::new(data); - tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX)) + tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT)) .await .expect("consume_client_data did not complete for fast EOF reader"); } @@ -81,7 +81,7 @@ async fn io_error_terminates_cleanly() { tokio::time::timeout( MASK_RELAY_TIMEOUT, - consume_client_data(ErrReader, usize::MAX), + consume_client_data(ErrReader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT), ) .await .expect("consume_client_data did not return on I/O error"); diff --git a/src/proxy/tests/masking_consume_stress_adversarial_tests.rs b/src/proxy/tests/masking_consume_stress_adversarial_tests.rs index 12287b5..4ef283e 100644 --- a/src/proxy/tests/masking_consume_stress_adversarial_tests.rs +++ b/src/proxy/tests/masking_consume_stress_adversarial_tests.rs @@ -34,7 +34,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() { set.spawn(async { tokio::time::timeout( MASK_RELAY_TIMEOUT, - consume_client_data(OneByteThenStall { sent: false }, usize::MAX), + consume_client_data(OneByteThenStall { sent: false }, usize::MAX, MASK_RELAY_IDLE_TIMEOUT), ) .await .expect("consume_client_data exceeded relay timeout under stall load"); @@ -56,7 +56,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() { #[tokio::test] async fn consume_zero_cap_returns_immediately() { let started = Instant::now(); - consume_client_data(tokio::io::empty(), 0).await; + consume_client_data(tokio::io::empty(), 0, MASK_RELAY_IDLE_TIMEOUT).await; assert!( started.elapsed() < MASK_RELAY_IDLE_TIMEOUT, "zero byte cap must return immediately" diff --git a/src/proxy/tests/masking_production_cap_regression_security_tests.rs b/src/proxy/tests/masking_production_cap_regression_security_tests.rs index 9ff51ba..32ecdbb 100644 --- a/src/proxy/tests/masking_production_cap_regression_security_tests.rs +++ b/src/proxy/tests/masking_production_cap_regression_security_tests.rs @@ -127,7 +127,8 @@ async fn positive_copy_with_production_cap_stops_exactly_at_budget() { let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls); let mut writer = CountingWriter::default(); - let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await; + let outcome = + copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true, MASK_RELAY_IDLE_TIMEOUT).await; assert_eq!( outcome.total, PROD_CAP_BYTES, @@ -145,7 +146,7 @@ async fn negative_consume_with_zero_cap_performs_no_reads() { let read_calls = Arc::new(AtomicUsize::new(0)); let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls)); - consume_client_data_with_timeout_and_cap(reader, 0).await; + consume_client_data_with_timeout_and_cap(reader, 0, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT).await; assert_eq!( read_calls.load(Ordering::Relaxed), @@ -161,7 +162,8 @@ async fn edge_copy_below_cap_reports_eof_without_overread() { let mut reader = FinitePatternReader::new(payload, 3072, read_calls); let mut writer = CountingWriter::default(); - let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await; + let outcome = + copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true, MASK_RELAY_IDLE_TIMEOUT).await; assert_eq!(outcome.total, payload); assert_eq!(writer.written, payload); @@ -175,7 +177,7 @@ async fn edge_copy_below_cap_reports_eof_without_overread() { async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() { let started = Instant::now(); - consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES).await; + consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT).await; assert!( started.elapsed() < Duration::from_millis(350), @@ -190,7 +192,7 @@ async fn integration_consume_path_honors_production_cap_for_large_payload() { let bounded = timeout( Duration::from_millis(350), - consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES), + consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT), ) .await; @@ -206,7 +208,7 @@ async fn adversarial_consume_path_never_reads_beyond_declared_byte_cap() { let total_read = Arc::new(AtomicUsize::new(0)); let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read)); - consume_client_data_with_timeout_and_cap(reader, byte_cap).await; + consume_client_data_with_timeout_and_cap(reader, byte_cap, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT).await; assert!( total_read.load(Ordering::Relaxed) <= byte_cap, @@ -231,7 +233,7 @@ async fn light_fuzz_cap_and_payload_matrix_preserves_min_budget_invariant() { let mut reader = FinitePatternReader::new(payload, chunk, read_calls); let mut writer = CountingWriter::default(); - let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true).await; + let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true, MASK_RELAY_IDLE_TIMEOUT).await; let expected = payload.min(cap); assert_eq!( @@ -261,7 +263,7 @@ async fn stress_parallel_copy_tasks_with_production_cap_complete_without_leaks() read_calls, ); let mut writer = CountingWriter::default(); - copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await + copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true, MASK_RELAY_IDLE_TIMEOUT).await })); } diff --git a/src/proxy/tests/masking_relay_guardrails_security_tests.rs b/src/proxy/tests/masking_relay_guardrails_security_tests.rs index 257c0f8..3613c91 100644 --- a/src/proxy/tests/masking_relay_guardrails_security_tests.rs +++ b/src/proxy/tests/masking_relay_guardrails_security_tests.rs @@ -26,6 +26,7 @@ async fn relay_to_mask_enforces_masking_session_byte_cap() { 0, false, 32 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); @@ -81,6 +82,7 @@ async fn relay_to_mask_propagates_client_half_close_without_waiting_for_other_di 0, false, 32 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); diff --git a/src/proxy/tests/masking_security_tests.rs b/src/proxy/tests/masking_security_tests.rs index c698b55..84a0e6f 100644 --- a/src/proxy/tests/masking_security_tests.rs +++ b/src/proxy/tests/masking_security_tests.rs @@ -1377,6 +1377,7 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); @@ -1508,6 +1509,7 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() { 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ), ) .await; diff --git a/src/proxy/tests/masking_self_target_loop_security_tests.rs b/src/proxy/tests/masking_self_target_loop_security_tests.rs index 7f6cb29..975b4fc 100644 --- a/src/proxy/tests/masking_self_target_loop_security_tests.rs +++ b/src/proxy/tests/masking_self_target_loop_security_tests.rs @@ -228,6 +228,7 @@ async fn relay_path_idle_timeout_eviction_remains_effective() { 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; diff --git a/src/proxy/tests/masking_shape_guard_adversarial_tests.rs b/src/proxy/tests/masking_shape_guard_adversarial_tests.rs index 4fa8da7..6c3c4bf 100644 --- a/src/proxy/tests/masking_shape_guard_adversarial_tests.rs +++ b/src/proxy/tests/masking_shape_guard_adversarial_tests.rs @@ -44,6 +44,7 @@ async fn run_relay_case( above_cap_blur_max_bytes, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; }); diff --git a/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs b/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs index 9abf3c0..4e0aa18 100644 --- a/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs +++ b/src/proxy/tests/masking_shape_hardening_adversarial_tests.rs @@ -89,6 +89,7 @@ async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() { 0, false, 5 * 1024 * 1024, + MASK_RELAY_IDLE_TIMEOUT, ) .await; });