This commit is contained in:
Alexey 2026-03-23 20:32:55 +03:00
parent 3ceda15073
commit 814bef9d99
No known key found for this signature in database
55 changed files with 821 additions and 385 deletions

View File

@ -228,7 +228,9 @@ impl HotFields {
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes,
me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes,
me_d2c_frame_buf_shrink_threshold_bytes: cfg
.general
.me_d2c_frame_buf_shrink_threshold_bytes,
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,

View File

@ -444,8 +444,7 @@ impl ProxyConfig {
if !(5..=50).contains(&config.censorship.mask_classifier_prefetch_timeout_ms) {
return Err(ProxyError::Config(
"censorship.mask_classifier_prefetch_timeout_ms must be within [5, 50]"
.to_string(),
"censorship.mask_classifier_prefetch_timeout_ms must be within [5, 50]".to_string(),
));
}
@ -558,7 +557,9 @@ impl ProxyConfig {
));
}
if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) {
if !(4096..=16 * 1024 * 1024)
.contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes)
{
return Err(ProxyError::Config(
"general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]"
.to_string(),

View File

@ -8,8 +8,9 @@ fn write_temp_config(contents: &str) -> PathBuf {
.duration_since(UNIX_EPOCH)
.expect("system time must be after unix epoch")
.as_nanos();
let path = std::env::temp_dir()
.join(format!("telemt-load-mask-prefetch-timeout-security-{nonce}.toml"));
let path = std::env::temp_dir().join(format!(
"telemt-load-mask-prefetch-timeout-security-{nonce}.toml"
));
fs::write(&path, contents).expect("temp config write must succeed");
path
}
@ -67,8 +68,8 @@ mask_classifier_prefetch_timeout_ms = 20
"#,
);
let cfg = ProxyConfig::load(&path)
.expect("prefetch timeout within security bounds must be accepted");
let cfg =
ProxyConfig::load(&path).expect("prefetch timeout within security bounds must be accepted");
assert_eq!(cfg.censorship.mask_classifier_prefetch_timeout_ms, 20);
remove_temp_config(&path);

View File

@ -265,8 +265,8 @@ mask_relay_max_bytes = 67108865
"#,
);
let err = ProxyConfig::load(&path)
.expect_err("mask_relay_max_bytes above hard cap must be rejected");
let err =
ProxyConfig::load(&path).expect_err("mask_relay_max_bytes above hard cap must be rejected");
let msg = err.to_string();
assert!(
msg.contains("censorship.mask_relay_max_bytes must be <= 67108864"),

View File

@ -954,7 +954,8 @@ impl Default for GeneralConfig {
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(),
me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(),
me_d2c_frame_buf_shrink_threshold_bytes:
default_me_d2c_frame_buf_shrink_threshold_bytes(),
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
me_warmup_stagger_enabled: default_true(),

View File

@ -7,12 +7,12 @@ mod crypto;
mod error;
mod ip_tracker;
#[cfg(test)]
#[path = "tests/ip_tracker_hotpath_adversarial_tests.rs"]
mod ip_tracker_hotpath_adversarial_tests;
#[cfg(test)]
#[path = "tests/ip_tracker_encapsulation_adversarial_tests.rs"]
mod ip_tracker_encapsulation_adversarial_tests;
#[cfg(test)]
#[path = "tests/ip_tracker_hotpath_adversarial_tests.rs"]
mod ip_tracker_hotpath_adversarial_tests;
#[cfg(test)]
#[path = "tests/ip_tracker_regression_tests.rs"]
mod ip_tracker_regression_tests;
mod maestro;

View File

@ -1233,10 +1233,7 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
out,
"# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_bytes_bucket_total counter"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_bucket_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}",

View File

@ -210,7 +210,9 @@ fn should_prefetch_mask_classifier_window(initial_data: &[u8]) -> bool {
return false;
}
initial_data.iter().all(|b| b.is_ascii_alphabetic() || *b == b' ')
initial_data
.iter()
.all(|b| b.is_ascii_alphabetic() || *b == b' ')
}
#[cfg(test)]
@ -218,16 +220,19 @@ async fn extend_masking_initial_window<R>(reader: &mut R, initial_data: &mut Vec
where
R: AsyncRead + Unpin,
{
extend_masking_initial_window_with_timeout(reader, initial_data, MASK_CLASSIFIER_PREFETCH_TIMEOUT)
.await;
extend_masking_initial_window_with_timeout(
reader,
initial_data,
MASK_CLASSIFIER_PREFETCH_TIMEOUT,
)
.await;
}
async fn extend_masking_initial_window_with_timeout<R>(
reader: &mut R,
initial_data: &mut Vec<u8>,
prefetch_timeout: Duration,
)
where
) where
R: AsyncRead + Unpin,
{
if !should_prefetch_mask_classifier_window(initial_data) {

View File

@ -10,10 +10,10 @@ use rand::rngs::StdRng;
use rand::{Rng, RngExt, SeedableRng};
use std::net::{IpAddr, SocketAddr};
use std::str;
#[cfg(unix)]
use std::sync::{Mutex, OnceLock};
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(unix)]
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant as StdInstant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
@ -107,15 +107,7 @@ where
fn is_http_probe(data: &[u8]) -> bool {
// RFC 7540 section 3.5: HTTP/2 client preface starts with "PRI ".
const HTTP_METHODS: [&[u8]; 10] = [
b"GET ",
b"POST",
b"HEAD",
b"PUT ",
b"DELETE",
b"OPTIONS",
b"CONNECT",
b"TRACE",
b"PATCH",
b"GET ", b"POST", b"HEAD", b"PUT ", b"DELETE", b"OPTIONS", b"CONNECT", b"TRACE", b"PATCH",
b"PRI ",
];
@ -328,7 +320,10 @@ fn parse_mask_host_ip_literal(host: &str) -> Option<IpAddr> {
fn canonical_ip(ip: IpAddr) -> IpAddr {
match ip {
IpAddr::V6(v6) => v6.to_ipv4_mapped().map(IpAddr::V4).unwrap_or(IpAddr::V6(v6)),
IpAddr::V6(v6) => v6
.to_ipv4_mapped()
.map(IpAddr::V4)
.unwrap_or(IpAddr::V6(v6)),
IpAddr::V4(v4) => IpAddr::V4(v4),
}
}
@ -664,12 +659,20 @@ pub async fn handle_bad_client<R, W>(
Ok(Err(e)) => {
wait_mask_connect_budget_if_needed(connect_started, config).await;
debug!(error = %e, "Failed to connect to mask unix socket");
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await;
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
}
Err(_) => {
debug!("Timeout connecting to mask unix socket");
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await;
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
}
}
@ -698,7 +701,8 @@ pub async fn handle_bad_client<R, W>(
local = %local_addr,
"Mask target resolves to local listener; refusing self-referential masking fallback"
);
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await;
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
return;
}
@ -758,12 +762,20 @@ pub async fn handle_bad_client<R, W>(
Ok(Err(e)) => {
wait_mask_connect_budget_if_needed(connect_started, config).await;
debug!(error = %e, "Failed to connect to mask host");
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await;
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
}
Err(_) => {
debug!("Timeout connecting to mask host");
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes).await;
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
}
}

View File

@ -23,7 +23,9 @@ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
};
use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats};
use crate::stats::{
MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats,
};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
@ -91,7 +93,8 @@ fn relay_idle_candidate_registry() -> &'static Mutex<RelayIdleCandidateRegistry>
RELAY_IDLE_CANDIDATE_REGISTRY.get_or_init(|| Mutex::new(RelayIdleCandidateRegistry::default()))
}
fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry> {
fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry>
{
let registry = relay_idle_candidate_registry();
match registry.lock() {
Ok(guard) => guard,
@ -1520,8 +1523,7 @@ where
}
if !idle_policy.enabled {
consecutive_zero_len_frames =
consecutive_zero_len_frames.saturating_add(1);
consecutive_zero_len_frames = consecutive_zero_len_frames.saturating_add(1);
if consecutive_zero_len_frames > LEGACY_MAX_CONSECUTIVE_ZERO_LEN_FRAMES {
stats.increment_relay_protocol_desync_close_total();
return Err(ProxyError::Proxy(
@ -1835,8 +1837,14 @@ where
MeD2cWriteMode::Coalesced
} else {
let header = [first];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
client_writer
.write_all(&header)
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else if len_words < (1 << 24) {
@ -1858,8 +1866,14 @@ where
MeD2cWriteMode::Coalesced
} else {
let header = [first, lw[0], lw[1], lw[2]];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
client_writer
.write_all(&header)
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else {
@ -1901,8 +1915,14 @@ where
MeD2cWriteMode::Coalesced
} else {
let header = len_val.to_le_bytes();
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
client_writer
.write_all(&header)
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
if padding_len > 0 {
frame_buf.clear();
if frame_buf.capacity() < padding_len {

View File

@ -4,58 +4,58 @@
#![cfg_attr(test, allow(warnings))]
#![cfg_attr(not(test), forbid(clippy::undocumented_unsafe_blocks))]
#![cfg_attr(
not(test),
deny(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::correctness,
clippy::option_if_let_else,
clippy::or_fun_call,
clippy::branches_sharing_code,
clippy::single_option_map,
clippy::useless_let_if_seq,
clippy::redundant_locals,
clippy::cloned_ref_to_slice_refs,
unsafe_code,
clippy::await_holding_lock,
clippy::await_holding_refcell_ref,
clippy::debug_assert_with_mut_call,
clippy::macro_use_imports,
clippy::cast_ptr_alignment,
clippy::cast_lossless,
clippy::ptr_as_ptr,
clippy::large_stack_arrays,
clippy::same_functions_in_if_condition,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
rust_2018_idioms
)
not(test),
deny(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::todo,
clippy::unimplemented,
clippy::correctness,
clippy::option_if_let_else,
clippy::or_fun_call,
clippy::branches_sharing_code,
clippy::single_option_map,
clippy::useless_let_if_seq,
clippy::redundant_locals,
clippy::cloned_ref_to_slice_refs,
unsafe_code,
clippy::await_holding_lock,
clippy::await_holding_refcell_ref,
clippy::debug_assert_with_mut_call,
clippy::macro_use_imports,
clippy::cast_ptr_alignment,
clippy::cast_lossless,
clippy::ptr_as_ptr,
clippy::large_stack_arrays,
clippy::same_functions_in_if_condition,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
rust_2018_idioms
)
)]
#![cfg_attr(
not(test),
allow(
clippy::use_self,
clippy::redundant_closure,
clippy::too_many_arguments,
clippy::doc_markdown,
clippy::missing_const_for_fn,
clippy::unnecessary_operation,
clippy::redundant_pub_crate,
clippy::derive_partial_eq_without_eq,
clippy::type_complexity,
clippy::new_ret_no_self,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::significant_drop_tightening,
clippy::significant_drop_in_scrutinee,
clippy::float_cmp,
clippy::nursery
)
not(test),
allow(
clippy::use_self,
clippy::redundant_closure,
clippy::too_many_arguments,
clippy::doc_markdown,
clippy::missing_const_for_fn,
clippy::unnecessary_operation,
clippy::redundant_pub_crate,
clippy::derive_partial_eq_without_eq,
clippy::type_complexity,
clippy::new_ret_no_self,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::significant_drop_tightening,
clippy::significant_drop_in_scrutinee,
clippy::float_cmp,
clippy::nursery
)
)]
pub mod adaptive_buffers;

View File

@ -56,8 +56,8 @@ use crate::stats::{Stats, UserStats};
use crate::stream::BufferPool;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes};
@ -272,12 +272,10 @@ const QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES: u64 = 64 * 1024;
#[inline]
fn quota_adaptive_interval_bytes(remaining_before: u64) -> u64 {
remaining_before
.saturating_div(2)
.clamp(
QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES,
QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES,
)
remaining_before.saturating_div(2).clamp(
QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES,
QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES,
)
}
#[inline]

View File

@ -1,5 +1,5 @@
use super::*;
use crate::config::{UpstreamConfig, UpstreamType, ProxyConfig};
use crate::config::{ProxyConfig, UpstreamConfig, UpstreamType};
use crate::protocol::constants::{MAX_TLS_PLAINTEXT_SIZE, MIN_TLS_CLIENT_HELLO_SIZE};
use crate::stats::Stats;
use crate::transport::UpstreamManager;
@ -41,7 +41,9 @@ fn edge_handshake_timeout_with_mask_grace_saturating_add_prevents_overflow() {
#[test]
fn edge_tls_clienthello_len_in_bounds_exact_boundaries() {
assert!(tls_clienthello_len_in_bounds(MIN_TLS_CLIENT_HELLO_SIZE));
assert!(!tls_clienthello_len_in_bounds(MIN_TLS_CLIENT_HELLO_SIZE - 1));
assert!(!tls_clienthello_len_in_bounds(
MIN_TLS_CLIENT_HELLO_SIZE - 1
));
assert!(tls_clienthello_len_in_bounds(MAX_TLS_PLAINTEXT_SIZE));
assert!(!tls_clienthello_len_in_bounds(MAX_TLS_PLAINTEXT_SIZE + 1));
}
@ -87,7 +89,15 @@ async fn adversarial_tls_handshake_timeout_during_masking_delay() {
"198.51.100.1:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -99,7 +109,10 @@ async fn adversarial_tls_handshake_timeout_during_masking_delay() {
false,
));
client_side.write_all(&[0x16, 0x03, 0x01, 0xFF, 0xFF]).await.unwrap();
client_side
.write_all(&[0x16, 0x03, 0x01, 0xFF, 0xFF])
.await
.unwrap();
let result = tokio::time::timeout(Duration::from_secs(4), handle)
.await
@ -123,7 +136,15 @@ async fn blackhat_proxy_protocol_slowloris_timeout() {
"198.51.100.2:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -167,7 +188,15 @@ async fn negative_proxy_protocol_enabled_but_client_sends_tls_hello() {
"198.51.100.3:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -179,7 +208,10 @@ async fn negative_proxy_protocol_enabled_but_client_sends_tls_hello() {
true,
));
client_side.write_all(&[0x16, 0x03, 0x01, 0x02, 0x00]).await.unwrap();
client_side
.write_all(&[0x16, 0x03, 0x01, 0x02, 0x00])
.await
.unwrap();
let result = tokio::time::timeout(Duration::from_secs(2), handle)
.await
@ -202,7 +234,15 @@ async fn edge_client_stream_exactly_4_bytes_eof() {
"198.51.100.4:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -214,7 +254,10 @@ async fn edge_client_stream_exactly_4_bytes_eof() {
false,
));
client_side.write_all(&[0x16, 0x03, 0x01, 0x00]).await.unwrap();
client_side
.write_all(&[0x16, 0x03, 0x01, 0x00])
.await
.unwrap();
client_side.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
@ -234,7 +277,15 @@ async fn edge_client_stream_tls_header_valid_but_body_1_byte_short_eof() {
"198.51.100.5:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -246,7 +297,10 @@ async fn edge_client_stream_tls_header_valid_but_body_1_byte_short_eof() {
false,
));
client_side.write_all(&[0x16, 0x03, 0x01, 0x00, 100]).await.unwrap();
client_side
.write_all(&[0x16, 0x03, 0x01, 0x00, 100])
.await
.unwrap();
client_side.write_all(&vec![0x41; 99]).await.unwrap();
client_side.shutdown().await.unwrap();
@ -269,7 +323,15 @@ async fn integration_non_tls_modes_disabled_immediately_masks() {
"198.51.100.6:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -372,11 +434,7 @@ async fn stress_user_connection_reservation_concurrent_same_ip_exhaustion() {
let ip_tracker = ip_tracker.clone();
tasks.spawn(async move {
RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats,
peer,
ip_tracker,
user, &config, stats, peer, ip_tracker,
)
.await
});

View File

@ -42,7 +42,15 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking()
"198.51.100.20:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -65,7 +73,9 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking()
.unwrap();
client_side.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handler).await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handler)
.await
.unwrap();
assert_eq!(stats.get_connects_bad(), 1);
}
@ -73,7 +83,10 @@ async fn invariant_tls_clienthello_truncation_exact_boundary_triggers_masking()
async fn invariant_acquire_reservation_ip_limit_rollback() {
let user = "rollback-test-user";
let mut config = ProxyConfig::default();
config.access.user_max_tcp_conns.insert(user.to_string(), 10);
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 10);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());
@ -159,7 +172,15 @@ async fn invariant_direct_mode_partial_header_eof_is_error_not_bad_connect() {
"198.51.100.25:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),

View File

@ -100,14 +100,7 @@ async fn run_http2_fragment_case(split_at: usize, delay_ms: u64, peer: SocketAdd
#[tokio::test]
async fn http2_preface_fragmentation_matrix_is_classified_and_forwarded() {
let cases = [
(2usize, 0u64),
(3, 0),
(4, 0),
(2, 7),
(3, 7),
(8, 1),
];
let cases = [(2usize, 0u64), (3, 0), (4, 0), (2, 7), (3, 7), (8, 1)];
for (i, (split_at, delay_ms)) in cases.into_iter().enumerate() {
let peer: SocketAddr = format!("198.51.100.{}:58{}", 140 + i, 100 + i)

View File

@ -29,7 +29,10 @@ async fn configured_prefetch_budget_20ms_recovers_tail_delayed_15ms() {
.write_all(b"ONNECT example.org:443 HTTP/1.1\r\n")
.await
.expect("tail bytes must be writable");
writer.shutdown().await.expect("writer shutdown must succeed");
writer
.shutdown()
.await
.expect("writer shutdown must succeed");
});
let mut initial_data = b"C".to_vec();
@ -60,7 +63,10 @@ async fn configured_prefetch_budget_5ms_misses_tail_delayed_15ms() {
.write_all(b"ONNECT example.org:443 HTTP/1.1\r\n")
.await
.expect("tail bytes must be writable");
writer.shutdown().await.expect("writer shutdown must succeed");
writer
.shutdown()
.await
.expect("writer shutdown must succeed");
});
let mut initial_data = b"C".to_vec();

View File

@ -245,7 +245,10 @@ async fn blackhat_integration_empty_initial_data_path_is_byte_exact_and_eof_clea
assert_eq!(head[0], 0x16);
read_and_discard_tls_record_body(&mut client_side, head).await;
client_side.write_all(&invalid_mtproto_record).await.unwrap();
client_side
.write_all(&invalid_mtproto_record)
.await
.unwrap();
client_side.write_all(&trailing_record).await.unwrap();
client_side.shutdown().await.unwrap();

View File

@ -7,7 +7,9 @@ async fn run_strict_prefetch_case(prefetch_ms: u64, tail_delay_ms: u64) -> Vec<u
let writer_task = tokio::spawn(async move {
sleep(Duration::from_millis(tail_delay_ms)).await;
let _ = writer.write_all(b"ONNECT example.org:443 HTTP/1.1\r\n").await;
let _ = writer
.write_all(b"ONNECT example.org:443 HTTP/1.1\r\n")
.await;
let _ = writer.shutdown().await;
});

View File

@ -35,7 +35,10 @@ async fn run_prefetch_budget_case(prefetch_budget_ms: u64, delayed_tail_ms: u64)
.write_all(b"ONNECT example.org:443 HTTP/1.1\r\n")
.await
.expect("tail bytes must be writable");
writer.shutdown().await.expect("writer shutdown must succeed");
writer
.shutdown()
.await
.expect("writer shutdown must succeed");
});
let mut initial_data = b"C".to_vec();

View File

@ -67,9 +67,10 @@ async fn run_replay_candidate_session(
cfg.censorship.mask_port = 1;
cfg.censorship.mask_timing_normalization_enabled = false;
cfg.access.ignore_time_skew = true;
cfg.access
.users
.insert("user".to_string(), "abababababababababababababababab".to_string());
cfg.access.users.insert(
"user".to_string(),
"abababababababababababababababab".to_string(),
);
let config = Arc::new(cfg);
let stats = Arc::new(Stats::new());
@ -99,7 +100,10 @@ async fn run_replay_candidate_session(
if drive_mtproto_fail {
let mut server_hello_head = [0u8; 5];
client_side.read_exact(&mut server_hello_head).await.unwrap();
client_side
.read_exact(&mut server_hello_head)
.await
.unwrap();
assert_eq!(server_hello_head[0], 0x16);
let body_len = u16::from_be_bytes([server_hello_head[3], server_hello_head[4]]) as usize;
let mut body = vec![0u8; body_len];
@ -110,7 +114,10 @@ async fn run_replay_candidate_session(
invalid_mtproto_record.extend_from_slice(&TLS_VERSION);
invalid_mtproto_record.extend_from_slice(&(HANDSHAKE_LEN as u16).to_be_bytes());
invalid_mtproto_record.extend_from_slice(&vec![0u8; HANDSHAKE_LEN]);
client_side.write_all(&invalid_mtproto_record).await.unwrap();
client_side
.write_all(&invalid_mtproto_record)
.await
.unwrap();
client_side
.write_all(b"GET /replay-fallback HTTP/1.1\r\nHost: x\r\n\r\n")
.await
@ -154,8 +161,7 @@ async fn replay_reject_still_honors_masking_timing_budget() {
.await;
assert!(
replay_elapsed >= Duration::from_millis(40)
&& replay_elapsed < Duration::from_millis(250),
replay_elapsed >= Duration::from_millis(40) && replay_elapsed < Duration::from_millis(250),
"replay rejection path must still satisfy masking timing budget without unbounded DB/CPU delay"
);
}

View File

@ -53,11 +53,7 @@ async fn boundary_user_data_quota_exact_match_rejects() {
let peer = "198.51.100.10:55000".parse().unwrap();
let result = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats,
peer,
ip_tracker,
user, &config, stats, peer, ip_tracker,
)
.await;
@ -79,11 +75,7 @@ async fn boundary_user_expiration_in_past_rejects() {
let peer = "198.51.100.11:55000".parse().unwrap();
let result = RunningClientHandler::acquire_user_connection_reservation_static(
user,
&config,
stats,
peer,
ip_tracker,
user, &config, stats, peer, ip_tracker,
)
.await;
@ -103,7 +95,15 @@ async fn blackhat_proxy_protocol_massive_garbage_rejected_quickly() {
"198.51.100.12:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -141,7 +141,15 @@ async fn edge_tls_body_immediate_eof_triggers_masking_and_bad_connect() {
"198.51.100.13:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -153,10 +161,15 @@ async fn edge_tls_body_immediate_eof_triggers_masking_and_bad_connect() {
false,
));
client_side.write_all(&[0x16, 0x03, 0x01, 0x00, 100]).await.unwrap();
client_side
.write_all(&[0x16, 0x03, 0x01, 0x00, 100])
.await
.unwrap();
client_side.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handler).await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handler)
.await
.unwrap();
assert_eq!(stats.get_connects_bad(), 1);
}
@ -177,7 +190,15 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() {
"198.51.100.15:55000".parse().unwrap(),
config,
stats.clone(),
Arc::new(UpstreamManager::new(vec![], 1, 1, 1, 1, false, stats.clone())),
Arc::new(UpstreamManager::new(
vec![],
1,
1,
1,
1,
false,
stats.clone(),
)),
Arc::new(ReplayChecker::new(128, Duration::from_secs(60))),
Arc::new(BufferPool::new()),
Arc::new(SecureRandom::new()),
@ -192,7 +213,9 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() {
client_side.write_all(&vec![0xEF; 64]).await.unwrap();
client_side.shutdown().await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handler).await.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), handler)
.await
.unwrap();
assert_eq!(stats.get_connects_bad(), 1);
}
@ -200,7 +223,10 @@ async fn security_classic_mode_disabled_masks_valid_length_payload() {
async fn concurrency_ip_tracker_strict_limit_one_rapid_churn() {
let user = "rapid-churn-user";
let mut config = ProxyConfig::default();
config.access.user_max_tcp_conns.insert(user.to_string(), 10);
config
.access
.user_max_tcp_conns
.insert(user.to_string(), 10);
let stats = Arc::new(Stats::new());
let ip_tracker = Arc::new(UserIpTracker::new());

View File

@ -7,9 +7,9 @@ use crate::protocol::tls;
use crate::proxy::handshake::HandshakeSuccess;
use crate::stream::{CryptoReader, CryptoWriter};
use crate::transport::proxy_protocol::ProxyProtocolV1Builder;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;
use rand::rngs::StdRng;
use std::net::Ipv4Addr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
use tokio::net::{TcpListener, TcpStream};
@ -34,7 +34,10 @@ fn handshake_timeout_with_mask_grace_includes_mask_margin() {
config.timeouts.client_handshake = 2;
config.censorship.mask = false;
assert_eq!(handshake_timeout_with_mask_grace(&config), Duration::from_secs(2));
assert_eq!(
handshake_timeout_with_mask_grace(&config),
Duration::from_secs(2)
);
config.censorship.mask = true;
assert_eq!(
@ -86,7 +89,10 @@ impl tokio::io::AsyncRead for ErrorReader {
_cx: &mut std::task::Context<'_>,
_buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::task::Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "fake error")))
std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"fake error",
)))
}
}
@ -124,7 +130,10 @@ fn handshake_timeout_without_mask_is_exact_base() {
config.timeouts.client_handshake = 7;
config.censorship.mask = false;
assert_eq!(handshake_timeout_with_mask_grace(&config), Duration::from_secs(7));
assert_eq!(
handshake_timeout_with_mask_grace(&config),
Duration::from_secs(7)
);
}
#[test]
@ -133,7 +142,10 @@ fn handshake_timeout_mask_enabled_adds_750ms() {
config.timeouts.client_handshake = 3;
config.censorship.mask = true;
assert_eq!(handshake_timeout_with_mask_grace(&config), Duration::from_millis(3750));
assert_eq!(
handshake_timeout_with_mask_grace(&config),
Duration::from_millis(3750)
);
}
#[tokio::test]
@ -155,10 +167,12 @@ async fn read_with_progress_fragmented_io_works_over_multiple_calls() {
let mut b = vec![0u8; chunk_size];
let n = read_with_progress(&mut cursor, &mut b).await.unwrap();
result.extend_from_slice(&b[..n]);
if n == 0 { break; }
if n == 0 {
break;
}
}
assert_eq!(result, vec![1,2,3,4,5]);
assert_eq!(result, vec![1, 2, 3, 4, 5]);
}
#[tokio::test]
@ -174,7 +188,9 @@ async fn read_with_progress_stress_randomized_chunk_sizes() {
let mut b = vec![0u8; chunk];
let read = read_with_progress(&mut cursor, &mut b).await.unwrap();
collected.extend_from_slice(&b[..read]);
if read == 0 { break; }
if read == 0 {
break;
}
}
assert_eq!(collected, input);
@ -215,10 +231,12 @@ fn wrap_tls_application_record_roundtrip_size_check() {
let mut consumed = 0;
while idx + 5 <= wrapped.len() {
assert_eq!(wrapped[idx], 0x17);
let len = u16::from_be_bytes([wrapped[idx+3], wrapped[idx+4]]) as usize;
let len = u16::from_be_bytes([wrapped[idx + 3], wrapped[idx + 4]]) as usize;
consumed += len;
idx += 5 + len;
if idx >= wrapped.len() { break; }
if idx >= wrapped.len() {
break;
}
}
assert_eq!(consumed, payload_len);

View File

@ -25,13 +25,26 @@ fn wrap_tls_application_record_oversized_payload_is_chunked_without_truncation()
let len = u16::from_be_bytes([record[offset + 3], record[offset + 4]]) as usize;
let body_start = offset + 5;
let body_end = body_start + len;
assert!(body_end <= record.len(), "declared TLS record length must be in-bounds");
assert!(
body_end <= record.len(),
"declared TLS record length must be in-bounds"
);
recovered.extend_from_slice(&record[body_start..body_end]);
offset = body_end;
frames += 1;
}
assert_eq!(offset, record.len(), "record parser must consume exact output size");
assert_eq!(frames, 2, "oversized payload should split into exactly two records");
assert_eq!(recovered, payload, "chunked records must preserve full payload");
assert_eq!(
offset,
record.len(),
"record parser must consume exact output size"
);
assert_eq!(
frames, 2,
"oversized payload should split into exactly two records"
);
assert_eq!(
recovered, payload,
"chunked records must preserve full payload"
);
}

View File

@ -773,8 +773,7 @@ fn anchored_open_nix_path_writes_expected_lines() {
"target/telemt-unknown-dc-anchored-open-ok-{}/unknown-dc.log",
std::process::id()
);
let sanitized =
sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let _ = fs::remove_file(&sanitized.resolved_path);
let mut first = open_unknown_dc_log_append_anchored(&sanitized)
@ -787,7 +786,10 @@ fn anchored_open_nix_path_writes_expected_lines() {
let content =
fs::read_to_string(&sanitized.resolved_path).expect("anchored log file must be readable");
let lines: Vec<&str> = content.lines().filter(|line| !line.trim().is_empty()).collect();
let lines: Vec<&str> = content
.lines()
.filter(|line| !line.trim().is_empty())
.collect();
assert_eq!(lines.len(), 2, "expected one line per anchored append call");
assert!(
lines.contains(&"dc_idx=31200") && lines.contains(&"dc_idx=31201"),
@ -811,8 +813,7 @@ fn anchored_open_parallel_appends_preserve_line_integrity() {
"target/telemt-unknown-dc-anchored-open-parallel-{}/unknown-dc.log",
std::process::id()
);
let sanitized =
sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let _ = fs::remove_file(&sanitized.resolved_path);
let mut workers = Vec::new();
@ -831,8 +832,15 @@ fn anchored_open_parallel_appends_preserve_line_integrity() {
let content =
fs::read_to_string(&sanitized.resolved_path).expect("parallel log file must be readable");
let lines: Vec<&str> = content.lines().filter(|line| !line.trim().is_empty()).collect();
assert_eq!(lines.len(), 64, "expected one complete line per worker append");
let lines: Vec<&str> = content
.lines()
.filter(|line| !line.trim().is_empty())
.collect();
assert_eq!(
lines.len(),
64,
"expected one complete line per worker append"
);
for line in lines {
assert!(
line.starts_with("dc_idx="),
@ -867,8 +875,7 @@ fn anchored_open_creates_private_0600_file_permissions() {
"target/telemt-unknown-dc-anchored-perms-{}/unknown-dc.log",
std::process::id()
);
let sanitized =
sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let _ = fs::remove_file(&sanitized.resolved_path);
let mut file = open_unknown_dc_log_append_anchored(&sanitized)
@ -905,8 +912,7 @@ fn anchored_open_rejects_existing_symlink_target() {
"target/telemt-unknown-dc-anchored-symlink-target-{}/unknown-dc.log",
std::process::id()
);
let sanitized =
sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let outside = std::env::temp_dir().join(format!(
"telemt-unknown-dc-anchored-symlink-outside-{}.log",
@ -943,8 +949,7 @@ fn anchored_open_high_contention_multi_write_preserves_complete_lines() {
"target/telemt-unknown-dc-anchored-contention-{}/unknown-dc.log",
std::process::id()
);
let sanitized =
sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let _ = fs::remove_file(&sanitized.resolved_path);
let workers = 24usize;
@ -970,7 +975,10 @@ fn anchored_open_high_contention_multi_write_preserves_complete_lines() {
let content = fs::read_to_string(&sanitized.resolved_path)
.expect("contention output file must be readable");
let lines: Vec<&str> = content.lines().filter(|line| !line.trim().is_empty()).collect();
let lines: Vec<&str> = content
.lines()
.filter(|line| !line.trim().is_empty())
.collect();
assert_eq!(
lines.len(),
workers * rounds,
@ -1014,8 +1022,7 @@ fn append_unknown_dc_line_returns_error_for_read_only_descriptor() {
"target/telemt-unknown-dc-append-ro-{}/unknown-dc.log",
std::process::id()
);
let sanitized =
sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate).expect("candidate must sanitize");
fs::write(&sanitized.resolved_path, "seed\n").expect("seed file must be writable");
let mut readonly = std::fs::OpenOptions::new()

View File

@ -1,5 +1,5 @@
use super::*;
use crate::crypto::{sha256, sha256_hmac, AesCtr};
use crate::crypto::{AesCtr, sha256, sha256_hmac};
use crate::protocol::constants::{ProtoTag, RESERVED_NONCE_BEGINNINGS, RESERVED_NONCE_FIRST_BYTES};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
@ -175,7 +175,10 @@ async fn tls_minimum_viable_length_boundary() {
None,
)
.await;
assert!(matches!(res, HandshakeResult::Success(_)), "Exact minimum length TLS handshake must succeed");
assert!(
matches!(res, HandshakeResult::Success(_)),
"Exact minimum length TLS handshake must succeed"
);
let short_handshake = vec![0x42u8; min_len - 1];
let res_short = handle_tls_handshake(
@ -189,7 +192,10 @@ async fn tls_minimum_viable_length_boundary() {
None,
)
.await;
assert!(matches!(res_short, HandshakeResult::BadClient { .. }), "Handshake 1 byte shorter than minimum must fail closed");
assert!(
matches!(res_short, HandshakeResult::BadClient { .. }),
"Handshake 1 byte shorter than minimum must fail closed"
);
}
#[tokio::test]
@ -219,9 +225,16 @@ async fn mtproto_extreme_dc_index_serialization() {
match res {
HandshakeResult::Success((_, _, success)) => {
assert_eq!(success.dc_idx, extreme_dc, "Extreme DC index {} must serialize/deserialize perfectly", extreme_dc);
assert_eq!(
success.dc_idx, extreme_dc,
"Extreme DC index {} must serialize/deserialize perfectly",
extreme_dc
);
}
_ => panic!("MTProto handshake with extreme DC index {} failed", extreme_dc),
_ => panic!(
"MTProto handshake with extreme DC index {} failed",
extreme_dc
),
}
}
}
@ -253,7 +266,11 @@ async fn alpn_strict_case_and_padding_rejection() {
None,
)
.await;
assert!(matches!(res, HandshakeResult::BadClient { .. }), "ALPN strict enforcement must reject {:?}", bad_alpn);
assert!(
matches!(res, HandshakeResult::BadClient { .. }),
"ALPN strict enforcement must reject {:?}",
bad_alpn
);
}
}
@ -265,8 +282,15 @@ fn ipv4_mapped_ipv6_bucketing_anomaly() {
let norm_1 = normalize_auth_probe_ip(ipv4_mapped_1);
let norm_2 = normalize_auth_probe_ip(ipv4_mapped_2);
assert_eq!(norm_1, norm_2, "IPv4-mapped IPv6 addresses must collapse into the same /64 bucket (::0)");
assert_eq!(norm_1, IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), "The bucket must be exactly ::0");
assert_eq!(
norm_1, norm_2,
"IPv4-mapped IPv6 addresses must collapse into the same /64 bucket (::0)"
);
assert_eq!(
norm_1,
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
"The bucket must be exactly ::0"
);
}
// --- Category 2: Adversarial & Black Hat ---
@ -309,7 +333,10 @@ async fn mtproto_invalid_ciphertext_does_not_poison_replay_cache() {
None,
)
.await;
assert!(matches!(res_valid, HandshakeResult::Success(_)), "Invalid MTProto ciphertext must not poison the replay cache");
assert!(
matches!(res_valid, HandshakeResult::Success(_)),
"Invalid MTProto ciphertext must not poison the replay cache"
);
}
#[tokio::test]
@ -352,7 +379,10 @@ async fn tls_invalid_session_does_not_poison_replay_cache() {
None,
)
.await;
assert!(matches!(res_valid, HandshakeResult::Success(_)), "Invalid TLS payload must not poison the replay cache");
assert!(
matches!(res_valid, HandshakeResult::Success(_)),
"Invalid TLS payload must not poison the replay cache"
);
}
#[tokio::test]
@ -387,7 +417,10 @@ async fn server_hello_delay_timing_neutrality_on_hmac_failure() {
let elapsed = start.elapsed();
assert!(matches!(res, HandshakeResult::BadClient { .. }));
assert!(elapsed >= Duration::from_millis(45), "Invalid HMAC must still incur the configured ServerHello delay to prevent timing side-channels");
assert!(
elapsed >= Duration::from_millis(45),
"Invalid HMAC must still incur the configured ServerHello delay to prevent timing side-channels"
);
}
#[tokio::test]
@ -421,7 +454,10 @@ async fn server_hello_delay_inversion_resilience() {
let elapsed = start.elapsed();
assert!(matches!(res, HandshakeResult::Success(_)));
assert!(elapsed >= Duration::from_millis(90), "Delay logic must gracefully handle min > max inversions via max.max(min)");
assert!(
elapsed >= Duration::from_millis(90),
"Delay logic must gracefully handle min > max inversions via max.max(min)"
);
}
#[tokio::test]
@ -436,10 +472,16 @@ async fn mixed_valid_and_invalid_user_secrets_configuration() {
for i in 0..9 {
let bad_secret = if i % 2 == 0 { "badhex!" } else { "1122" };
config.access.users.insert(format!("bad_user_{}", i), bad_secret.to_string());
config
.access
.users
.insert(format!("bad_user_{}", i), bad_secret.to_string());
}
let valid_secret_hex = "99999999999999999999999999999999";
config.access.users.insert("good_user".to_string(), valid_secret_hex.to_string());
config
.access
.users
.insert("good_user".to_string(), valid_secret_hex.to_string());
config.general.modes.secure = true;
config.general.modes.classic = true;
config.general.modes.tls = true;
@ -463,7 +505,10 @@ async fn mixed_valid_and_invalid_user_secrets_configuration() {
)
.await;
assert!(matches!(res, HandshakeResult::Success(_)), "Proxy must gracefully skip invalid secrets and authenticate the valid one");
assert!(
matches!(res, HandshakeResult::Success(_)),
"Proxy must gracefully skip invalid secrets and authenticate the valid one"
);
}
#[tokio::test]
@ -494,7 +539,10 @@ async fn tls_emulation_fallback_when_cache_missing() {
)
.await;
assert!(matches!(res, HandshakeResult::Success(_)), "TLS emulation must gracefully fall back to standard ServerHello if cache is missing");
assert!(
matches!(res, HandshakeResult::Success(_)),
"TLS emulation must gracefully fall back to standard ServerHello if cache is missing"
);
}
#[tokio::test]
@ -524,7 +572,10 @@ async fn classic_mode_over_tls_transport_protocol_confusion() {
)
.await;
assert!(matches!(res, HandshakeResult::Success(_)), "Intermediate tag over TLS must succeed if classic mode is enabled, locking in cross-transport behavior");
assert!(
matches!(res, HandshakeResult::Success(_)),
"Intermediate tag over TLS must succeed if classic mode is enabled, locking in cross-transport behavior"
);
}
#[test]
@ -543,9 +594,15 @@ fn generate_tg_nonce_never_emits_reserved_bytes() {
false,
);
assert!(!RESERVED_NONCE_FIRST_BYTES.contains(&nonce[0]), "Nonce must never start with reserved bytes");
assert!(
!RESERVED_NONCE_FIRST_BYTES.contains(&nonce[0]),
"Nonce must never start with reserved bytes"
);
let first_four: [u8; 4] = [nonce[0], nonce[1], nonce[2], nonce[3]];
assert!(!RESERVED_NONCE_BEGINNINGS.contains(&first_four), "Nonce must never match reserved 4-byte beginnings");
assert!(
!RESERVED_NONCE_BEGINNINGS.contains(&first_four),
"Nonce must never match reserved 4-byte beginnings"
);
}
}
@ -568,11 +625,18 @@ async fn dashmap_concurrent_saturation_stress() {
}
for task in tasks {
task.await.expect("Task panicked during concurrent DashMap stress");
task.await
.expect("Task panicked during concurrent DashMap stress");
}
assert!(auth_probe_is_throttled_for_testing(ip_a), "IP A must be throttled after concurrent stress");
assert!(auth_probe_is_throttled_for_testing(ip_b), "IP B must be throttled after concurrent stress");
assert!(
auth_probe_is_throttled_for_testing(ip_a),
"IP A must be throttled after concurrent stress"
);
assert!(
auth_probe_is_throttled_for_testing(ip_b),
"IP B must be throttled after concurrent stress"
);
}
#[test]
@ -586,7 +650,12 @@ fn prototag_invalid_bytes_fail_closed() {
];
for tag in invalid_tags {
assert_eq!(ProtoTag::from_bytes(tag), None, "Invalid ProtoTag bytes {:?} must fail closed", tag);
assert_eq!(
ProtoTag::from_bytes(tag),
None,
"Invalid ProtoTag bytes {:?} must fail closed",
tag
);
}
}
@ -603,7 +672,10 @@ fn auth_probe_eviction_hash_collision_stress() {
auth_probe_record_failure_with_state(state, ip, now);
}
assert!(state.len() <= AUTH_PROBE_TRACK_MAX_ENTRIES, "Eviction logic must successfully bound the map size under heavy insertion stress");
assert!(
state.len() <= AUTH_PROBE_TRACK_MAX_ENTRIES,
"Eviction logic must successfully bound the map size under heavy insertion stress"
);
}
#[test]

View File

@ -88,6 +88,9 @@ fn light_fuzz_offset_always_stays_inside_state_len() {
let now = base + Duration::from_nanos(seed & 0x0fff);
let start = auth_probe_scan_start_offset(ip, now, state_len, scan_limit);
assert!(start < state_len, "scan offset must stay inside state length");
assert!(
start < state_len,
"scan offset must stay inside state length"
);
}
}
}

View File

@ -96,4 +96,4 @@ fn light_fuzz_scan_offset_budget_never_exceeds_effective_window() {
"scan offset must stay inside state length"
);
}
}
}

View File

@ -113,4 +113,4 @@ fn light_fuzz_scan_offset_stays_within_window_for_randomized_inputs() {
"scan offset must always remain inside state length"
);
}
}
}

View File

@ -1,8 +1,8 @@
use super::*;
use crate::crypto::{sha256, sha256_hmac, AesCtr};
use crate::crypto::{AesCtr, sha256, sha256_hmac};
use crate::protocol::constants::{ProtoTag, RESERVED_NONCE_BEGINNINGS, RESERVED_NONCE_FIRST_BYTES};
use rand::{Rng, SeedableRng};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
@ -223,7 +223,10 @@ fn auth_probe_backoff_extreme_fail_streak_clamps_safely() {
assert_eq!(updated.fail_streak, u32::MAX);
let expected_blocked_until = now + Duration::from_millis(AUTH_PROBE_BACKOFF_MAX_MS);
assert_eq!(updated.blocked_until, expected_blocked_until, "Extreme fail streak must clamp cleanly to AUTH_PROBE_BACKOFF_MAX_MS");
assert_eq!(
updated.blocked_until, expected_blocked_until,
"Extreme fail streak must clamp cleanly to AUTH_PROBE_BACKOFF_MAX_MS"
);
}
#[test]
@ -250,12 +253,19 @@ fn generate_tg_nonce_cryptographic_uniqueness_and_entropy() {
total_set_bits += byte.count_ones() as usize;
}
assert!(nonces.insert(nonce), "generate_tg_nonce emitted a duplicate nonce! RNG is stuck.");
assert!(
nonces.insert(nonce),
"generate_tg_nonce emitted a duplicate nonce! RNG is stuck."
);
}
let total_bits = iterations * HANDSHAKE_LEN * 8;
let ratio = (total_set_bits as f64) / (total_bits as f64);
assert!(ratio > 0.48 && ratio < 0.52, "Nonce entropy is degraded. Set bit ratio: {}", ratio);
assert!(
ratio > 0.48 && ratio < 0.52,
"Nonce entropy is degraded. Set bit ratio: {}",
ratio
);
}
#[tokio::test]
@ -267,10 +277,19 @@ async fn mtproto_multi_user_decryption_isolation() {
config.general.modes.secure = true;
config.access.ignore_time_skew = true;
config.access.users.insert("user_a".to_string(), "11111111111111111111111111111111".to_string());
config.access.users.insert("user_b".to_string(), "22222222222222222222222222222222".to_string());
config.access.users.insert(
"user_a".to_string(),
"11111111111111111111111111111111".to_string(),
);
config.access.users.insert(
"user_b".to_string(),
"22222222222222222222222222222222".to_string(),
);
let good_secret_hex = "33333333333333333333333333333333";
config.access.users.insert("user_c".to_string(), good_secret_hex.to_string());
config
.access
.users
.insert("user_c".to_string(), good_secret_hex.to_string());
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let peer: SocketAddr = "192.0.2.104:12345".parse().unwrap();
@ -291,9 +310,14 @@ async fn mtproto_multi_user_decryption_isolation() {
match res {
HandshakeResult::Success((_, _, success)) => {
assert_eq!(success.user, "user_c", "Decryption attempts on previous users must not corrupt the handshake buffer for the valid user");
assert_eq!(
success.user, "user_c",
"Decryption attempts on previous users must not corrupt the handshake buffer for the valid user"
);
}
_ => panic!("Multi-user MTProto handshake failed. Decryption buffer might be mutating in place."),
_ => panic!(
"Multi-user MTProto handshake failed. Decryption buffer might be mutating in place."
),
}
}
@ -325,7 +349,9 @@ async fn invalid_secret_warning_lock_contention_and_bound() {
}
let warned = INVALID_SECRET_WARNED.get().unwrap();
let guard = warned.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
let guard = warned
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
assert_eq!(
guard.len(),
@ -342,7 +368,11 @@ async fn mtproto_strict_concurrent_replay_race_condition() {
let secret_hex = "4A4A4A4A4A4A4A4A4A4A4A4A4A4A4A4A";
let config = Arc::new(test_config_with_secret_hex(secret_hex));
let replay_checker = Arc::new(ReplayChecker::new(4096, Duration::from_secs(60)));
let valid_handshake = Arc::new(make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 1));
let valid_handshake = Arc::new(make_valid_mtproto_handshake(
secret_hex,
ProtoTag::Secure,
1,
));
let tasks = 100;
let barrier = Arc::new(Barrier::new(tasks));
@ -355,7 +385,10 @@ async fn mtproto_strict_concurrent_replay_race_condition() {
let hs = valid_handshake.clone();
handles.push(tokio::spawn(async move {
let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, (i % 250) as u8)), 10000 + i as u16);
let peer = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(10, 0, 0, (i % 250) as u8)),
10000 + i as u16,
);
b.wait().await;
handle_mtproto_handshake(
&hs,
@ -382,8 +415,15 @@ async fn mtproto_strict_concurrent_replay_race_condition() {
}
}
assert_eq!(successes, 1, "Replay cache race condition allowed multiple identical MTProto handshakes to succeed");
assert_eq!(failures, tasks - 1, "Replay cache failed to forcefully reject concurrent duplicates");
assert_eq!(
successes, 1,
"Replay cache race condition allowed multiple identical MTProto handshakes to succeed"
);
assert_eq!(
failures,
tasks - 1,
"Replay cache failed to forcefully reject concurrent duplicates"
);
}
#[tokio::test]
@ -398,7 +438,8 @@ async fn tls_alpn_zero_length_protocol_handled_safely() {
let rng = SecureRandom::new();
let peer: SocketAddr = "192.0.2.107:12345".parse().unwrap();
let handshake = make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b""]);
let handshake =
make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b""]);
let res = handle_tls_handshake(
&handshake,
@ -412,7 +453,10 @@ async fn tls_alpn_zero_length_protocol_handled_safely() {
)
.await;
assert!(matches!(res, HandshakeResult::BadClient { .. }), "0-length ALPN must be safely rejected without panicking");
assert!(
matches!(res, HandshakeResult::BadClient { .. }),
"0-length ALPN must be safely rejected without panicking"
);
}
#[tokio::test]
@ -427,7 +471,8 @@ async fn tls_sni_massive_hostname_does_not_panic() {
let peer: SocketAddr = "192.0.2.108:12345".parse().unwrap();
let massive_hostname = String::from_utf8(vec![b'a'; 65000]).unwrap();
let handshake = make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, &massive_hostname, &[]);
let handshake =
make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, &massive_hostname, &[]);
let res = handle_tls_handshake(
&handshake,
@ -441,7 +486,13 @@ async fn tls_sni_massive_hostname_does_not_panic() {
)
.await;
assert!(matches!(res, HandshakeResult::Success(_) | HandshakeResult::BadClient { .. }), "Massive SNI hostname must be processed or ignored without stack overflow or panic");
assert!(
matches!(
res,
HandshakeResult::Success(_) | HandshakeResult::BadClient { .. }
),
"Massive SNI hostname must be processed or ignored without stack overflow or panic"
);
}
#[tokio::test]
@ -455,7 +506,8 @@ async fn tls_progressive_truncation_fuzzing_no_panics() {
let rng = SecureRandom::new();
let peer: SocketAddr = "192.0.2.109:12345".parse().unwrap();
let valid_handshake = make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b"h2"]);
let valid_handshake =
make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "example.com", &[b"h2"]);
let full_len = valid_handshake.len();
// Truncated corpus only: full_len is a valid baseline and should not be
@ -473,7 +525,11 @@ async fn tls_progressive_truncation_fuzzing_no_panics() {
None,
)
.await;
assert!(matches!(res, HandshakeResult::BadClient { .. }), "Truncated TLS handshake at len {} must fail safely without panicking", i);
assert!(
matches!(res, HandshakeResult::BadClient { .. }),
"Truncated TLS handshake at len {} must fail safely without panicking",
i
);
}
}
@ -504,7 +560,10 @@ async fn mtproto_pure_entropy_fuzzing_no_panics() {
)
.await;
assert!(matches!(res, HandshakeResult::BadClient { .. }), "Pure entropy MTProto payload must fail closed and never panic");
assert!(
matches!(res, HandshakeResult::BadClient { .. }),
"Pure entropy MTProto payload must fail closed and never panic"
);
}
}
@ -517,10 +576,16 @@ fn decode_user_secret_odd_length_hex_rejection() {
let mut config = ProxyConfig::default();
config.access.users.clear();
config.access.users.insert("odd_user".to_string(), "1234567890123456789012345678901".to_string());
config.access.users.insert(
"odd_user".to_string(),
"1234567890123456789012345678901".to_string(),
);
let decoded = decode_user_secrets(&config, None);
assert!(decoded.is_empty(), "Odd-length hex string must be gracefully rejected by hex::decode without unwrapping");
assert!(
decoded.is_empty(),
"Odd-length hex string must be gracefully rejected by hex::decode without unwrapping"
);
}
#[test]
@ -552,7 +617,10 @@ fn saturation_grace_pre_existing_high_fail_streak_immediate_throttle() {
}
let is_throttled = auth_probe_should_apply_preauth_throttle(peer_ip, now);
assert!(is_throttled, "A peer with a pre-existing high fail streak must be immediately throttled when saturation begins, receiving no unearned grace period");
assert!(
is_throttled,
"A peer with a pre-existing high fail streak must be immediately throttled when saturation begins, receiving no unearned grace period"
);
}
#[test]
@ -586,7 +654,11 @@ fn mtproto_classic_tags_rejected_when_only_secure_mode_enabled() {
config.general.modes.tls = false;
assert!(!mode_enabled_for_proto(&config, ProtoTag::Abridged, false));
assert!(!mode_enabled_for_proto(&config, ProtoTag::Intermediate, false));
assert!(!mode_enabled_for_proto(
&config,
ProtoTag::Intermediate,
false
));
}
#[test]

View File

@ -1,5 +1,5 @@
use super::*;
use crate::crypto::{sha256, sha256_hmac, AesCtr, SecureRandom};
use crate::crypto::{AesCtr, SecureRandom, sha256, sha256_hmac};
use crate::protocol::constants::{ProtoTag, TLS_RECORD_HANDSHAKE, TLS_VERSION};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
@ -80,8 +80,7 @@ fn make_valid_tls_client_hello_with_alpn(
digest[28 + i] ^= ts[i];
}
record[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN]
.copy_from_slice(&digest);
record[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN].copy_from_slice(&digest);
record
}
@ -331,7 +330,11 @@ async fn saturation_grace_exhaustion_under_concurrency_keeps_peer_throttled() {
let final_state = state.get(&peer_ip).expect("state must exist");
assert!(
final_state.fail_streak >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS
final_state.fail_streak
>= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS
);
assert!(auth_probe_should_apply_preauth_throttle(peer_ip, Instant::now()));
assert!(auth_probe_should_apply_preauth_throttle(
peer_ip,
Instant::now()
));
}

View File

@ -1,5 +1,5 @@
use super::*;
use crate::crypto::{sha256, sha256_hmac, AesCtr, SecureRandom};
use crate::crypto::{AesCtr, SecureRandom, sha256, sha256_hmac};
use crate::protocol::constants::{ProtoTag, TLS_RECORD_HANDSHAKE, TLS_VERSION};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
@ -169,10 +169,10 @@ async fn mtproto_user_scan_timing_manual_benchmark() {
);
}
config.access.users.insert(
preferred_user.to_string(),
target_secret_hex.to_string(),
);
config
.access
.users
.insert(preferred_user.to_string(), target_secret_hex.to_string());
let replay_checker_preferred = ReplayChecker::new(65_536, Duration::from_secs(60));
let replay_checker_full_scan = ReplayChecker::new(65_536, Duration::from_secs(60));

View File

@ -544,7 +544,6 @@ async fn timing_classifier_light_fuzz_pairwise_bucketed_accuracy_stays_bounded_u
if hardened_acc + 0.05 <= baseline_acc {
meaningful_improvement_seen = true;
}
}
assert!(

View File

@ -85,7 +85,10 @@ async fn aggressive_mode_shapes_backend_silent_non_eof_path() {
let legacy = capture_forwarded_len_with_mode(body_sent, false, false, false, 0).await;
let aggressive = capture_forwarded_len_with_mode(body_sent, false, true, false, 0).await;
assert!(legacy < floor, "legacy mode should keep timeout path unshaped");
assert!(
legacy < floor,
"legacy mode should keep timeout path unshaped"
);
assert!(
aggressive >= floor,
"aggressive mode must shape backend-silent non-EOF paths (aggressive={aggressive}, floor={floor})"

View File

@ -52,7 +52,10 @@ async fn run_connect_failure_case(
.await
.unwrap()
.unwrap();
assert_eq!(n, 0, "connect-failure path must close client-visible writer");
assert_eq!(
n, 0,
"connect-failure path must close client-visible writer"
);
started.elapsed()
}
@ -67,13 +70,9 @@ async fn connect_failure_refusal_close_behavior_matrix() {
let peer: SocketAddr = format!("203.0.113.210:{}", 54100 + idx as u16)
.parse()
.unwrap();
let elapsed = run_connect_failure_case(
"127.0.0.1",
unused_port,
timing_normalization_enabled,
peer,
)
.await;
let elapsed =
run_connect_failure_case("127.0.0.1", unused_port, timing_normalization_enabled, peer)
.await;
if timing_normalization_enabled {
assert!(

View File

@ -79,7 +79,10 @@ async fn io_error_terminates_cleanly() {
}
}
tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(ErrReader, usize::MAX))
.await
.expect("consume_client_data did not return on I/O error");
tokio::time::timeout(
MASK_RELAY_TIMEOUT,
consume_client_data(ErrReader, usize::MAX),
)
.await
.expect("consume_client_data did not return on I/O error");
}

View File

@ -32,8 +32,16 @@ async fn run_self_target_refusal(
let (mut client, server) = duplex(1024);
let started = Instant::now();
let task = tokio::spawn(async move {
handle_bad_client(server, tokio::io::sink(), initial, peer, local_addr, &config, &beobachten)
.await;
handle_bad_client(
server,
tokio::io::sink(),
initial,
peer,
local_addr,
&config,
&beobachten,
)
.await;
});
client
@ -214,4 +222,4 @@ async fn stress_high_fanout_self_target_refusal_no_deadlock_or_timeout() {
})
.await
.expect("high-fanout refusal workload must complete without deadlock");
}
}

View File

@ -2,7 +2,13 @@ use super::*;
#[test]
fn exact_four_byte_http_tokens_are_classified() {
for token in [b"GET ".as_ref(), b"POST".as_ref(), b"HEAD".as_ref(), b"PUT ".as_ref(), b"PRI ".as_ref()] {
for token in [
b"GET ".as_ref(),
b"POST".as_ref(),
b"HEAD".as_ref(),
b"PUT ".as_ref(),
b"PRI ".as_ref(),
] {
assert!(
is_http_probe(token),
"exact 4-byte token must be classified as HTTP probe: {:?}",
@ -76,4 +82,4 @@ fn light_fuzz_four_byte_ascii_noise_not_misclassified() {
token
);
}
}
}

View File

@ -38,4 +38,4 @@ async fn adversarial_parallel_cold_miss_performs_single_interface_refresh() {
1,
"parallel cold misses must coalesce into a single interface enumeration"
);
}
}

View File

@ -37,7 +37,10 @@ async fn tdd_non_local_port_short_circuit_does_not_enumerate_interfaces() {
let local_addr: SocketAddr = "0.0.0.0:443".parse().expect("valid local addr");
let is_local = is_mask_target_local_listener_async("127.0.0.1", 8443, local_addr, None).await;
assert!(!is_local, "different port must not be treated as local listener");
assert!(
!is_local,
"different port must not be treated as local listener"
);
assert_eq!(
local_interface_enumerations_for_tests(),
0,

View File

@ -63,17 +63,11 @@ impl AsyncWrite for CountingWriter {
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}

View File

@ -1,6 +1,6 @@
use super::*;
use std::net::TcpListener as StdTcpListener;
use std::net::SocketAddr;
use std::net::TcpListener as StdTcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
use tokio::net::TcpListener;
use tokio::time::{Duration, Instant, timeout};
@ -15,74 +15,38 @@ fn closed_local_port() -> u16 {
#[tokio::test]
async fn self_target_detection_matches_literal_ipv4_listener() {
let local: SocketAddr = "198.51.100.40:443".parse().unwrap();
assert!(is_mask_target_local_listener_async(
"198.51.100.40",
443,
local,
None,
)
.await);
assert!(is_mask_target_local_listener_async("198.51.100.40", 443, local, None,).await);
}
#[tokio::test]
async fn self_target_detection_matches_bracketed_ipv6_listener() {
let local: SocketAddr = "[2001:db8::44]:8443".parse().unwrap();
assert!(is_mask_target_local_listener_async(
"[2001:db8::44]",
8443,
local,
None,
)
.await);
assert!(is_mask_target_local_listener_async("[2001:db8::44]", 8443, local, None,).await);
}
#[tokio::test]
async fn self_target_detection_keeps_same_ip_different_port_forwardable() {
let local: SocketAddr = "203.0.113.44:443".parse().unwrap();
assert!(!is_mask_target_local_listener_async(
"203.0.113.44",
8443,
local,
None,
)
.await);
assert!(!is_mask_target_local_listener_async("203.0.113.44", 8443, local, None,).await);
}
#[tokio::test]
async fn self_target_detection_normalizes_ipv4_mapped_ipv6_literal() {
let local: SocketAddr = "127.0.0.1:443".parse().unwrap();
assert!(is_mask_target_local_listener_async(
"::ffff:127.0.0.1",
443,
local,
None,
)
.await);
assert!(is_mask_target_local_listener_async("::ffff:127.0.0.1", 443, local, None,).await);
}
#[tokio::test]
async fn self_target_detection_unspecified_bind_blocks_loopback_target() {
let local: SocketAddr = "0.0.0.0:443".parse().unwrap();
assert!(is_mask_target_local_listener_async(
"127.0.0.1",
443,
local,
None,
)
.await);
assert!(is_mask_target_local_listener_async("127.0.0.1", 443, local, None,).await);
}
#[tokio::test]
async fn self_target_detection_unspecified_bind_keeps_remote_target_forwardable() {
let local: SocketAddr = "0.0.0.0:443".parse().unwrap();
let remote: SocketAddr = "198.51.100.44:443".parse().unwrap();
assert!(!is_mask_target_local_listener_async(
"mask.example",
443,
local,
Some(remote),
)
.await);
assert!(!is_mask_target_local_listener_async("mask.example", 443, local, Some(remote),).await);
}
#[tokio::test]
@ -306,7 +270,10 @@ async fn offline_mask_target_refusal_respects_timing_normalization_budget() {
});
client.shutdown().await.unwrap();
timeout(Duration::from_secs(2), task).await.unwrap().unwrap();
timeout(Duration::from_secs(2), task)
.await
.unwrap()
.unwrap();
let elapsed = started.elapsed();
assert!(
@ -350,7 +317,10 @@ async fn offline_mask_target_refusal_with_idle_client_is_bounded_by_consume_time
.await
.expect("connection should still be open before consume timeout expires");
timeout(Duration::from_secs(2), task).await.unwrap().unwrap();
timeout(Duration::from_secs(2), task)
.await
.unwrap()
.unwrap();
let elapsed = started.elapsed();
assert!(

View File

@ -40,7 +40,10 @@ async fn adversarial_delayed_interface_lookup_does_not_consume_outcome_floor_bud
tokio::time::sleep(Duration::from_millis(80)).await;
drop(held_refresh_guard);
client.shutdown().await.expect("client shutdown must succeed");
client
.shutdown()
.await
.expect("client shutdown must succeed");
timeout(Duration::from_secs(2), task)
.await
@ -52,4 +55,4 @@ async fn adversarial_delayed_interface_lookup_does_not_consume_outcome_floor_bud
elapsed >= Duration::from_millis(180) && elapsed < Duration::from_millis(350),
"timing normalization floor must start after pre-outcome self-target checks"
);
}
}

View File

@ -2,8 +2,8 @@ use super::*;
use crate::crypto::AesCtr;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use tokio::io::AsyncWriteExt;
use tokio::io::duplex;
use tokio::time::{Duration as TokioDuration, Instant as TokioInstant, timeout};

View File

@ -29,7 +29,10 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account
let before = relay_pressure_event_seq();
note_relay_pressure_event();
let after = relay_pressure_event_seq();
assert!(after > before, "pressure accounting must still advance after poison");
assert!(
after > before,
"pressure accounting must still advance after poison"
);
clear_relay_idle_pressure_state_for_testing();
}

View File

@ -217,7 +217,9 @@ async fn adversarial_lockstep_alternating_attack_under_jitter_closes() {
}
}
writer_task.await.expect("writer jitter task must not panic");
writer_task
.await
.expect("writer jitter task must not panic");
assert!(closed, "alternating attack must close before EOF");
});
}
@ -247,7 +249,10 @@ async fn integration_mixed_population_attackers_close_benign_survive() {
plaintext.push(0x01);
plaintext.extend_from_slice(&[n, n, n, n]);
}
writer.write_all(&encrypt_for_reader(&plaintext)).await.unwrap();
writer
.write_all(&encrypt_for_reader(&plaintext))
.await
.unwrap();
drop(writer);
let mut closed = false;
@ -279,7 +284,10 @@ async fn integration_mixed_population_attackers_close_benign_survive() {
}
plaintext.push(0x01);
plaintext.extend_from_slice(&payload);
writer.write_all(&encrypt_for_reader(&plaintext)).await.unwrap();
writer
.write_all(&encrypt_for_reader(&plaintext))
.await
.unwrap();
let got = read_once(
&mut crypto_reader,
@ -329,7 +337,10 @@ async fn light_fuzz_parallel_patterns_no_hang_or_panic() {
}
}
writer.write_all(&encrypt_for_reader(&plaintext)).await.unwrap();
writer
.write_all(&encrypt_for_reader(&plaintext))
.await
.unwrap();
drop(writer);
for _ in 0..320 {

View File

@ -51,7 +51,9 @@ fn make_enabled_idle_policy() -> RelayClientIdlePolicy {
fn append_tiny_frame(plaintext: &mut Vec<u8>, proto: ProtoTag) {
match proto {
ProtoTag::Abridged => plaintext.push(0x00),
ProtoTag::Intermediate | ProtoTag::Secure => plaintext.extend_from_slice(&0u32.to_le_bytes()),
ProtoTag::Intermediate | ProtoTag::Secure => {
plaintext.extend_from_slice(&0u32.to_le_bytes())
}
}
}
@ -206,7 +208,11 @@ async fn intermediate_chunked_alternating_attack_closes_before_eof() {
let mut plaintext = Vec::with_capacity(8 * 200);
for n in 0..180u8 {
append_tiny_frame(&mut plaintext, ProtoTag::Intermediate);
append_real_frame(&mut plaintext, ProtoTag::Intermediate, [n, n ^ 1, n ^ 2, n ^ 3]);
append_real_frame(
&mut plaintext,
ProtoTag::Intermediate,
[n, n ^ 1, n ^ 2, n ^ 3],
);
}
let encrypted = encrypt_for_reader(&plaintext);
@ -240,7 +246,9 @@ async fn intermediate_chunked_alternating_attack_closes_before_eof() {
}
}
writer_task.await.expect("intermediate writer task must not panic");
writer_task
.await
.expect("intermediate writer task must not panic");
assert!(closed, "intermediate alternating attack must fail closed");
}
@ -290,7 +298,9 @@ async fn secure_chunked_alternating_attack_closes_before_eof() {
}
}
writer_task.await.expect("secure writer task must not panic");
writer_task
.await
.expect("secure writer task must not panic");
assert!(closed, "secure alternating attack must fail closed");
}

View File

@ -2,8 +2,8 @@ use super::*;
use crate::crypto::AesCtr;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWriteExt, duplex};
@ -156,7 +156,10 @@ fn alternating_one_to_one_closes_with_bounded_real_frame_count() {
}
let (closed_at, _, reals) = simulate_tiny_debt_pattern(&pattern, pattern.len());
assert!(closed_at.is_some());
assert!(reals <= 80, "expected bounded real frames before close, got {reals}");
assert!(
reals <= 80,
"expected bounded real frames before close, got {reals}"
);
}
#[test]
@ -183,7 +186,10 @@ fn alternating_one_to_seven_eventually_closes() {
}
}
let (closed_at, _, _) = simulate_tiny_debt_pattern(&pattern, pattern.len());
assert!(closed_at.is_some(), "1:7 tiny-to-real must eventually close");
assert!(
closed_at.is_some(),
"1:7 tiny-to-real must eventually close"
);
}
#[test]

View File

@ -2,10 +2,10 @@ use super::*;
use crate::crypto::AesCtr;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWriteExt, duplex};
use std::sync::atomic::AtomicU64;
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWriteExt, duplex};
fn make_crypto_reader<T>(reader: T) -> CryptoReader<T>
where

View File

@ -5,10 +5,13 @@ use crate::stream::BufferPool;
use rand::rngs::StdRng;
use rand::{RngExt, SeedableRng};
use std::sync::Arc;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
use tokio::time::{Duration, timeout};
async fn read_available<R: tokio::io::AsyncRead + Unpin>(reader: &mut R, budget: Duration) -> usize {
async fn read_available<R: tokio::io::AsyncRead + Unpin>(
reader: &mut R,
budget: Duration,
) -> usize {
let start = tokio::time::Instant::now();
let mut total = 0usize;
let mut buf = [0u8; 128];
@ -57,16 +60,25 @@ async fn positive_quota_path_forwards_both_directions_within_limit() {
Arc::new(BufferPool::new()),
));
client_peer.write_all(&[0xAA, 0xBB, 0xCC, 0xDD]).await.unwrap();
client_peer
.write_all(&[0xAA, 0xBB, 0xCC, 0xDD])
.await
.unwrap();
server_peer.read_exact(&mut [0u8; 4]).await.unwrap();
server_peer.write_all(&[0x11, 0x22, 0x33, 0x44]).await.unwrap();
server_peer
.write_all(&[0x11, 0x22, 0x33, 0x44])
.await
.unwrap();
client_peer.read_exact(&mut [0u8; 4]).await.unwrap();
drop(client_peer);
drop(server_peer);
let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap();
let relay_result = timeout(Duration::from_secs(2), relay)
.await
.unwrap()
.unwrap();
assert!(relay_result.is_ok());
assert!(stats.get_user_quota_used(user) <= 16);
}
@ -98,11 +110,23 @@ async fn negative_preloaded_quota_forbids_any_forwarding() {
client_peer.write_all(&[0xAA]).await.unwrap();
server_peer.write_all(&[0xBB]).await.unwrap();
assert_eq!(read_available(&mut server_peer, Duration::from_millis(120)).await, 0);
assert_eq!(read_available(&mut client_peer, Duration::from_millis(120)).await, 0);
assert_eq!(
read_available(&mut server_peer, Duration::from_millis(120)).await,
0
);
assert_eq!(
read_available(&mut client_peer, Duration::from_millis(120)).await,
0
);
let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap();
assert!(matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })));
let relay_result = timeout(Duration::from_secs(2), relay)
.await
.unwrap()
.unwrap();
assert!(matches!(
relay_result,
Err(ProxyError::DataQuotaExceeded { .. })
));
assert!(stats.get_user_quota_used(user) <= 8);
}
@ -135,13 +159,25 @@ async fn edge_quota_one_ensures_at_most_one_byte_across_directions() {
);
let mut buf = [0u8; 1];
let delivered_s2c = timeout(Duration::from_millis(120), client_peer.read(&mut buf)).await.unwrap().unwrap_or(0);
let delivered_c2s = timeout(Duration::from_millis(120), server_peer.read(&mut buf)).await.unwrap().unwrap_or(0);
let delivered_s2c = timeout(Duration::from_millis(120), client_peer.read(&mut buf))
.await
.unwrap()
.unwrap_or(0);
let delivered_c2s = timeout(Duration::from_millis(120), server_peer.read(&mut buf))
.await
.unwrap()
.unwrap_or(0);
assert!(delivered_s2c + delivered_c2s <= 1);
let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap();
assert!(matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })));
let relay_result = timeout(Duration::from_secs(2), relay)
.await
.unwrap()
.unwrap();
assert!(matches!(
relay_result,
Err(ProxyError::DataQuotaExceeded { .. })
));
}
#[tokio::test]
@ -191,8 +227,14 @@ async fn adversarial_blackhat_alternating_jitter_does_not_overshoot_quota() {
tokio::time::sleep(Duration::from_millis(((i % 3) + 1) as u64)).await;
}
let relay_result = timeout(Duration::from_secs(3), relay).await.unwrap().unwrap();
assert!(matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })));
let relay_result = timeout(Duration::from_secs(3), relay)
.await
.unwrap()
.unwrap();
assert!(matches!(
relay_result,
Err(ProxyError::DataQuotaExceeded { .. })
));
assert!(total_forwarded <= quota as usize);
assert!(stats.get_user_quota_used(user) <= quota);
}
@ -239,13 +281,17 @@ async fn light_fuzz_random_quota_schedule_preserves_quota_invariants() {
if rng.random::<bool>() {
let _ = client_peer.write_all(&[rng.random::<u8>()]).await;
let mut one = [0u8; 1];
if let Ok(Ok(n)) = timeout(Duration::from_millis(4), server_peer.read(&mut one)).await {
if let Ok(Ok(n)) =
timeout(Duration::from_millis(4), server_peer.read(&mut one)).await
{
total_forwarded += n;
}
} else {
let _ = server_peer.write_all(&[rng.random::<u8>()]).await;
let mut one = [0u8; 1];
if let Ok(Ok(n)) = timeout(Duration::from_millis(4), client_peer.read(&mut one)).await {
if let Ok(Ok(n)) =
timeout(Duration::from_millis(4), client_peer.read(&mut one)).await
{
total_forwarded += n;
}
}
@ -254,8 +300,14 @@ async fn light_fuzz_random_quota_schedule_preserves_quota_invariants() {
drop(client_peer);
drop(server_peer);
let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap();
assert!(relay_result.is_ok() || matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })));
let relay_result = timeout(Duration::from_secs(2), relay)
.await
.unwrap()
.unwrap();
assert!(
relay_result.is_ok()
|| matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))
);
assert!(total_forwarded <= quota as usize);
assert!(stats.get_user_quota_used(&user) <= quota);
}
@ -305,13 +357,17 @@ async fn stress_parallel_relays_for_one_user_obey_global_quota() {
if (step as usize + worker as usize) % 2 == 0 {
let _ = client_peer.write_all(&[(step ^ 0x5A)]).await;
let mut one = [0u8; 1];
if let Ok(Ok(n)) = timeout(Duration::from_millis(6), server_peer.read(&mut one)).await {
if let Ok(Ok(n)) =
timeout(Duration::from_millis(6), server_peer.read(&mut one)).await
{
total += n;
}
} else {
let _ = server_peer.write_all(&[(step ^ 0xA5)]).await;
let mut one = [0u8; 1];
if let Ok(Ok(n)) = timeout(Duration::from_millis(6), client_peer.read(&mut one)).await {
if let Ok(Ok(n)) =
timeout(Duration::from_millis(6), client_peer.read(&mut one)).await
{
total += n;
}
}
@ -321,8 +377,14 @@ async fn stress_parallel_relays_for_one_user_obey_global_quota() {
drop(client_peer);
drop(server_peer);
let relay_result = timeout(Duration::from_secs(2), relay).await.unwrap().unwrap();
assert!(relay_result.is_ok() || matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. })));
let relay_result = timeout(Duration::from_secs(2), relay)
.await
.unwrap()
.unwrap();
assert!(
relay_result.is_ok()
|| matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))
);
total
}));
}

View File

@ -381,7 +381,9 @@ impl Stats {
return;
}
Self::touch_user_stats(user_stats);
user_stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
user_stats
.octets_from_client
.fetch_add(bytes, Ordering::Relaxed);
}
#[inline]
@ -390,7 +392,9 @@ impl Stats {
return;
}
Self::touch_user_stats(user_stats);
user_stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
user_stats
.octets_to_client
.fetch_add(bytes, Ordering::Relaxed);
}
#[inline]
@ -812,7 +816,8 @@ impl Stats {
}
pub fn increment_me_d2c_data_frames_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed);
self.me_d2c_data_frames_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_ack_frames_total(&self) {
@ -1708,7 +1713,8 @@ impl Stats {
self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed)
self.me_d2c_batch_bytes_bucket_4k_16k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_16k_64k
@ -2371,8 +2377,8 @@ impl ReplayStats {
mod tests {
use super::*;
use crate::config::MeTelemetryLevel;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[test]
fn test_stats_shared_counters() {

View File

@ -14,7 +14,10 @@ fn padding_rounding_equivalent_for_extensive_safe_domain() {
let old = old_padding_round_up_to_4(len).expect("old expression must be safe");
let new = new_padding_round_up_to_4(len).expect("new expression must be safe");
assert_eq!(old, new, "mismatch for len={len}");
assert!(new >= len, "rounded length must not shrink: len={len}, out={new}");
assert!(
new >= len,
"rounded length must not shrink: len={len}, out={new}"
);
assert_eq!(new % 4, 0, "rounded length must stay 4-byte aligned");
}
}

View File

@ -44,7 +44,10 @@ async fn encapsulation_repeated_queue_poison_recovery_preserves_forward_progress
let ip_primary = ip_from_idx(10_001);
let ip_alt = ip_from_idx(10_002);
tracker.check_and_add("encap-poison", ip_primary).await.unwrap();
tracker
.check_and_add("encap-poison", ip_primary)
.await
.unwrap();
for _ in 0..128 {
let queue = tracker.cleanup_queue_mutex_for_tests();

View File

@ -812,8 +812,8 @@ mod tests {
#[test]
fn test_encode_tls13_certificate_message_single_cert() {
let cert = vec![0x30, 0x03, 0x02, 0x01, 0x01];
let message = encode_tls13_certificate_message(std::slice::from_ref(&cert))
.expect("message");
let message =
encode_tls13_certificate_message(std::slice::from_ref(&cert)).expect("message");
assert_eq!(message[0], 0x0b);
assert_eq!(read_u24(&message[1..4]), message.len() - 4);

View File

@ -293,9 +293,7 @@ impl MePool {
WriterContour::Draining => "draining",
};
if !draining
&& let Some(dc_idx) = dc
{
if !draining && let Some(dc_idx) = dc {
*live_writers_by_dc_endpoint
.entry((dc_idx, endpoint))
.or_insert(0) += 1;

View File

@ -201,7 +201,10 @@ impl ConnectionPool {
pub async fn close_all(&self) {
let pools_snapshot: Vec<(SocketAddr, Arc<Mutex<PoolInner>>)> = {
let pools = self.pools.read();
pools.iter().map(|(addr, pool)| (*addr, Arc::clone(pool))).collect()
pools
.iter()
.map(|(addr, pool)| (*addr, Arc::clone(pool)))
.collect()
};
for (addr, pool) in pools_snapshot {