Compare commits

...

14 Commits

Author SHA1 Message Date
Alexey 25847c9a00
Merge branch 'flow-sec' into fix/api-docker-connection 2026-03-17 11:24:57 +03:00
Alexey 822bcbf7a5
Update Cargo.toml 2026-03-17 11:21:35 +03:00
Alexey b25ec97a43
Merge pull request #447 from DavidOsipov/pr-sec-1
PR-SEC-1 (WIP): Первый PR с узкой пачкой исправлений безопасности и маскировки. Упор сделан на /src/proxy
2026-03-17 11:20:36 +03:00
David Osipov 8821e38013
feat(proxy): enhance auth probe capacity with stale entry pruning and new tests 2026-03-17 02:19:14 +04:00
David Osipov a1caebbe6f
feat(proxy): implement timeout handling for client payload reads and add corresponding tests 2026-03-17 01:53:44 +04:00
David Osipov e0d821c6b6
Merge remote-tracking branch 'upstream/main' into pr-sec-1 2026-03-17 01:51:35 +04:00
David Osipov 205fc88718
feat(proxy): enhance logging and deduplication for unknown datacenters
- Implemented a mechanism to log unknown datacenter indices with a distinct limit to avoid excessive logging.
- Introduced tests to ensure that logging is deduplicated per datacenter index and respects the distinct limit.
- Updated the fallback logic for datacenter resolution to prevent panics when only a single datacenter is available.

feat(proxy): add authentication probe throttling

- Added a pre-authentication probe throttling mechanism to limit the rate of invalid TLS and MTProto handshake attempts.
- Introduced a backoff strategy for repeated failures and ensured that successful handshakes reset the failure count.
- Implemented tests to validate the behavior of the authentication probe under various conditions.

fix(proxy): ensure proper flushing of masked writes

- Added a flush operation after writing initial data to the mask writer to ensure data integrity.

refactor(proxy): optimize desynchronization deduplication

- Replaced the Mutex-based deduplication structure with a DashMap for improved concurrency and performance.
- Implemented a bounded cache for deduplication to limit memory usage and prevent stale entries from persisting.

test(proxy): enhance security tests for middle relay and handshake

- Added comprehensive tests for the middle relay and handshake processes, including scenarios for deduplication and authentication probe behavior.
- Ensured that the tests cover edge cases and validate the expected behavior of the system under load.
2026-03-17 01:29:30 +04:00
David Osipov e4a50f9286
feat(tls): add boot time timestamp constant and validation for SNI hostnames
- Introduced `BOOT_TIME_MAX_SECS` constant to define the maximum accepted boot-time timestamp.
- Updated `validate_tls_handshake_at_time` to utilize the new boot time constant for timestamp validation.
- Enhanced `extract_sni_from_client_hello` to validate SNI hostnames against specified criteria, rejecting invalid hostnames.
- Added tests to ensure proper handling of boot time timestamps and SNI validation.

feat(handshake): improve user secret decoding and ALPN enforcement

- Refactored user secret decoding to provide better error handling and logging for invalid secrets.
- Added tests for concurrent identical handshakes to ensure replay protection works as expected.
- Implemented ALPN enforcement in handshake processing, rejecting unsupported protocols and allowing valid ones.

fix(masking): implement timeout handling for masking operations

- Added timeout handling for writing proxy headers and consuming client data in masking.
- Adjusted timeout durations for testing to ensure faster feedback during unit tests.
- Introduced tests to verify behavior when masking is disabled and when proxy header writes exceed the timeout.

test(masking): add tests for slowloris connections and proxy header timeouts

- Created tests to validate that slowloris connections are closed by consume timeout when masking is disabled.
- Added a test for proxy header write timeout to ensure it returns false when the write operation does not complete.
2026-03-16 21:37:59 +04:00
David Osipov 213ce4555a
Merge remote-tracking branch 'upstream/main' into pr-sec-1 2026-03-16 20:51:53 +04:00
David Osipov 5a16e68487
Enhance TLS record handling and security tests
- Enforce TLS record length constraints in client handling to comply with RFC 8446, rejecting records outside the range of 512 to 16,384 bytes.
- Update security tests to validate behavior for oversized and undersized TLS records, ensuring they are correctly masked or rejected.
- Introduce new tests to verify the handling of TLS records in both generic and client handler pipelines.
- Refactor handshake logic to enforce mode restrictions based on transport type, preventing misuse of secure tags.
- Add tests for nonce generation and encryption consistency, ensuring correct behavior for different configurations.
- Improve masking tests to ensure proper logging and detection of client types, including SSH and unknown probes.
2026-03-16 20:43:49 +04:00
David Osipov 6ffbc51fb0
security: harden handshake/masking flows and add adversarial regressions
- forward valid-TLS/invalid-MTProto clients to mask backend in both client paths\n- harden TLS validation against timing and clock edge cases\n- move replay tracking behind successful authentication to avoid cache pollution\n- tighten secret decoding and key-material handling paths\n- add dedicated security test modules for tls/client/handshake/masking\n- include production-path regression for ClientHandler fallback behavior
2026-03-16 20:04:41 +04:00
David Osipov dcab19a64f
ci: remove CI workflow changes (deferred to later PR) 2026-03-16 13:56:46 +04:00
David Osipov f10ca192fa
chore: merge upstream/main (92972ab) into pr-sec-1 2026-03-16 13:50:46 +04:00
David Osipov 2bd9036908
ci: add security policy, cargo-deny configuration, and audit workflow
- Add deny.toml with license/advisory policy for cargo-deny
- Add security.yml GitHub Actions workflow for automated audit
- Update rust.yml with hardened clippy lint enforcement
- Update Cargo.toml/Cargo.lock with audit-related dependency additions
- Fix clippy lint placement in config.toml (Clippy lints must not live in rustflags)

Part of PR-SEC-1: no Rust source changes, establishes CI gates for all subsequent PRs.
2026-03-15 00:30:36 +04:00
19 changed files with 5983 additions and 655 deletions

15
.cargo/deny.toml Normal file
View File

@ -0,0 +1,15 @@
[bans]
multiple-versions = "deny"
wildcards = "allow"
highlight = "all"
# Explicitly flag the weak cryptography so the agent is forced to justify its existence
[[bans.skip]]
name = "md-5"
version = "*"
reason = "MUST VERIFY: Only allowed for legacy checksums, never for security."
[[bans.skip]]
name = "sha1"
version = "*"
reason = "MUST VERIFY: Only allowed for backwards compatibility."

View File

@ -5,6 +5,22 @@ Your responses are precise, minimal, and architecturally sound. You are working
---
### Context: The Telemt Project
You are working on **Telemt**, a high-performance, production-grade Telegram MTProxy implementation written in Rust. It is explicitly designed to operate in highly hostile network environments and evade advanced network censorship.
**Adversarial Threat Model:**
The proxy operates under constant surveillance by DPI (Deep Packet Inspection) systems and active scanners (state firewalls, mobile operator fraud controls). These entities actively probe IPs, analyze protocol handshakes, and look for known proxy signatures to block or throttle traffic.
**Core Architectural Pillars:**
1. **TLS-Fronting (TLS-F) & TCP-Splitting (TCP-S):** To the outside world, Telemt looks like a standard TLS server. If a client presents a valid MTProxy key, the connection is handled internally. If a censor's scanner, web browser, or unauthorized crawler connects, Telemt seamlessly splices the TCP connection (L4) to a real, legitimate HTTPS fallback server (e.g., Nginx) without modifying the `ClientHello` or terminating the TLS handshake.
2. **Middle-End (ME) Orchestration:** A highly concurrent, generation-based pool managing upstream connections to Telegram Datacenters (DCs). It utilizes an **Adaptive Floor** (dynamically scaling writer connections based on traffic), **Hardswaps** (zero-downtime pool reconfiguration), and **STUN/NAT** reflection mechanisms.
3. **Strict KDF Routing:** Cryptographic Key Derivation Functions (KDF) in this protocol strictly rely on the exact pairing of Source IP/Port and Destination IP/Port. Deviations or missing port logic will silently break the MTProto handshake.
4. **Data Plane vs. Control Plane Isolation:** The Data Plane (readers, writers, payload relay, TCP splicing) must remain strictly non-blocking, zero-allocation in hot paths, and highly resilient to network backpressure. The Control Plane (API, metrics, pool generation swaps, config reloads) orchestrates the state asynchronously without stalling the Data Plane.
Any modification you make must preserve Telemt's invisibility to censors, its strict memory-safety invariants, and its hot-path throughput.
### 0. Priority Resolution — Scope Control
This section resolves conflicts between code quality enforcement and scope limitation.

10
Cargo.lock generated
View File

@ -2025,6 +2025,12 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "subtle"
version = "2.6.1"
@ -2087,7 +2093,7 @@ dependencies = [
[[package]]
name = "telemt"
version = "3.3.15"
version = "3.3.19"
dependencies = [
"aes",
"anyhow",
@ -2127,6 +2133,8 @@ dependencies = [
"sha1",
"sha2",
"socket2 0.5.10",
"static_assertions",
"subtle",
"thiserror 2.0.18",
"tokio",
"tokio-rustls",

View File

@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.3.19"
version = "3.3.20"
edition = "2024"
[dependencies]
@ -22,6 +22,8 @@ hmac = "0.12"
crc32fast = "1.4"
crc32c = "0.6"
zeroize = { version = "1.8", features = ["derive"] }
subtle = "2.6"
static_assertions = "1.1"
# Network
socket2 = { version = "0.5", features = ["all"] }

View File

@ -1156,6 +1156,13 @@ pub struct ServerConfig {
#[serde(default = "default_proxy_protocol_header_timeout_ms")]
pub proxy_protocol_header_timeout_ms: u64,
/// Trusted source CIDRs allowed to send incoming PROXY protocol headers.
///
/// When non-empty, connections from addresses outside this allowlist are
/// rejected before `src_addr` is applied.
#[serde(default)]
pub proxy_protocol_trusted_cidrs: Vec<IpNetwork>,
#[serde(default)]
pub metrics_port: Option<u16>,
@ -1185,6 +1192,7 @@ impl Default for ServerConfig {
listen_tcp: None,
proxy_protocol: false,
proxy_protocol_header_timeout_ms: default_proxy_protocol_header_timeout_ms(),
proxy_protocol_trusted_cidrs: Vec::new(),
metrics_port: None,
metrics_whitelist: default_metrics_whitelist(),
api: ApiConfig::default(),

View File

@ -13,6 +13,7 @@ use super::constants::*;
use std::time::{SystemTime, UNIX_EPOCH};
use num_bigint::BigUint;
use num_traits::One;
use subtle::ConstantTimeEq;
// ============= Public Constants =============
@ -28,6 +29,8 @@ pub const TLS_DIGEST_HALF_LEN: usize = 16;
/// Time skew limits for anti-replay (in seconds)
pub const TIME_SKEW_MIN: i64 = -20 * 60; // 20 minutes before
pub const TIME_SKEW_MAX: i64 = 10 * 60; // 10 minutes after
/// Maximum accepted boot-time timestamp (seconds) before skew checks are enforced.
pub const BOOT_TIME_MAX_SECS: u32 = 7 * 24 * 60 * 60;
// ============= Private Constants =============
@ -125,7 +128,7 @@ impl TlsExtensionBuilder {
// protocol name length (1 byte)
// protocol name bytes
let proto_len = proto.len() as u8;
let list_len: u16 = 1 + proto_len as u16;
let list_len: u16 = 1 + u16::from(proto_len);
let ext_len: u16 = 2 + list_len;
self.extensions.extend_from_slice(&ext_len.to_be_bytes());
@ -273,13 +276,86 @@ impl ServerHelloBuilder {
// ============= Public Functions =============
/// Validate TLS ClientHello against user secrets
/// Validate TLS ClientHello against user secrets.
///
/// Returns validation result if a matching user is found.
/// The result **must** be used — ignoring it silently bypasses authentication.
#[must_use]
pub fn validate_tls_handshake(
handshake: &[u8],
secrets: &[(String, Vec<u8>)],
ignore_time_skew: bool,
) -> Option<TlsValidation> {
validate_tls_handshake_with_replay_window(
handshake,
secrets,
ignore_time_skew,
u64::from(BOOT_TIME_MAX_SECS),
)
}
/// Validate TLS ClientHello and cap the boot-time bypass by replay-cache TTL.
///
/// A boot-time timestamp is only accepted when it falls below both
/// `BOOT_TIME_MAX_SECS` and the configured replay window, preventing timestamp
/// reuse outside replay cache coverage.
#[must_use]
pub fn validate_tls_handshake_with_replay_window(
handshake: &[u8],
secrets: &[(String, Vec<u8>)],
ignore_time_skew: bool,
replay_window_secs: u64,
) -> Option<TlsValidation> {
// Only pay the clock syscall when we will actually compare against it.
// If `ignore_time_skew` is set, a broken or unavailable system clock
// must not block legitimate clients — that would be a DoS via clock failure.
let now = if !ignore_time_skew {
system_time_to_unix_secs(SystemTime::now())?
} else {
0_i64
};
let replay_window_u32 = u32::try_from(replay_window_secs).unwrap_or(u32::MAX);
let boot_time_cap_secs = BOOT_TIME_MAX_SECS.min(replay_window_u32);
validate_tls_handshake_at_time_with_boot_cap(
handshake,
secrets,
ignore_time_skew,
now,
boot_time_cap_secs,
)
}
fn system_time_to_unix_secs(now: SystemTime) -> Option<i64> {
// `try_from` rejects values that overflow i64 (> ~292 billion years CE),
// whereas `as i64` would silently wrap to a negative timestamp and corrupt
// every subsequent time-skew comparison.
let d = now.duration_since(UNIX_EPOCH).ok()?;
i64::try_from(d.as_secs()).ok()
}
fn validate_tls_handshake_at_time(
handshake: &[u8],
secrets: &[(String, Vec<u8>)],
ignore_time_skew: bool,
now: i64,
) -> Option<TlsValidation> {
validate_tls_handshake_at_time_with_boot_cap(
handshake,
secrets,
ignore_time_skew,
now,
BOOT_TIME_MAX_SECS,
)
}
fn validate_tls_handshake_at_time_with_boot_cap(
handshake: &[u8],
secrets: &[(String, Vec<u8>)],
ignore_time_skew: bool,
now: i64,
boot_time_cap_secs: u32,
) -> Option<TlsValidation> {
if handshake.len() < TLS_DIGEST_POS + TLS_DIGEST_LEN + 1 {
return None;
@ -305,50 +381,56 @@ pub fn validate_tls_handshake(
let mut msg = handshake.to_vec();
msg[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN].fill(0);
// Get current time
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let mut first_match: Option<TlsValidation> = None;
for (user, secret) in secrets {
let computed = sha256_hmac(secret, &msg);
// XOR digests
let xored: Vec<u8> = digest.iter()
.zip(computed.iter())
.map(|(a, b)| a ^ b)
.collect();
// Check that first 28 bytes are zeros (timestamp in last 4)
if !xored[..28].iter().all(|&b| b == 0) {
// Constant-time equality check on the 28-byte HMAC window.
// A variable-time short-circuit here lets an active censor measure how many
// bytes matched, enabling secret brute-force via timing side-channels.
// Direct comparison on the original arrays avoids a heap allocation and
// removes the `try_into().unwrap()` that the intermediate Vec would require.
if !bool::from(digest[..28].ct_eq(&computed[..28])) {
continue;
}
// Extract timestamp
let timestamp = u32::from_le_bytes(xored[28..32].try_into().unwrap());
let time_diff = now - timestamp as i64;
// Check time skew
// The last 4 bytes encode the timestamp as XOR(digest[28..32], computed[28..32]).
// Inline array construction is infallible: both slices are [u8; 32] by construction.
let timestamp = u32::from_le_bytes([
digest[28] ^ computed[28],
digest[29] ^ computed[29],
digest[30] ^ computed[30],
digest[31] ^ computed[31],
]);
// time_diff is only meaningful (and `now` is only valid) when we are
// actually checking the window. Keep both inside the guard to make
// the dead-code path explicit and prevent accidental future use of
// a sentinel `now` value outside its intended scope.
if !ignore_time_skew {
// Allow very small timestamps (boot time instead of unix time)
// This is a quirk in some clients that use uptime instead of real time
let is_boot_time = timestamp < 60 * 60 * 24 * 1000; // < ~2.7 years in seconds
if !is_boot_time && !(TIME_SKEW_MIN..=TIME_SKEW_MAX).contains(&time_diff) {
continue;
let is_boot_time = timestamp < boot_time_cap_secs;
if !is_boot_time {
let time_diff = now - i64::from(timestamp);
if !(TIME_SKEW_MIN..=TIME_SKEW_MAX).contains(&time_diff) {
continue;
}
}
}
return Some(TlsValidation {
user: user.clone(),
session_id,
digest,
timestamp,
});
if first_match.is_none() {
first_match = Some(TlsValidation {
user: user.clone(),
session_id: session_id.clone(),
digest,
timestamp,
});
}
}
None
first_match
}
fn curve25519_prime() -> BigUint {
@ -528,7 +610,9 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
if name_type == 0 && name_len > 0
&& let Ok(host) = std::str::from_utf8(&handshake[sn_pos..sn_pos + name_len])
{
return Some(host.to_string());
if is_valid_sni_hostname(host) {
return Some(host.to_string());
}
}
sn_pos += name_len;
}
@ -539,6 +623,35 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
None
}
fn is_valid_sni_hostname(host: &str) -> bool {
if host.is_empty() || host.len() > 253 {
return false;
}
if host.starts_with('.') || host.ends_with('.') {
return false;
}
if host.parse::<std::net::IpAddr>().is_ok() {
return false;
}
for label in host.split('.') {
if label.is_empty() || label.len() > 63 {
return false;
}
if label.starts_with('-') || label.ends_with('-') {
return false;
}
if !label
.bytes()
.all(|b| b.is_ascii_alphanumeric() || b == b'-')
{
return false;
}
}
true
}
/// Extract ALPN protocol list from ClientHello, return in offered order.
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec<Vec<u8>> {
let mut pos = 5; // after record header
@ -667,291 +780,29 @@ fn validate_server_hello_structure(data: &[u8]) -> Result<(), ProxyError> {
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_tls_handshake() {
assert!(is_tls_handshake(&[0x16, 0x03, 0x01]));
assert!(is_tls_handshake(&[0x16, 0x03, 0x01, 0x02, 0x00]));
assert!(!is_tls_handshake(&[0x17, 0x03, 0x01])); // Application data
assert!(!is_tls_handshake(&[0x16, 0x03, 0x02])); // Wrong version
assert!(!is_tls_handshake(&[0x16, 0x03])); // Too short
}
#[test]
fn test_parse_tls_record_header() {
let header = [0x16, 0x03, 0x01, 0x02, 0x00];
let result = parse_tls_record_header(&header).unwrap();
assert_eq!(result.0, TLS_RECORD_HANDSHAKE);
assert_eq!(result.1, 512);
let header = [0x17, 0x03, 0x03, 0x40, 0x00];
let result = parse_tls_record_header(&header).unwrap();
assert_eq!(result.0, TLS_RECORD_APPLICATION);
assert_eq!(result.1, 16384);
}
#[test]
fn test_gen_fake_x25519_key() {
let rng = SecureRandom::new();
let key1 = gen_fake_x25519_key(&rng);
let key2 = gen_fake_x25519_key(&rng);
assert_eq!(key1.len(), 32);
assert_eq!(key2.len(), 32);
assert_ne!(key1, key2); // Should be random
}
// ============= Compile-time Security Invariants =============
#[test]
fn test_fake_x25519_key_is_quadratic_residue() {
let rng = SecureRandom::new();
let key = gen_fake_x25519_key(&rng);
let p = curve25519_prime();
let k_num = BigUint::from_bytes_le(&key);
let exponent = (&p - BigUint::one()) >> 1;
let legendre = k_num.modpow(&exponent, &p);
assert_eq!(legendre, BigUint::one());
}
#[test]
fn test_tls_extension_builder() {
let key = [0x42u8; 32];
let mut builder = TlsExtensionBuilder::new();
builder.add_key_share(&key);
builder.add_supported_versions(0x0304);
let result = builder.build();
// Check length prefix
let len = u16::from_be_bytes([result[0], result[1]]) as usize;
assert_eq!(len, result.len() - 2);
// Check key_share extension is present
assert!(result.len() > 40); // At least key share
}
#[test]
fn test_server_hello_builder() {
let session_id = vec![0x01, 0x02, 0x03, 0x04];
let key = [0x55u8; 32];
let builder = ServerHelloBuilder::new(session_id.clone())
.with_x25519_key(&key)
.with_tls13_version();
let record = builder.build_record();
// Validate structure
validate_server_hello_structure(&record).expect("Invalid ServerHello structure");
// Check record type
assert_eq!(record[0], TLS_RECORD_HANDSHAKE);
// Check version
assert_eq!(&record[1..3], &TLS_VERSION);
// Check message type (ServerHello = 0x02)
assert_eq!(record[5], 0x02);
}
#[test]
fn test_build_server_hello_structure() {
let secret = b"test secret";
let client_digest = [0x42u8; 32];
let session_id = vec![0xAA; 32];
let rng = SecureRandom::new();
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng, None, 0);
// Should have at least 3 records
assert!(response.len() > 100);
// First record should be ServerHello
assert_eq!(response[0], TLS_RECORD_HANDSHAKE);
// Validate ServerHello structure
validate_server_hello_structure(&response).expect("Invalid ServerHello");
// Find Change Cipher Spec
let server_hello_len = 5 + u16::from_be_bytes([response[3], response[4]]) as usize;
let ccs_start = server_hello_len;
assert!(response.len() > ccs_start + 6);
assert_eq!(response[ccs_start], TLS_RECORD_CHANGE_CIPHER);
// Find Application Data
let ccs_len = 5 + u16::from_be_bytes([response[ccs_start + 3], response[ccs_start + 4]]) as usize;
let app_start = ccs_start + ccs_len;
assert!(response.len() > app_start + 5);
assert_eq!(response[app_start], TLS_RECORD_APPLICATION);
}
#[test]
fn test_build_server_hello_digest() {
let secret = b"test secret key here";
let client_digest = [0x42u8; 32];
let session_id = vec![0xAA; 32];
let rng = SecureRandom::new();
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
// Digest position should have non-zero data
let digest1 = &response1[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN];
assert!(!digest1.iter().all(|&b| b == 0));
// Different calls should have different digests (due to random cert)
let digest2 = &response2[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN];
assert_ne!(digest1, digest2);
}
#[test]
fn test_server_hello_extensions_length() {
let session_id = vec![0x01; 32];
let key = [0x55u8; 32];
let builder = ServerHelloBuilder::new(session_id)
.with_x25519_key(&key)
.with_tls13_version();
let record = builder.build_record();
// Parse to find extensions
let msg_start = 5; // After record header
let msg_len = u32::from_be_bytes([0, record[6], record[7], record[8]]) as usize;
// Skip to session ID
let session_id_pos = msg_start + 4 + 2 + 32; // header(4) + version(2) + random(32)
let session_id_len = record[session_id_pos] as usize;
// Skip to extensions
let ext_len_pos = session_id_pos + 1 + session_id_len + 2 + 1; // session_id + cipher(2) + compression(1)
let ext_len = u16::from_be_bytes([record[ext_len_pos], record[ext_len_pos + 1]]) as usize;
// Verify extensions length matches actual data
let extensions_data = &record[ext_len_pos + 2..msg_start + 4 + msg_len];
assert_eq!(ext_len, extensions_data.len(),
"Extension length mismatch: declared {}, actual {}", ext_len, extensions_data.len());
}
#[test]
fn test_validate_tls_handshake_format() {
// Build a minimal ClientHello-like structure
let mut handshake = vec![0u8; 100];
// Put a valid-looking digest at position 11
handshake[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN]
.copy_from_slice(&[0x42; 32]);
// Session ID length
handshake[TLS_DIGEST_POS + TLS_DIGEST_LEN] = 32;
// This won't validate (wrong HMAC) but shouldn't panic
let secrets = vec![("test".to_string(), b"secret".to_vec())];
let result = validate_tls_handshake(&handshake, &secrets, true);
// Should return None (no match) but not panic
assert!(result.is_none());
}
/// Compile-time checks that enforce invariants the rest of the code relies on.
/// Using `static_assertions` ensures these can never silently break across
/// refactors without a compile error.
mod compile_time_security_checks {
use super::{TLS_DIGEST_LEN, TLS_DIGEST_HALF_LEN};
use static_assertions::const_assert;
fn build_client_hello_with_exts(exts: Vec<(u16, Vec<u8>)>, host: &str) -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&TLS_VERSION); // legacy version
body.extend_from_slice(&[0u8; 32]); // random
body.push(0); // session id len
body.extend_from_slice(&2u16.to_be_bytes()); // cipher suites len
body.extend_from_slice(&[0x13, 0x01]); // TLS_AES_128_GCM_SHA256
body.push(1); // compression len
body.push(0); // null compression
// The digest must be exactly one SHA-256 output.
const_assert!(TLS_DIGEST_LEN == 32);
// Build SNI extension
let host_bytes = host.as_bytes();
let mut sni_ext = Vec::new();
sni_ext.extend_from_slice(&(host_bytes.len() as u16 + 3).to_be_bytes());
sni_ext.push(0);
sni_ext.extend_from_slice(&(host_bytes.len() as u16).to_be_bytes());
sni_ext.extend_from_slice(host_bytes);
// Replay-dedup stores the first half; verify it is literally half.
const_assert!(TLS_DIGEST_HALF_LEN * 2 == TLS_DIGEST_LEN);
let mut ext_blob = Vec::new();
for (typ, data) in exts {
ext_blob.extend_from_slice(&typ.to_be_bytes());
ext_blob.extend_from_slice(&(data.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&data);
}
// SNI last
ext_blob.extend_from_slice(&0x0000u16.to_be_bytes());
ext_blob.extend_from_slice(&(sni_ext.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&sni_ext);
body.extend_from_slice(&(ext_blob.len() as u16).to_be_bytes());
body.extend_from_slice(&ext_blob);
let mut handshake = Vec::new();
handshake.push(0x01); // ClientHello
let len_bytes = (body.len() as u32).to_be_bytes();
handshake.extend_from_slice(&len_bytes[1..4]);
handshake.extend_from_slice(&body);
let mut record = Vec::new();
record.push(TLS_RECORD_HANDSHAKE);
record.extend_from_slice(&[0x03, 0x01]);
record.extend_from_slice(&(handshake.len() as u16).to_be_bytes());
record.extend_from_slice(&handshake);
record
}
#[test]
fn test_extract_sni_with_grease_extension() {
// GREASE type 0x0a0a with zero length before SNI
let ch = build_client_hello_with_exts(vec![(0x0a0a, Vec::new())], "example.com");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("example.com"));
}
#[test]
fn test_extract_sni_tolerates_empty_unknown_extension() {
let ch = build_client_hello_with_exts(vec![(0x1234, Vec::new())], "test.local");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("test.local"));
}
#[test]
fn test_extract_alpn_single() {
let mut alpn_data = Vec::new();
// list length = 3 (1 length byte + "h2")
alpn_data.extend_from_slice(&3u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch);
let alpn_str: Vec<String> = alpn
.iter()
.map(|p| std::str::from_utf8(p).unwrap().to_string())
.collect();
assert_eq!(alpn_str, vec!["h2"]);
}
#[test]
fn test_extract_alpn_multiple() {
let mut alpn_data = Vec::new();
// list length = 11 (sum of per-proto lengths including length bytes)
alpn_data.extend_from_slice(&11u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
alpn_data.push(4);
alpn_data.extend_from_slice(b"spdy");
alpn_data.push(2);
alpn_data.extend_from_slice(b"h3");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch);
let alpn_str: Vec<String> = alpn
.iter()
.map(|p| std::str::from_utf8(p).unwrap().to_string())
.collect();
assert_eq!(alpn_str, vec!["h2", "spdy", "h3"]);
}
// The HMAC check window (28 bytes) plus the embedded timestamp (4 bytes)
// must exactly fill the digest. If TLS_DIGEST_LEN ever changes, these
// assertions will catch the mismatch before any timing-oracle fix is broke.
const_assert!(28 + 4 == TLS_DIGEST_LEN);
}
// ============= Security-focused regression tests =============
#[cfg(test)]
#[path = "tls_security_tests.rs"]
mod security_tests;

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,10 @@ use std::future::Future;
use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use ipnetwork::IpNetwork;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use tokio::net::TcpStream;
use tokio::time::timeout;
@ -23,7 +26,7 @@ enum HandshakeOutcome {
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
use crate::error::{HandshakeResult, ProxyError, Result};
use crate::error::{HandshakeResult, ProxyError, Result, StreamError};
use crate::ip_tracker::UserIpTracker;
use crate::protocol::constants::*;
use crate::protocol::tls;
@ -63,14 +66,30 @@ fn record_handshake_failure_class(
peer_ip: IpAddr,
error: &ProxyError,
) {
let class = if error.to_string().contains("expected 64 bytes, got 0") {
"expected_64_got_0"
} else {
"other"
let class = match error {
ProxyError::Io(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
"expected_64_got_0"
}
ProxyError::Stream(StreamError::UnexpectedEof) => "expected_64_got_0",
_ => "other",
};
record_beobachten_class(beobachten, config, peer_ip, class);
}
fn is_trusted_proxy_source(peer_ip: IpAddr, trusted: &[IpNetwork]) -> bool {
if trusted.is_empty() {
static EMPTY_PROXY_TRUST_WARNED: OnceLock<AtomicBool> = OnceLock::new();
let warned = EMPTY_PROXY_TRUST_WARNED.get_or_init(|| AtomicBool::new(false));
if !warned.swap(true, Ordering::Relaxed) {
warn!(
"PROXY protocol enabled but server.proxy_protocol_trusted_cidrs is empty; rejecting all PROXY headers by default"
);
}
return false;
}
trusted.iter().any(|cidr| cidr.contains(peer_ip))
}
pub async fn handle_client_stream<S>(
mut stream: S,
peer: SocketAddr,
@ -104,6 +123,17 @@ where
);
match timeout(proxy_header_timeout, parse_proxy_protocol(&mut stream, peer)).await {
Ok(Ok(info)) => {
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs)
{
stats.increment_connects_bad();
warn!(
peer = %peer,
trusted = ?config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(ProxyError::InvalidProxyProtocol);
}
debug!(
peer = %peer,
client = %info.src_addr,
@ -149,8 +179,13 @@ where
if is_tls {
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
if tls_len < 512 {
debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short");
// RFC 8446 §5.1 mandates that TLSPlaintext records must not exceed 2^14
// bytes (16_384). A client claiming a larger record is non-compliant and
// may be an active probe attempting to force large allocations.
//
// Also enforce a minimum record size to avoid trivial/garbage probes.
if !(512..=MAX_TLS_RECORD_SIZE).contains(&tls_len) {
debug!(peer = %real_peer, tls_len = tls_len, max_tls_len = MAX_TLS_RECORD_SIZE, "TLS handshake length out of bounds");
stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream);
handle_bad_client(
@ -204,9 +239,19 @@ where
&config, &replay_checker, true, Some(tls_user.as_str()),
).await {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader: _, writer: _ } => {
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
debug!(peer = %peer, "Valid TLS but invalid MTProto handshake");
handle_bad_client(
reader,
writer,
&mtproto_handshake,
real_peer,
local_addr,
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
@ -445,6 +490,24 @@ impl RunningClientHandler {
.await
{
Ok(Ok(info)) => {
if !is_trusted_proxy_source(
self.peer.ip(),
&self.config.server.proxy_protocol_trusted_cidrs,
) {
self.stats.increment_connects_bad();
warn!(
peer = %self.peer,
trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(
&self.beobachten,
&self.config,
self.peer.ip(),
"other",
);
return Err(ProxyError::InvalidProxyProtocol);
}
debug!(
peer = %self.peer,
client = %info.src_addr,
@ -513,8 +576,10 @@ impl RunningClientHandler {
debug!(peer = %peer, tls_len = tls_len, "Reading TLS handshake");
if tls_len < 512 {
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
// See RFC 8446 §5.1: TLSPlaintext records must not exceed 16_384 bytes.
// Treat too-small or too-large lengths as active probes and mask them.
if !(512..=MAX_TLS_RECORD_SIZE).contains(&tls_len) {
debug!(peer = %peer, tls_len = tls_len, max_tls_len = MAX_TLS_RECORD_SIZE, "TLS handshake length out of bounds");
self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split();
handle_bad_client(
@ -590,12 +655,19 @@ impl RunningClientHandler {
.await
{
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient {
reader: _,
writer: _,
} => {
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
debug!(peer = %peer, "Valid TLS but invalid MTProto handshake");
handle_bad_client(
reader,
writer,
&mtproto_handshake,
peer,
local_addr,
&config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
@ -742,7 +814,7 @@ impl RunningClientHandler {
client_writer,
success,
pool.clone(),
stats,
stats.clone(),
config,
buffer_pool,
local_addr,
@ -759,7 +831,7 @@ impl RunningClientHandler {
client_writer,
success,
upstream_manager,
stats,
stats.clone(),
config,
buffer_pool,
rng,
@ -776,7 +848,7 @@ impl RunningClientHandler {
client_writer,
success,
upstream_manager,
stats,
stats.clone(),
config,
buffer_pool,
rng,
@ -787,6 +859,7 @@ impl RunningClientHandler {
.await
};
stats.decrement_user_curr_connects(&user);
ip_tracker.remove_ip(&user, peer_addr.ip()).await;
relay_result
}
@ -806,9 +879,29 @@ impl RunningClientHandler {
});
}
let ip_reserved = match ip_tracker.check_and_add(user, peer_addr.ip()).await {
Ok(()) => true,
if let Some(quota) = config.access.user_data_quota.get(user)
&& stats.get_user_total_octets(user) >= *quota
{
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
let limit = config
.access
.user_max_tcp_conns
.get(user)
.map(|v| *v as u64);
if !stats.try_acquire_user_curr_connects(user, limit) {
return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(),
});
}
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
Ok(()) => {}
Err(reason) => {
stats.decrement_user_curr_connects(user);
warn!(
user = %user,
ip = %peer_addr.ip(),
@ -819,33 +912,12 @@ impl RunningClientHandler {
user: user.to_string(),
});
}
};
// IP limit check
if let Some(limit) = config.access.user_max_tcp_conns.get(user)
&& stats.get_user_curr_connects(user) >= *limit as u64
{
if ip_reserved {
ip_tracker.remove_ip(user, peer_addr.ip()).await;
stats.increment_ip_reservation_rollback_tcp_limit_total();
}
return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(),
});
}
if let Some(quota) = config.access.user_data_quota.get(user)
&& stats.get_user_total_octets(user) >= *quota
{
if ip_reserved {
ip_tracker.remove_ip(user, peer_addr.ip()).await;
stats.increment_ip_reservation_rollback_quota_limit_total();
}
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
Ok(())
}
}
#[cfg(test)]
#[path = "client_security_tests.rs"]
mod security_tests;

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,8 @@ use std::fs::OpenOptions;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
use std::collections::HashSet;
use std::sync::{Mutex, OnceLock};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
@ -22,6 +24,45 @@ use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::UpstreamManager;
const UNKNOWN_DC_LOG_DISTINCT_LIMIT: usize = 1024;
static LOGGED_UNKNOWN_DCS: OnceLock<Mutex<HashSet<i16>>> = OnceLock::new();
// In tests, this function shares global mutable state. Callers that also use
// cache-reset helpers must hold `unknown_dc_test_lock()` to keep assertions
// deterministic under parallel execution.
fn should_log_unknown_dc(dc_idx: i16) -> bool {
let set = LOGGED_UNKNOWN_DCS.get_or_init(|| Mutex::new(HashSet::new()));
match set.lock() {
Ok(mut guard) => {
if guard.contains(&dc_idx) {
return false;
}
if guard.len() >= UNKNOWN_DC_LOG_DISTINCT_LIMIT {
return false;
}
guard.insert(dc_idx)
}
// If the lock is poisoned, keep logging rather than silently dropping
// operator-visible diagnostics.
Err(_) => true,
}
}
#[cfg(test)]
fn clear_unknown_dc_log_cache_for_testing() {
if let Some(set) = LOGGED_UNKNOWN_DCS.get()
&& let Ok(mut guard) = set.lock()
{
guard.clear();
}
}
#[cfg(test)]
fn unknown_dc_test_lock() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
pub(crate) async fn handle_via_direct<R, W>(
client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>,
@ -64,7 +105,6 @@ where
debug!(peer = %success.peer, "TG handshake complete, starting relay");
stats.increment_user_connects(user);
stats.increment_user_curr_connects(user);
stats.increment_current_connections_direct();
let relay_result = relay_bidirectional(
@ -109,7 +149,6 @@ where
};
stats.decrement_current_connections_direct();
stats.decrement_user_curr_connects(user);
match &relay_result {
Ok(()) => debug!(user = %user, "Direct relay completed"),
@ -160,6 +199,7 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
if config.general.unknown_dc_file_log_enabled
&& let Some(path) = &config.general.unknown_dc_log_path
&& should_log_unknown_dc(dc_idx)
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{
let path = path.clone();
@ -175,7 +215,7 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs {
default_dc - 1
} else {
1
0
};
info!(
@ -203,8 +243,6 @@ async fn do_tg_handshake_static(
let (nonce, _tg_enc_key, _tg_enc_iv, _tg_dec_key, _tg_dec_iv) = generate_tg_nonce(
success.proto_tag,
success.dc_idx,
&success.dec_key,
success.dec_iv,
&success.enc_key,
success.enc_iv,
rng,
@ -230,3 +268,7 @@ async fn do_tg_handshake_static(
CryptoWriter::new(write_half, tg_encryptor, max_pending),
))
}
#[cfg(test)]
#[path = "direct_relay_security_tests.rs"]
mod security_tests;

View File

@ -0,0 +1,51 @@
use super::*;
#[test]
fn unknown_dc_log_is_deduplicated_per_dc_idx() {
let _guard = unknown_dc_test_lock()
.lock()
.expect("unknown dc test lock must be available");
clear_unknown_dc_log_cache_for_testing();
assert!(should_log_unknown_dc(777));
assert!(
!should_log_unknown_dc(777),
"same unknown dc_idx must not be logged repeatedly"
);
assert!(
should_log_unknown_dc(778),
"different unknown dc_idx must still be loggable"
);
}
#[test]
fn unknown_dc_log_respects_distinct_limit() {
let _guard = unknown_dc_test_lock()
.lock()
.expect("unknown dc test lock must be available");
clear_unknown_dc_log_cache_for_testing();
for dc in 1..=UNKNOWN_DC_LOG_DISTINCT_LIMIT {
assert!(
should_log_unknown_dc(dc as i16),
"expected first-time unknown dc_idx to be loggable"
);
}
assert!(
!should_log_unknown_dc(i16::MAX),
"distinct unknown dc_idx entries above limit must not be logged"
);
}
#[test]
fn fallback_dc_never_panics_with_single_dc_list() {
let mut cfg = ProxyConfig::default();
cfg.network.prefer = 6;
cfg.network.ipv6 = Some(true);
cfg.default_dc = Some(42);
let addr = get_dc_addr_static(999, &cfg).expect("fallback dc must resolve safely");
let expected = SocketAddr::new(TG_DATACENTERS_V6[0], TG_DATACENTER_PORT);
assert_eq!(addr, expected);
}

View File

@ -3,8 +3,12 @@
#![allow(dead_code)]
use std::net::SocketAddr;
use std::collections::HashSet;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use dashmap::DashMap;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::{debug, warn, trace};
use zeroize::Zeroize;
@ -19,6 +23,231 @@ use crate::stats::ReplayChecker;
use crate::config::ProxyConfig;
use crate::tls_front::{TlsFrontCache, emulator};
const ACCESS_SECRET_BYTES: usize = 16;
static INVALID_SECRET_WARNED: OnceLock<Mutex<HashSet<(String, String)>>> = OnceLock::new();
const AUTH_PROBE_TRACK_RETENTION_SECS: u64 = 10 * 60;
#[cfg(test)]
const AUTH_PROBE_TRACK_MAX_ENTRIES: usize = 256;
#[cfg(not(test))]
const AUTH_PROBE_TRACK_MAX_ENTRIES: usize = 65_536;
const AUTH_PROBE_PRUNE_SCAN_LIMIT: usize = 1_024;
const AUTH_PROBE_BACKOFF_START_FAILS: u32 = 4;
#[cfg(test)]
const AUTH_PROBE_BACKOFF_BASE_MS: u64 = 1;
#[cfg(not(test))]
const AUTH_PROBE_BACKOFF_BASE_MS: u64 = 25;
#[cfg(test)]
const AUTH_PROBE_BACKOFF_MAX_MS: u64 = 16;
#[cfg(not(test))]
const AUTH_PROBE_BACKOFF_MAX_MS: u64 = 1_000;
#[derive(Clone, Copy)]
struct AuthProbeState {
fail_streak: u32,
blocked_until: Instant,
last_seen: Instant,
}
static AUTH_PROBE_STATE: OnceLock<DashMap<IpAddr, AuthProbeState>> = OnceLock::new();
fn auth_probe_state_map() -> &'static DashMap<IpAddr, AuthProbeState> {
AUTH_PROBE_STATE.get_or_init(DashMap::new)
}
fn auth_probe_backoff(fail_streak: u32) -> Duration {
if fail_streak < AUTH_PROBE_BACKOFF_START_FAILS {
return Duration::ZERO;
}
let shift = (fail_streak - AUTH_PROBE_BACKOFF_START_FAILS).min(10);
let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
let ms = AUTH_PROBE_BACKOFF_BASE_MS
.saturating_mul(multiplier)
.min(AUTH_PROBE_BACKOFF_MAX_MS);
Duration::from_millis(ms)
}
fn auth_probe_state_expired(state: &AuthProbeState, now: Instant) -> bool {
let retention = Duration::from_secs(AUTH_PROBE_TRACK_RETENTION_SECS);
now.duration_since(state.last_seen) > retention
}
fn auth_probe_is_throttled(peer_ip: IpAddr, now: Instant) -> bool {
let state = auth_probe_state_map();
let Some(entry) = state.get(&peer_ip) else {
return false;
};
if auth_probe_state_expired(&entry, now) {
drop(entry);
state.remove(&peer_ip);
return false;
}
now < entry.blocked_until
}
fn auth_probe_record_failure(peer_ip: IpAddr, now: Instant) {
let state = auth_probe_state_map();
auth_probe_record_failure_with_state(state, peer_ip, now);
}
fn auth_probe_record_failure_with_state(
state: &DashMap<IpAddr, AuthProbeState>,
peer_ip: IpAddr,
now: Instant,
) {
if let Some(mut entry) = state.get_mut(&peer_ip) {
if auth_probe_state_expired(&entry, now) {
*entry = AuthProbeState {
fail_streak: 1,
blocked_until: now + auth_probe_backoff(1),
last_seen: now,
};
return;
}
entry.fail_streak = entry.fail_streak.saturating_add(1);
entry.last_seen = now;
entry.blocked_until = now + auth_probe_backoff(entry.fail_streak);
return;
};
if state.len() >= AUTH_PROBE_TRACK_MAX_ENTRIES {
let mut stale_keys = Vec::new();
for entry in state.iter().take(AUTH_PROBE_PRUNE_SCAN_LIMIT) {
if auth_probe_state_expired(entry.value(), now) {
stale_keys.push(*entry.key());
}
}
for stale_key in stale_keys {
state.remove(&stale_key);
}
if state.len() >= AUTH_PROBE_TRACK_MAX_ENTRIES {
return;
}
}
state.insert(peer_ip, AuthProbeState {
fail_streak: 0,
blocked_until: now,
last_seen: now,
});
if let Some(mut entry) = state.get_mut(&peer_ip) {
entry.fail_streak = 1;
entry.blocked_until = now + auth_probe_backoff(1);
}
}
fn auth_probe_record_success(peer_ip: IpAddr) {
let state = auth_probe_state_map();
state.remove(&peer_ip);
}
#[cfg(test)]
fn clear_auth_probe_state_for_testing() {
if let Some(state) = AUTH_PROBE_STATE.get() {
state.clear();
}
}
#[cfg(test)]
fn auth_probe_fail_streak_for_testing(peer_ip: IpAddr) -> Option<u32> {
let state = AUTH_PROBE_STATE.get()?;
state.get(&peer_ip).map(|entry| entry.fail_streak)
}
#[cfg(test)]
fn auth_probe_is_throttled_for_testing(peer_ip: IpAddr) -> bool {
auth_probe_is_throttled(peer_ip, Instant::now())
}
#[cfg(test)]
fn auth_probe_test_lock() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
#[cfg(test)]
fn clear_warned_secrets_for_testing() {
if let Some(warned) = INVALID_SECRET_WARNED.get()
&& let Ok(mut guard) = warned.lock()
{
guard.clear();
}
}
fn warn_invalid_secret_once(name: &str, reason: &str, expected: usize, got: Option<usize>) {
let key = (name.to_string(), reason.to_string());
let warned = INVALID_SECRET_WARNED.get_or_init(|| Mutex::new(HashSet::new()));
let should_warn = match warned.lock() {
Ok(mut guard) => guard.insert(key),
Err(_) => true,
};
if !should_warn {
return;
}
match got {
Some(actual) => {
warn!(
user = %name,
expected = expected,
got = actual,
"Skipping user: access secret has unexpected length"
);
}
None => {
warn!(
user = %name,
"Skipping user: access secret is not valid hex"
);
}
}
}
fn decode_user_secret(name: &str, secret_hex: &str) -> Option<Vec<u8>> {
match hex::decode(secret_hex) {
Ok(bytes) if bytes.len() == ACCESS_SECRET_BYTES => Some(bytes),
Ok(bytes) => {
warn_invalid_secret_once(
name,
"invalid_length",
ACCESS_SECRET_BYTES,
Some(bytes.len()),
);
None
}
Err(_) => {
warn_invalid_secret_once(name, "invalid_hex", ACCESS_SECRET_BYTES, None);
None
}
}
}
// Decide whether a client-supplied proto tag is allowed given the configured
// proxy modes and the transport that carried the handshake.
//
// A common mistake is to treat `modes.tls` and `modes.secure` as interchangeable
// even though they correspond to different transport profiles: `modes.tls` is
// for the TLS-fronted (EE-TLS) path, while `modes.secure` is for direct MTProto
// over TCP (DD). Enforcing this separation prevents an attacker from using a
// TLS-capable client to bypass the operator intent for the direct MTProto mode,
// and vice versa.
fn mode_enabled_for_proto(config: &ProxyConfig, proto_tag: ProtoTag, is_tls: bool) -> bool {
match proto_tag {
ProtoTag::Secure => {
if is_tls {
config.general.modes.tls
} else {
config.general.modes.secure
}
}
ProtoTag::Intermediate | ProtoTag::Abridged => config.general.modes.classic,
}
}
fn decode_user_secrets(
config: &ProxyConfig,
preferred_user: Option<&str>,
@ -27,7 +256,7 @@ fn decode_user_secrets(
if let Some(preferred) = preferred_user
&& let Some(secret_hex) = config.access.users.get(preferred)
&& let Ok(bytes) = hex::decode(secret_hex)
&& let Some(bytes) = decode_user_secret(preferred, secret_hex)
{
secrets.push((preferred.to_string(), bytes));
}
@ -36,7 +265,7 @@ fn decode_user_secrets(
if preferred_user.is_some_and(|preferred| preferred == name.as_str()) {
continue;
}
if let Ok(bytes) = hex::decode(secret_hex) {
if let Some(bytes) = decode_user_secret(name, secret_hex) {
secrets.push((name.clone(), bytes));
}
}
@ -48,7 +277,7 @@ fn decode_user_secrets(
///
/// Key material (`dec_key`, `dec_iv`, `enc_key`, `enc_iv`) is
/// zeroized on drop.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct HandshakeSuccess {
/// Authenticated user name
pub user: String,
@ -94,28 +323,27 @@ where
{
debug!(peer = %peer, handshake_len = handshake.len(), "Processing TLS handshake");
if auth_probe_is_throttled(peer.ip(), Instant::now()) {
debug!(peer = %peer, "TLS handshake rejected by pre-auth probe throttle");
return HandshakeResult::BadClient { reader, writer };
}
if handshake.len() < tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN + 1 {
debug!(peer = %peer, "TLS handshake too short");
return HandshakeResult::BadClient { reader, writer };
}
let digest = &handshake[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN];
let digest_half = &digest[..tls::TLS_DIGEST_HALF_LEN];
if replay_checker.check_and_add_tls_digest(digest_half) {
warn!(peer = %peer, "TLS replay attack detected (duplicate digest)");
return HandshakeResult::BadClient { reader, writer };
}
let secrets = decode_user_secrets(config, None);
let validation = match tls::validate_tls_handshake(
let validation = match tls::validate_tls_handshake_with_replay_window(
handshake,
&secrets,
config.access.ignore_time_skew,
config.access.replay_window_secs,
) {
Some(v) => v,
None => {
auth_probe_record_failure(peer.ip(), Instant::now());
debug!(
peer = %peer,
ignore_time_skew = config.access.ignore_time_skew,
@ -125,6 +353,15 @@ where
}
};
// Replay tracking is applied only after successful authentication to avoid
// letting unauthenticated probes evict valid entries from the replay cache.
let digest_half = &validation.digest[..tls::TLS_DIGEST_HALF_LEN];
if replay_checker.check_and_add_tls_digest(digest_half) {
auth_probe_record_failure(peer.ip(), Instant::now());
warn!(peer = %peer, "TLS replay attack detected (duplicate digest)");
return HandshakeResult::BadClient { reader, writer };
}
let secret = match secrets.iter().find(|(name, _)| *name == validation.user) {
Some((_, s)) => s,
None => return HandshakeResult::BadClient { reader, writer },
@ -166,6 +403,9 @@ where
Some(b"h2".to_vec())
} else if alpn_list.iter().any(|p| p == b"http/1.1") {
Some(b"http/1.1".to_vec())
} else if !alpn_list.is_empty() {
debug!(peer = %peer, "Client ALPN list has no supported protocol; using masking fallback");
return HandshakeResult::BadClient { reader, writer };
} else {
None
}
@ -228,6 +468,8 @@ where
"TLS handshake successful"
);
auth_probe_record_success(peer.ip());
HandshakeResult::Success((
FakeTlsReader::new(reader),
FakeTlsWriter::new(writer),
@ -252,13 +494,13 @@ where
{
trace!(peer = %peer, handshake = ?hex::encode(handshake), "MTProto handshake bytes");
let dec_prekey_iv = &handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN];
if replay_checker.check_and_add_handshake(dec_prekey_iv) {
warn!(peer = %peer, "MTProto replay attack detected");
if auth_probe_is_throttled(peer.ip(), Instant::now()) {
debug!(peer = %peer, "MTProto handshake rejected by pre-auth probe throttle");
return HandshakeResult::BadClient { reader, writer };
}
let dec_prekey_iv = &handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN];
let enc_prekey_iv: Vec<u8> = dec_prekey_iv.iter().rev().copied().collect();
let decoded_users = decode_user_secrets(config, preferred_user);
@ -273,39 +515,33 @@ where
dec_key_input.extend_from_slice(&secret);
let dec_key = sha256(&dec_key_input);
let dec_iv = u128::from_be_bytes(dec_iv_bytes.try_into().unwrap());
let mut dec_iv_arr = [0u8; IV_LEN];
dec_iv_arr.copy_from_slice(dec_iv_bytes);
let dec_iv = u128::from_be_bytes(dec_iv_arr);
let mut decryptor = AesCtr::new(&dec_key, dec_iv);
let decrypted = decryptor.decrypt(handshake);
let tag_bytes: [u8; 4] = decrypted[PROTO_TAG_POS..PROTO_TAG_POS + 4]
.try_into()
.unwrap();
let tag_bytes: [u8; 4] = [
decrypted[PROTO_TAG_POS],
decrypted[PROTO_TAG_POS + 1],
decrypted[PROTO_TAG_POS + 2],
decrypted[PROTO_TAG_POS + 3],
];
let proto_tag = match ProtoTag::from_bytes(tag_bytes) {
Some(tag) => tag,
None => continue,
};
let mode_ok = match proto_tag {
ProtoTag::Secure => {
if is_tls {
config.general.modes.tls || config.general.modes.secure
} else {
config.general.modes.secure || config.general.modes.tls
}
}
ProtoTag::Intermediate | ProtoTag::Abridged => config.general.modes.classic,
};
let mode_ok = mode_enabled_for_proto(config, proto_tag, is_tls);
if !mode_ok {
debug!(peer = %peer, user = %user, proto = ?proto_tag, "Mode not enabled");
continue;
}
let dc_idx = i16::from_le_bytes(
decrypted[DC_IDX_POS..DC_IDX_POS + 2].try_into().unwrap()
);
let dc_idx = i16::from_le_bytes([decrypted[DC_IDX_POS], decrypted[DC_IDX_POS + 1]]);
let enc_prekey = &enc_prekey_iv[..PREKEY_LEN];
let enc_iv_bytes = &enc_prekey_iv[PREKEY_LEN..];
@ -315,10 +551,24 @@ where
enc_key_input.extend_from_slice(&secret);
let enc_key = sha256(&enc_key_input);
let enc_iv = u128::from_be_bytes(enc_iv_bytes.try_into().unwrap());
let mut enc_iv_arr = [0u8; IV_LEN];
enc_iv_arr.copy_from_slice(enc_iv_bytes);
let enc_iv = u128::from_be_bytes(enc_iv_arr);
let encryptor = AesCtr::new(&enc_key, enc_iv);
// Apply replay tracking only after successful authentication.
//
// This ordering prevents an attacker from producing invalid handshakes that
// still collide with a valid handshake's replay slot and thus evict a valid
// entry from the cache. We accept the cost of performing the full
// authentication check first to avoid poisoning the replay cache.
if replay_checker.check_and_add_handshake(dec_prekey_iv) {
auth_probe_record_failure(peer.ip(), Instant::now());
warn!(peer = %peer, user = %user, "MTProto replay attack detected");
return HandshakeResult::BadClient { reader, writer };
}
let success = HandshakeSuccess {
user: user.clone(),
dc_idx,
@ -340,6 +590,8 @@ where
"MTProto handshake successful"
);
auth_probe_record_success(peer.ip());
let max_pending = config.general.crypto_pending_buffer;
return HandshakeResult::Success((
CryptoReader::new(reader, decryptor),
@ -348,6 +600,7 @@ where
));
}
auth_probe_record_failure(peer.ip(), Instant::now());
debug!(peer = %peer, "MTProto handshake: no matching user found");
HandshakeResult::BadClient { reader, writer }
}
@ -356,8 +609,6 @@ where
pub fn generate_tg_nonce(
proto_tag: ProtoTag,
dc_idx: i16,
_client_dec_key: &[u8; 32],
_client_dec_iv: u128,
client_enc_key: &[u8; 32],
client_enc_iv: u128,
rng: &SecureRandom,
@ -365,14 +616,16 @@ pub fn generate_tg_nonce(
) -> ([u8; HANDSHAKE_LEN], [u8; 32], u128, [u8; 32], u128) {
loop {
let bytes = rng.bytes(HANDSHAKE_LEN);
let mut nonce: [u8; HANDSHAKE_LEN] = bytes.try_into().unwrap();
let Ok(mut nonce): Result<[u8; HANDSHAKE_LEN], _> = bytes.try_into() else {
continue;
};
if RESERVED_NONCE_FIRST_BYTES.contains(&nonce[0]) { continue; }
let first_four: [u8; 4] = nonce[..4].try_into().unwrap();
let first_four: [u8; 4] = [nonce[0], nonce[1], nonce[2], nonce[3]];
if RESERVED_NONCE_BEGINNINGS.contains(&first_four) { continue; }
let continue_four: [u8; 4] = nonce[4..8].try_into().unwrap();
let continue_four: [u8; 4] = [nonce[4], nonce[5], nonce[6], nonce[7]];
if RESERVED_NONCE_CONTINUES.contains(&continue_four) { continue; }
nonce[PROTO_TAG_POS..PROTO_TAG_POS + 4].copy_from_slice(&proto_tag.to_bytes());
@ -390,11 +643,17 @@ pub fn generate_tg_nonce(
let enc_key_iv = &nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN];
let dec_key_iv: Vec<u8> = enc_key_iv.iter().rev().copied().collect();
let tg_enc_key: [u8; 32] = enc_key_iv[..KEY_LEN].try_into().unwrap();
let tg_enc_iv = u128::from_be_bytes(enc_key_iv[KEY_LEN..].try_into().unwrap());
let mut tg_enc_key = [0u8; 32];
tg_enc_key.copy_from_slice(&enc_key_iv[..KEY_LEN]);
let mut tg_enc_iv_arr = [0u8; IV_LEN];
tg_enc_iv_arr.copy_from_slice(&enc_key_iv[KEY_LEN..]);
let tg_enc_iv = u128::from_be_bytes(tg_enc_iv_arr);
let tg_dec_key: [u8; 32] = dec_key_iv[..KEY_LEN].try_into().unwrap();
let tg_dec_iv = u128::from_be_bytes(dec_key_iv[KEY_LEN..].try_into().unwrap());
let mut tg_dec_key = [0u8; 32];
tg_dec_key.copy_from_slice(&dec_key_iv[..KEY_LEN]);
let mut tg_dec_iv_arr = [0u8; IV_LEN];
tg_dec_iv_arr.copy_from_slice(&dec_key_iv[KEY_LEN..]);
let tg_dec_iv = u128::from_be_bytes(tg_dec_iv_arr);
return (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv);
}
@ -405,11 +664,17 @@ pub fn encrypt_tg_nonce_with_ciphers(nonce: &[u8; HANDSHAKE_LEN]) -> (Vec<u8>, A
let enc_key_iv = &nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN];
let dec_key_iv: Vec<u8> = enc_key_iv.iter().rev().copied().collect();
let enc_key: [u8; 32] = enc_key_iv[..KEY_LEN].try_into().unwrap();
let enc_iv = u128::from_be_bytes(enc_key_iv[KEY_LEN..].try_into().unwrap());
let mut enc_key = [0u8; 32];
enc_key.copy_from_slice(&enc_key_iv[..KEY_LEN]);
let mut enc_iv_arr = [0u8; IV_LEN];
enc_iv_arr.copy_from_slice(&enc_key_iv[KEY_LEN..]);
let enc_iv = u128::from_be_bytes(enc_iv_arr);
let dec_key: [u8; 32] = dec_key_iv[..KEY_LEN].try_into().unwrap();
let dec_iv = u128::from_be_bytes(dec_key_iv[KEY_LEN..].try_into().unwrap());
let mut dec_key = [0u8; 32];
dec_key.copy_from_slice(&dec_key_iv[..KEY_LEN]);
let mut dec_iv_arr = [0u8; IV_LEN];
dec_iv_arr.copy_from_slice(&dec_key_iv[KEY_LEN..]);
let dec_iv = u128::from_be_bytes(dec_iv_arr);
let mut encryptor = AesCtr::new(&enc_key, enc_iv);
let encrypted_full = encryptor.encrypt(nonce); // counter: 0 → 4
@ -429,80 +694,15 @@ pub fn encrypt_tg_nonce(nonce: &[u8; HANDSHAKE_LEN]) -> Vec<u8> {
}
#[cfg(test)]
mod tests {
use super::*;
#[path = "handshake_security_tests.rs"]
mod security_tests;
#[test]
fn test_generate_tg_nonce() {
let client_dec_key = [0x42u8; 32];
let client_dec_iv = 12345u128;
let client_enc_key = [0x24u8; 32];
let client_enc_iv = 54321u128;
/// Compile-time guard: HandshakeSuccess holds cryptographic key material and
/// must never be Copy. A Copy impl would allow silent key duplication,
/// undermining the zeroize-on-drop guarantee.
mod compile_time_security_checks {
use super::HandshakeSuccess;
use static_assertions::assert_not_impl_all;
let rng = SecureRandom::new();
let (nonce, _tg_enc_key, _tg_enc_iv, _tg_dec_key, _tg_dec_iv) =
generate_tg_nonce(
ProtoTag::Secure,
2,
&client_dec_key,
client_dec_iv,
&client_enc_key,
client_enc_iv,
&rng,
false,
);
assert_eq!(nonce.len(), HANDSHAKE_LEN);
let tag_bytes: [u8; 4] = nonce[PROTO_TAG_POS..PROTO_TAG_POS + 4].try_into().unwrap();
assert_eq!(ProtoTag::from_bytes(tag_bytes), Some(ProtoTag::Secure));
}
#[test]
fn test_encrypt_tg_nonce() {
let client_dec_key = [0x42u8; 32];
let client_dec_iv = 12345u128;
let client_enc_key = [0x24u8; 32];
let client_enc_iv = 54321u128;
let rng = SecureRandom::new();
let (nonce, _, _, _, _) =
generate_tg_nonce(
ProtoTag::Secure,
2,
&client_dec_key,
client_dec_iv,
&client_enc_key,
client_enc_iv,
&rng,
false,
);
let encrypted = encrypt_tg_nonce(&nonce);
assert_eq!(encrypted.len(), HANDSHAKE_LEN);
assert_eq!(&encrypted[..PROTO_TAG_POS], &nonce[..PROTO_TAG_POS]);
assert_ne!(&encrypted[PROTO_TAG_POS..], &nonce[PROTO_TAG_POS..]);
}
#[test]
fn test_handshake_success_zeroize_on_drop() {
let success = HandshakeSuccess {
user: "test".to_string(),
dc_idx: 2,
proto_tag: ProtoTag::Secure,
dec_key: [0xAA; 32],
dec_iv: 0xBBBBBBBB,
enc_key: [0xCC; 32],
enc_iv: 0xDDDDDDDD,
peer: "127.0.0.1:1234".parse().unwrap(),
is_tls: true,
};
assert_eq!(success.dec_key, [0xAA; 32]);
assert_eq!(success.enc_key, [0xCC; 32]);
drop(success);
// Drop impl zeroizes key material without panic
}
assert_not_impl_all!(HandshakeSuccess: Copy, Clone);
}

View File

@ -0,0 +1,891 @@
use super::*;
use crate::crypto::sha256_hmac;
use dashmap::DashMap;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::time::{Duration, Instant};
fn make_valid_tls_handshake(secret: &[u8], timestamp: u32) -> Vec<u8> {
let session_id_len: usize = 32;
let len = tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN + 1 + session_id_len;
let mut handshake = vec![0x42u8; len];
handshake[tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN] = session_id_len as u8;
handshake[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN].fill(0);
let computed = sha256_hmac(secret, &handshake);
let mut digest = computed;
let ts = timestamp.to_le_bytes();
for i in 0..4 {
digest[28 + i] ^= ts[i];
}
handshake[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN]
.copy_from_slice(&digest);
handshake
}
fn make_valid_tls_client_hello_with_alpn(
secret: &[u8],
timestamp: u32,
alpn_protocols: &[&[u8]],
) -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&TLS_VERSION);
body.extend_from_slice(&[0u8; 32]);
body.push(32);
body.extend_from_slice(&[0x42u8; 32]);
body.extend_from_slice(&2u16.to_be_bytes());
body.extend_from_slice(&[0x13, 0x01]);
body.push(1);
body.push(0);
let mut ext_blob = Vec::new();
if !alpn_protocols.is_empty() {
let mut alpn_list = Vec::new();
for proto in alpn_protocols {
alpn_list.push(proto.len() as u8);
alpn_list.extend_from_slice(proto);
}
let mut alpn_data = Vec::new();
alpn_data.extend_from_slice(&(alpn_list.len() as u16).to_be_bytes());
alpn_data.extend_from_slice(&alpn_list);
ext_blob.extend_from_slice(&0x0010u16.to_be_bytes());
ext_blob.extend_from_slice(&(alpn_data.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&alpn_data);
}
body.extend_from_slice(&(ext_blob.len() as u16).to_be_bytes());
body.extend_from_slice(&ext_blob);
let mut handshake = Vec::new();
handshake.push(0x01);
let body_len = (body.len() as u32).to_be_bytes();
handshake.extend_from_slice(&body_len[1..4]);
handshake.extend_from_slice(&body);
let mut record = Vec::new();
record.push(TLS_RECORD_HANDSHAKE);
record.extend_from_slice(&[0x03, 0x01]);
record.extend_from_slice(&(handshake.len() as u16).to_be_bytes());
record.extend_from_slice(&handshake);
record[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN].fill(0);
let computed = sha256_hmac(secret, &record);
let mut digest = computed;
let ts = timestamp.to_le_bytes();
for i in 0..4 {
digest[28 + i] ^= ts[i];
}
record[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN]
.copy_from_slice(&digest);
record
}
fn test_config_with_secret_hex(secret_hex: &str) -> ProxyConfig {
clear_auth_probe_state_for_testing();
let mut cfg = ProxyConfig::default();
cfg.access.users.clear();
cfg.access
.users
.insert("user".to_string(), secret_hex.to_string());
cfg.access.ignore_time_skew = true;
cfg
}
#[test]
fn test_generate_tg_nonce() {
let client_enc_key = [0x24u8; 32];
let client_enc_iv = 54321u128;
let rng = SecureRandom::new();
let (nonce, _tg_enc_key, _tg_enc_iv, _tg_dec_key, _tg_dec_iv) = generate_tg_nonce(
ProtoTag::Secure,
2,
&client_enc_key,
client_enc_iv,
&rng,
false,
);
assert_eq!(nonce.len(), HANDSHAKE_LEN);
let tag_bytes: [u8; 4] = nonce[PROTO_TAG_POS..PROTO_TAG_POS + 4].try_into().unwrap();
assert_eq!(ProtoTag::from_bytes(tag_bytes), Some(ProtoTag::Secure));
}
#[test]
fn test_encrypt_tg_nonce() {
let client_enc_key = [0x24u8; 32];
let client_enc_iv = 54321u128;
let rng = SecureRandom::new();
let (nonce, _, _, _, _) = generate_tg_nonce(
ProtoTag::Secure,
2,
&client_enc_key,
client_enc_iv,
&rng,
false,
);
let encrypted = encrypt_tg_nonce(&nonce);
assert_eq!(encrypted.len(), HANDSHAKE_LEN);
assert_eq!(&encrypted[..PROTO_TAG_POS], &nonce[..PROTO_TAG_POS]);
assert_ne!(&encrypted[PROTO_TAG_POS..], &nonce[PROTO_TAG_POS..]);
}
#[test]
fn test_handshake_success_drop_does_not_panic() {
let success = HandshakeSuccess {
user: "test".to_string(),
dc_idx: 2,
proto_tag: ProtoTag::Secure,
dec_key: [0xAA; 32],
dec_iv: 0xBBBBBBBB,
enc_key: [0xCC; 32],
enc_iv: 0xDDDDDDDD,
peer: "198.51.100.10:1234".parse().unwrap(),
is_tls: true,
};
assert_eq!(success.dec_key, [0xAA; 32]);
assert_eq!(success.enc_key, [0xCC; 32]);
drop(success);
}
#[test]
fn test_generate_tg_nonce_enc_dec_material_is_consistent() {
let client_enc_key = [0x34u8; 32];
let client_enc_iv = 0xffeeddccbbaa00998877665544332211u128;
let rng = SecureRandom::new();
let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce(
ProtoTag::Secure,
7,
&client_enc_key,
client_enc_iv,
&rng,
false,
);
let enc_key_iv = &nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN];
let dec_key_iv: Vec<u8> = enc_key_iv.iter().rev().copied().collect();
let mut expected_tg_enc_key = [0u8; 32];
expected_tg_enc_key.copy_from_slice(&enc_key_iv[..KEY_LEN]);
let mut expected_tg_enc_iv_arr = [0u8; IV_LEN];
expected_tg_enc_iv_arr.copy_from_slice(&enc_key_iv[KEY_LEN..]);
let expected_tg_enc_iv = u128::from_be_bytes(expected_tg_enc_iv_arr);
let mut expected_tg_dec_key = [0u8; 32];
expected_tg_dec_key.copy_from_slice(&dec_key_iv[..KEY_LEN]);
let mut expected_tg_dec_iv_arr = [0u8; IV_LEN];
expected_tg_dec_iv_arr.copy_from_slice(&dec_key_iv[KEY_LEN..]);
let expected_tg_dec_iv = u128::from_be_bytes(expected_tg_dec_iv_arr);
assert_eq!(tg_enc_key, expected_tg_enc_key);
assert_eq!(tg_enc_iv, expected_tg_enc_iv);
assert_eq!(tg_dec_key, expected_tg_dec_key);
assert_eq!(tg_dec_iv, expected_tg_dec_iv);
assert_eq!(
i16::from_le_bytes([nonce[DC_IDX_POS], nonce[DC_IDX_POS + 1]]),
7,
"Generated nonce must keep target dc index in protocol slot"
);
}
#[test]
fn test_generate_tg_nonce_fast_mode_embeds_reversed_client_enc_material() {
let client_enc_key = [0xABu8; 32];
let client_enc_iv = 0x11223344556677889900aabbccddeeffu128;
let rng = SecureRandom::new();
let (nonce, _, _, _, _) = generate_tg_nonce(
ProtoTag::Secure,
9,
&client_enc_key,
client_enc_iv,
&rng,
true,
);
let mut expected = Vec::with_capacity(KEY_LEN + IV_LEN);
expected.extend_from_slice(&client_enc_key);
expected.extend_from_slice(&client_enc_iv.to_be_bytes());
expected.reverse();
assert_eq!(&nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN], expected.as_slice());
}
#[test]
fn test_encrypt_tg_nonce_with_ciphers_matches_manual_suffix_encryption() {
let client_enc_key = [0x24u8; 32];
let client_enc_iv = 54321u128;
let rng = SecureRandom::new();
let (nonce, _, _, _, _) = generate_tg_nonce(
ProtoTag::Secure,
2,
&client_enc_key,
client_enc_iv,
&rng,
false,
);
let (encrypted, _, _) = encrypt_tg_nonce_with_ciphers(&nonce);
let enc_key_iv = &nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN];
let mut expected_enc_key = [0u8; 32];
expected_enc_key.copy_from_slice(&enc_key_iv[..KEY_LEN]);
let mut expected_enc_iv_arr = [0u8; IV_LEN];
expected_enc_iv_arr.copy_from_slice(&enc_key_iv[KEY_LEN..]);
let expected_enc_iv = u128::from_be_bytes(expected_enc_iv_arr);
let mut manual_encryptor = AesCtr::new(&expected_enc_key, expected_enc_iv);
let manual = manual_encryptor.encrypt(&nonce);
assert_eq!(encrypted.len(), HANDSHAKE_LEN);
assert_eq!(&encrypted[..PROTO_TAG_POS], &nonce[..PROTO_TAG_POS]);
assert_eq!(
&encrypted[PROTO_TAG_POS..],
&manual[PROTO_TAG_POS..],
"Encrypted nonce suffix must match AES-CTR output with derived enc key/iv"
);
}
#[tokio::test]
async fn tls_replay_second_identical_handshake_is_rejected() {
let secret = [0x11u8; 16];
let config = test_config_with_secret_hex("11111111111111111111111111111111");
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.21:44321".parse().unwrap();
let handshake = make_valid_tls_handshake(&secret, 0);
let first = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(first, HandshakeResult::Success(_)));
let second = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(second, HandshakeResult::BadClient { .. }));
}
#[tokio::test]
async fn tls_replay_concurrent_identical_handshake_allows_exactly_one_success() {
let secret = [0x77u8; 16];
let config = Arc::new(test_config_with_secret_hex("77777777777777777777777777777777"));
let replay_checker = Arc::new(ReplayChecker::new(4096, Duration::from_secs(60)));
let rng = Arc::new(SecureRandom::new());
let handshake = Arc::new(make_valid_tls_handshake(&secret, 0));
let mut tasks = Vec::new();
for _ in 0..50 {
let config = config.clone();
let replay_checker = replay_checker.clone();
let rng = rng.clone();
let handshake = handshake.clone();
tasks.push(tokio::spawn(async move {
handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
"198.51.100.22:45000".parse().unwrap(),
&config,
&replay_checker,
&rng,
None,
)
.await
}));
}
let mut success_count = 0usize;
for task in tasks {
let result = task.await.unwrap();
if matches!(result, HandshakeResult::Success(_)) {
success_count += 1;
} else {
assert!(matches!(result, HandshakeResult::BadClient { .. }));
}
}
assert_eq!(
success_count, 1,
"Concurrent replay attempts must allow exactly one successful handshake"
);
}
#[tokio::test]
async fn invalid_tls_probe_does_not_pollute_replay_cache() {
let config = test_config_with_secret_hex("11111111111111111111111111111111");
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.23:44322".parse().unwrap();
let mut invalid = vec![0x42u8; tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN + 1 + 32];
invalid[tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN] = 32;
let before = replay_checker.stats();
let result = handle_tls_handshake(
&invalid,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
let after = replay_checker.stats();
assert!(matches!(result, HandshakeResult::BadClient { .. }));
assert_eq!(before.total_additions, after.total_additions);
assert_eq!(before.total_hits, after.total_hits);
}
#[tokio::test]
async fn empty_decoded_secret_is_rejected() {
clear_warned_secrets_for_testing();
let config = test_config_with_secret_hex("");
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.24:44323".parse().unwrap();
let handshake = make_valid_tls_handshake(&[], 0);
let result = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::BadClient { .. }));
}
#[tokio::test]
async fn wrong_length_decoded_secret_is_rejected() {
clear_warned_secrets_for_testing();
let config = test_config_with_secret_hex("aa");
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.25:44324".parse().unwrap();
let handshake = make_valid_tls_handshake(&[0xaau8], 0);
let result = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::BadClient { .. }));
}
#[tokio::test]
async fn invalid_mtproto_probe_does_not_pollute_replay_cache() {
let config = test_config_with_secret_hex("11111111111111111111111111111111");
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let peer: SocketAddr = "198.51.100.26:44325".parse().unwrap();
let handshake = [0u8; HANDSHAKE_LEN];
let before = replay_checker.stats();
let result = handle_mtproto_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
false,
None,
)
.await;
let after = replay_checker.stats();
assert!(matches!(result, HandshakeResult::BadClient { .. }));
assert_eq!(before.total_additions, after.total_additions);
assert_eq!(before.total_hits, after.total_hits);
}
#[tokio::test]
async fn mixed_secret_lengths_keep_valid_user_authenticating() {
clear_warned_secrets_for_testing();
clear_auth_probe_state_for_testing();
let good_secret = [0x22u8; 16];
let mut config = ProxyConfig::default();
config.access.users.clear();
config
.access
.users
.insert("broken_user".to_string(), "aa".to_string());
config
.access
.users
.insert("valid_user".to_string(), "22222222222222222222222222222222".to_string());
config.access.ignore_time_skew = true;
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.27:44326".parse().unwrap();
let handshake = make_valid_tls_handshake(&good_secret, 0);
let result = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::Success(_)));
}
#[tokio::test]
async fn alpn_enforce_rejects_unsupported_client_alpn() {
let secret = [0x33u8; 16];
let mut config = test_config_with_secret_hex("33333333333333333333333333333333");
config.censorship.alpn_enforce = true;
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.28:44327".parse().unwrap();
let handshake = make_valid_tls_client_hello_with_alpn(&secret, 0, &[b"h3"]);
let result = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::BadClient { .. }));
}
#[tokio::test]
async fn alpn_enforce_accepts_h2() {
let secret = [0x44u8; 16];
let mut config = test_config_with_secret_hex("44444444444444444444444444444444");
config.censorship.alpn_enforce = true;
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.29:44328".parse().unwrap();
let handshake = make_valid_tls_client_hello_with_alpn(&secret, 0, &[b"h2", b"h3"]);
let result = handle_tls_handshake(
&handshake,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::Success(_)));
}
#[tokio::test]
async fn malformed_tls_classes_complete_within_bounded_time() {
let secret = [0x55u8; 16];
let mut config = test_config_with_secret_hex("55555555555555555555555555555555");
config.censorship.alpn_enforce = true;
let replay_checker = ReplayChecker::new(512, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.30:44329".parse().unwrap();
let too_short = vec![0x16, 0x03, 0x01];
let mut bad_hmac = make_valid_tls_handshake(&secret, 0);
bad_hmac[tls::TLS_DIGEST_POS] ^= 0x01;
let alpn_mismatch = make_valid_tls_client_hello_with_alpn(&secret, 0, &[b"h3"]);
for probe in [too_short, bad_hmac, alpn_mismatch] {
let result = tokio::time::timeout(
Duration::from_millis(200),
handle_tls_handshake(
&probe,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
),
)
.await
.expect("Malformed TLS classes must be rejected within bounded time");
assert!(matches!(result, HandshakeResult::BadClient { .. }));
}
}
#[tokio::test]
#[ignore = "timing-sensitive; run manually on low-jitter hosts"]
async fn malformed_tls_classes_share_close_latency_buckets() {
const ITER: usize = 24;
const BUCKET_MS: u128 = 10;
let secret = [0x99u8; 16];
let mut config = test_config_with_secret_hex("99999999999999999999999999999999");
config.censorship.alpn_enforce = true;
let replay_checker = ReplayChecker::new(4096, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.31:44330".parse().unwrap();
let too_short = vec![0x16, 0x03, 0x01];
let mut bad_hmac = make_valid_tls_handshake(&secret, 0);
bad_hmac[tls::TLS_DIGEST_POS + 1] ^= 0x01;
let alpn_mismatch = make_valid_tls_client_hello_with_alpn(&secret, 0, &[b"h3"]);
let mut class_means_ms = Vec::new();
for probe in [too_short, bad_hmac, alpn_mismatch] {
let mut sum_micros: u128 = 0;
for _ in 0..ITER {
let started = Instant::now();
let result = handle_tls_handshake(
&probe,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
let elapsed = started.elapsed();
assert!(matches!(result, HandshakeResult::BadClient { .. }));
sum_micros += elapsed.as_micros();
}
class_means_ms.push(sum_micros / ITER as u128 / 1_000);
}
let min_bucket = class_means_ms
.iter()
.map(|ms| ms / BUCKET_MS)
.min()
.unwrap();
let max_bucket = class_means_ms
.iter()
.map(|ms| ms / BUCKET_MS)
.max()
.unwrap();
assert!(
max_bucket <= min_bucket + 1,
"Malformed TLS classes diverged across latency buckets: means_ms={:?}",
class_means_ms
);
}
#[test]
fn secure_tag_requires_tls_mode_on_tls_transport() {
let mut config = ProxyConfig::default();
config.general.modes.classic = false;
config.general.modes.secure = true;
config.general.modes.tls = false;
assert!(
!mode_enabled_for_proto(&config, ProtoTag::Secure, true),
"Secure tag over TLS must be rejected when tls mode is disabled"
);
config.general.modes.tls = true;
assert!(
mode_enabled_for_proto(&config, ProtoTag::Secure, true),
"Secure tag over TLS must be accepted when tls mode is enabled"
);
}
#[test]
fn secure_tag_requires_secure_mode_on_direct_transport() {
let mut config = ProxyConfig::default();
config.general.modes.classic = false;
config.general.modes.secure = false;
config.general.modes.tls = true;
assert!(
!mode_enabled_for_proto(&config, ProtoTag::Secure, false),
"Secure tag without TLS must be rejected when secure mode is disabled"
);
config.general.modes.secure = true;
assert!(
mode_enabled_for_proto(&config, ProtoTag::Secure, false),
"Secure tag without TLS must be accepted when secure mode is enabled"
);
}
#[test]
fn mode_policy_matrix_is_stable_for_all_tag_transport_mode_combinations() {
let tags = [ProtoTag::Secure, ProtoTag::Intermediate, ProtoTag::Abridged];
for classic in [false, true] {
for secure in [false, true] {
for tls in [false, true] {
let mut config = ProxyConfig::default();
config.general.modes.classic = classic;
config.general.modes.secure = secure;
config.general.modes.tls = tls;
for is_tls in [false, true] {
for tag in tags {
let expected = match (tag, is_tls) {
(ProtoTag::Secure, true) => tls,
(ProtoTag::Secure, false) => secure,
(ProtoTag::Intermediate | ProtoTag::Abridged, _) => classic,
};
assert_eq!(
mode_enabled_for_proto(&config, tag, is_tls),
expected,
"mode policy drifted for tag={:?}, transport_tls={}, modes=(classic={}, secure={}, tls={})",
tag,
is_tls,
classic,
secure,
tls
);
}
}
}
}
}
}
#[test]
fn invalid_secret_warning_keys_do_not_collide_on_colon_boundaries() {
clear_warned_secrets_for_testing();
warn_invalid_secret_once("a:b", "c", ACCESS_SECRET_BYTES, Some(1));
warn_invalid_secret_once("a", "b:c", ACCESS_SECRET_BYTES, Some(2));
let warned = INVALID_SECRET_WARNED
.get()
.expect("warned set must be initialized");
let guard = warned.lock().expect("warned set lock must be available");
assert_eq!(
guard.len(),
2,
"(name, reason) pairs that stringify to the same colon-joined key must remain distinct"
);
}
#[tokio::test]
async fn repeated_invalid_tls_probes_trigger_pre_auth_throttle() {
let _guard = auth_probe_test_lock()
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
clear_auth_probe_state_for_testing();
let config = test_config_with_secret_hex("11111111111111111111111111111111");
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.61:44361".parse().unwrap();
let mut invalid = vec![0x42u8; tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN + 1 + 32];
invalid[tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN] = 32;
for _ in 0..AUTH_PROBE_BACKOFF_START_FAILS {
let result = handle_tls_handshake(
&invalid,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::BadClient { .. }));
}
assert!(
auth_probe_is_throttled_for_testing(peer.ip()),
"invalid probe burst must activate per-IP pre-auth throttle"
);
}
#[tokio::test]
async fn successful_tls_handshake_clears_pre_auth_failure_streak() {
let _guard = auth_probe_test_lock()
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
clear_auth_probe_state_for_testing();
let secret = [0x23u8; 16];
let config = test_config_with_secret_hex("23232323232323232323232323232323");
let replay_checker = ReplayChecker::new(256, Duration::from_secs(60));
let rng = SecureRandom::new();
let peer: SocketAddr = "198.51.100.62:44362".parse().unwrap();
let mut invalid = vec![0x42u8; tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN + 1 + 32];
invalid[tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN] = 32;
for expected in 1..AUTH_PROBE_BACKOFF_START_FAILS {
let result = handle_tls_handshake(
&invalid,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(result, HandshakeResult::BadClient { .. }));
assert_eq!(
auth_probe_fail_streak_for_testing(peer.ip()),
Some(expected),
"failure streak must grow before a successful authentication"
);
}
let valid = make_valid_tls_handshake(&secret, 0);
let success = handle_tls_handshake(
&valid,
tokio::io::empty(),
tokio::io::sink(),
peer,
&config,
&replay_checker,
&rng,
None,
)
.await;
assert!(matches!(success, HandshakeResult::Success(_)));
assert_eq!(
auth_probe_fail_streak_for_testing(peer.ip()),
None,
"successful authentication must clear accumulated pre-auth failures"
);
}
#[test]
fn auth_probe_capacity_prunes_stale_entries_for_new_ips() {
let state = DashMap::new();
let now = Instant::now();
let stale_seen = now - Duration::from_secs(AUTH_PROBE_TRACK_RETENTION_SECS + 1);
for idx in 0..AUTH_PROBE_TRACK_MAX_ENTRIES {
let ip = IpAddr::V4(Ipv4Addr::new(
10,
1,
((idx >> 8) & 0xff) as u8,
(idx & 0xff) as u8,
));
state.insert(
ip,
AuthProbeState {
fail_streak: 1,
blocked_until: now,
last_seen: stale_seen,
},
);
}
let newcomer = IpAddr::V4(Ipv4Addr::new(198, 51, 100, 200));
auth_probe_record_failure_with_state(&state, newcomer, now);
assert_eq!(
state.get(&newcomer).map(|entry| entry.fail_streak),
Some(1),
"stale-entry pruning must admit and track a new probe source"
);
assert!(
state.len() <= AUTH_PROBE_TRACK_MAX_ENTRIES,
"auth probe map must remain bounded after stale pruning"
);
}
#[test]
fn auth_probe_capacity_stays_fail_closed_when_map_is_fresh_and_full() {
let state = DashMap::new();
let now = Instant::now();
for idx in 0..AUTH_PROBE_TRACK_MAX_ENTRIES {
let ip = IpAddr::V4(Ipv4Addr::new(
172,
16,
((idx >> 8) & 0xff) as u8,
(idx & 0xff) as u8,
));
state.insert(
ip,
AuthProbeState {
fail_streak: 1,
blocked_until: now,
last_seen: now,
},
);
}
let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 55));
auth_probe_record_failure_with_state(&state, newcomer, now);
assert!(
state.get(&newcomer).is_none(),
"when all entries are fresh and full, new probes must not be admitted"
);
assert_eq!(
state.len(),
AUTH_PROBE_TRACK_MAX_ENTRIES,
"auth probe map must stay at the configured cap"
);
}

View File

@ -14,12 +14,41 @@ use crate::network::dns_overrides::resolve_socket_addr;
use crate::stats::beobachten::BeobachtenStore;
use crate::transport::proxy_protocol::{ProxyProtocolV1Builder, ProxyProtocolV2Builder};
#[cfg(not(test))]
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
/// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners.
#[cfg(not(test))]
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
#[cfg(test)]
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
const MASK_BUFFER_SIZE: usize = 8192;
async fn write_proxy_header_with_timeout<W>(mask_write: &mut W, header: &[u8]) -> bool
where
W: AsyncWrite + Unpin,
{
match timeout(MASK_TIMEOUT, mask_write.write_all(header)).await {
Ok(Ok(())) => true,
Ok(Err(_)) => false,
Err(_) => {
debug!("Timeout writing proxy protocol header to mask backend");
false
}
}
}
async fn consume_client_data_with_timeout<R>(reader: R)
where
R: AsyncRead + Unpin,
{
if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader)).await.is_err() {
debug!("Timed out while consuming client data on masking fallback path");
}
}
/// Detect client type based on initial data
fn detect_client_type(data: &[u8]) -> &'static str {
// Check for HTTP request
@ -71,7 +100,7 @@ where
if !config.censorship.mask {
// Masking disabled, just consume data
consume_client_data(reader).await;
consume_client_data_with_timeout(reader).await;
return;
}
@ -107,7 +136,7 @@ where
}
};
if let Some(header) = proxy_header {
if mask_write.write_all(&header).await.is_err() {
if !write_proxy_header_with_timeout(&mut mask_write, &header).await {
return;
}
}
@ -117,11 +146,11 @@ where
}
Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask unix socket");
consume_client_data(reader).await;
consume_client_data_with_timeout(reader).await;
}
Err(_) => {
debug!("Timeout connecting to mask unix socket");
consume_client_data(reader).await;
consume_client_data_with_timeout(reader).await;
}
}
return;
@ -166,7 +195,7 @@ where
let (mask_read, mut mask_write) = stream.into_split();
if let Some(header) = proxy_header {
if mask_write.write_all(&header).await.is_err() {
if !write_proxy_header_with_timeout(&mut mask_write, &header).await {
return;
}
}
@ -176,11 +205,11 @@ where
}
Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask host");
consume_client_data(reader).await;
consume_client_data_with_timeout(reader).await;
}
Err(_) => {
debug!("Timeout connecting to mask host");
consume_client_data(reader).await;
consume_client_data_with_timeout(reader).await;
}
}
}
@ -194,55 +223,51 @@ async fn relay_to_mask<R, W, MR, MW>(
initial_data: &[u8],
)
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
MR: AsyncRead + Unpin + Send + 'static,
MW: AsyncWrite + Unpin + Send + 'static,
R: AsyncRead + Unpin + Send,
W: AsyncWrite + Unpin + Send,
MR: AsyncRead + Unpin + Send,
MW: AsyncWrite + Unpin + Send,
{
// Send initial data to mask host
if mask_write.write_all(initial_data).await.is_err() {
return;
}
if mask_write.flush().await.is_err() {
return;
}
// Relay traffic
let c2m = tokio::spawn(async move {
let mut buf = vec![0u8; MASK_BUFFER_SIZE];
loop {
match reader.read(&mut buf).await {
Ok(0) | Err(_) => {
let _ = mask_write.shutdown().await;
break;
}
Ok(n) => {
if mask_write.write_all(&buf[..n]).await.is_err() {
let mut client_buf = vec![0u8; MASK_BUFFER_SIZE];
let mut mask_buf = vec![0u8; MASK_BUFFER_SIZE];
loop {
tokio::select! {
client_read = reader.read(&mut client_buf) => {
match client_read {
Ok(0) | Err(_) => {
let _ = mask_write.shutdown().await;
break;
}
Ok(n) => {
if mask_write.write_all(&client_buf[..n]).await.is_err() {
break;
}
}
}
}
mask_read_res = mask_read.read(&mut mask_buf) => {
match mask_read_res {
Ok(0) | Err(_) => {
let _ = writer.shutdown().await;
break;
}
Ok(n) => {
if writer.write_all(&mask_buf[..n]).await.is_err() {
break;
}
}
}
}
}
});
let m2c = tokio::spawn(async move {
let mut buf = vec![0u8; MASK_BUFFER_SIZE];
loop {
match mask_read.read(&mut buf).await {
Ok(0) | Err(_) => {
let _ = writer.shutdown().await;
break;
}
Ok(n) => {
if writer.write_all(&buf[..n]).await.is_err() {
break;
}
}
}
}
});
// Wait for either to complete
tokio::select! {
_ = c2m => {}
_ = m2c => {}
}
}
@ -255,3 +280,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R) {
}
}
}
#[cfg(test)]
#[path = "masking_security_tests.rs"]
mod security_tests;

View File

@ -0,0 +1,550 @@
use super::*;
use crate::config::ProxyConfig;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{duplex, AsyncBufReadExt, BufReader};
use tokio::net::TcpListener;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn bad_client_probe_is_forwarded_verbatim_to_mask_backend() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let probe = b"GET / HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
let backend_reply = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec();
let accept_task = tokio::spawn({
let probe = probe.clone();
let backend_reply = backend_reply.clone();
async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut received = vec![0u8; probe.len()];
stream.read_exact(&mut received).await.unwrap();
assert_eq!(received, probe);
stream.write_all(&backend_reply).await.unwrap();
}
});
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_unix_sock = None;
config.censorship.mask_proxy_protocol = 0;
let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let (client_reader, _client_writer) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(2048);
let beobachten = BeobachtenStore::new();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
let mut observed = vec![0u8; backend_reply.len()];
client_visible_reader.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, backend_reply);
accept_task.await.unwrap();
}
#[tokio::test]
async fn tls_scanner_probe_keeps_http_like_fallback_surface() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let probe = vec![0x16, 0x03, 0x01, 0x00, 0x10, 0x01, 0x02, 0x03, 0x04];
let backend_reply = b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n".to_vec();
let accept_task = tokio::spawn({
let probe = probe.clone();
let backend_reply = backend_reply.clone();
async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut received = vec![0u8; probe.len()];
stream.read_exact(&mut received).await.unwrap();
assert_eq!(received, probe);
stream.write_all(&backend_reply).await.unwrap();
}
});
let mut config = ProxyConfig::default();
config.general.beobachten = true;
config.general.beobachten_minutes = 1;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_unix_sock = None;
config.censorship.mask_proxy_protocol = 0;
let peer: SocketAddr = "198.51.100.44:55221".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let (client_reader, _client_writer) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(2048);
let beobachten = BeobachtenStore::new();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
let mut observed = vec![0u8; backend_reply.len()];
client_visible_reader.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, backend_reply);
let snapshot = beobachten.snapshot_text(Duration::from_secs(60));
assert!(snapshot.contains("[TLS-scanner]"));
assert!(snapshot.contains("198.51.100.44-1"));
accept_task.await.unwrap();
}
#[test]
fn detect_client_type_covers_ssh_port_scanner_and_unknown() {
assert_eq!(detect_client_type(b"SSH-2.0-OpenSSH_9.7"), "SSH");
assert_eq!(detect_client_type(b"\x01\x02\x03"), "port-scanner");
assert_eq!(detect_client_type(b"random-binary-payload"), "unknown");
}
#[test]
fn detect_client_type_len_boundary_9_vs_10_bytes() {
assert_eq!(detect_client_type(b"123456789"), "port-scanner");
assert_eq!(detect_client_type(b"1234567890"), "unknown");
}
#[tokio::test]
async fn beobachten_records_scanner_class_when_mask_is_disabled() {
let mut config = ProxyConfig::default();
config.general.beobachten = true;
config.general.beobachten_minutes = 1;
config.censorship.mask = false;
let peer: SocketAddr = "203.0.113.99:41234".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let initial = b"SSH-2.0-probe";
let (mut client_reader_side, client_reader) = duplex(256);
let (_client_visible_reader, client_visible_writer) = duplex(256);
let beobachten = BeobachtenStore::new();
let task = tokio::spawn(async move {
handle_bad_client(
client_reader,
client_visible_writer,
initial,
peer,
local_addr,
&config,
&beobachten,
)
.await;
beobachten
});
client_reader_side.write_all(b"noise").await.unwrap();
drop(client_reader_side);
let beobachten = timeout(Duration::from_secs(3), task).await.unwrap().unwrap();
let snapshot = beobachten.snapshot_text(Duration::from_secs(60));
assert!(snapshot.contains("[SSH]"));
assert!(snapshot.contains("203.0.113.99-1"));
}
#[tokio::test]
async fn backend_unavailable_falls_back_to_silent_consume() {
let temp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let unused_port = temp_listener.local_addr().unwrap().port();
drop(temp_listener);
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = unused_port;
config.censorship.mask_unix_sock = None;
config.censorship.mask_proxy_protocol = 0;
let peer: SocketAddr = "203.0.113.11:42425".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let probe = b"GET /probe HTTP/1.1\r\nHost: x\r\n\r\n";
let (mut client_reader_side, client_reader) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(256);
let beobachten = BeobachtenStore::new();
let task = tokio::spawn(async move {
handle_bad_client(
client_reader,
client_visible_writer,
probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
});
client_reader_side.write_all(b"noise").await.unwrap();
drop(client_reader_side);
timeout(Duration::from_secs(3), task).await.unwrap().unwrap();
let mut buf = [0u8; 1];
let n = timeout(Duration::from_secs(1), client_visible_reader.read(&mut buf))
.await
.unwrap()
.unwrap();
assert_eq!(n, 0);
}
#[tokio::test]
async fn mask_disabled_consumes_client_data_without_response() {
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = false;
let peer: SocketAddr = "198.51.100.12:45454".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let initial = b"scanner";
let (mut client_reader_side, client_reader) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(256);
let beobachten = BeobachtenStore::new();
let task = tokio::spawn(async move {
handle_bad_client(
client_reader,
client_visible_writer,
initial,
peer,
local_addr,
&config,
&beobachten,
)
.await;
});
client_reader_side.write_all(b"untrusted payload").await.unwrap();
drop(client_reader_side);
timeout(Duration::from_secs(3), task).await.unwrap().unwrap();
let mut buf = [0u8; 1];
let n = timeout(Duration::from_secs(1), client_visible_reader.read(&mut buf))
.await
.unwrap()
.unwrap();
assert_eq!(n, 0);
}
#[tokio::test]
async fn proxy_protocol_v1_header_is_sent_before_probe() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let probe = b"GET / HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
let backend_reply = b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n".to_vec();
let accept_task = tokio::spawn({
let probe = probe.clone();
let backend_reply = backend_reply.clone();
async move {
let (stream, _) = listener.accept().await.unwrap();
let mut reader = BufReader::new(stream);
let mut header_line = Vec::new();
reader.read_until(b'\n', &mut header_line).await.unwrap();
let header_text = String::from_utf8(header_line.clone()).unwrap();
assert!(header_text.starts_with("PROXY TCP4 "));
assert!(header_text.ends_with("\r\n"));
let mut received_probe = vec![0u8; probe.len()];
reader.read_exact(&mut received_probe).await.unwrap();
assert_eq!(received_probe, probe);
let mut stream = reader.into_inner();
stream.write_all(&backend_reply).await.unwrap();
}
});
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_unix_sock = None;
config.censorship.mask_proxy_protocol = 1;
let peer: SocketAddr = "203.0.113.15:50001".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let (client_reader, _client_writer) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(2048);
let beobachten = BeobachtenStore::new();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
let mut observed = vec![0u8; backend_reply.len()];
client_visible_reader.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, backend_reply);
accept_task.await.unwrap();
}
#[tokio::test]
async fn proxy_protocol_v2_header_is_sent_before_probe() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let probe = b"GET / HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
let backend_reply = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec();
let accept_task = tokio::spawn({
let probe = probe.clone();
let backend_reply = backend_reply.clone();
async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut sig = [0u8; 12];
stream.read_exact(&mut sig).await.unwrap();
assert_eq!(&sig, b"\r\n\r\n\0\r\nQUIT\n");
let mut fixed = [0u8; 4];
stream.read_exact(&mut fixed).await.unwrap();
let addr_len = u16::from_be_bytes([fixed[2], fixed[3]]) as usize;
let mut addr_block = vec![0u8; addr_len];
stream.read_exact(&mut addr_block).await.unwrap();
let mut received_probe = vec![0u8; probe.len()];
stream.read_exact(&mut received_probe).await.unwrap();
assert_eq!(received_probe, probe);
stream.write_all(&backend_reply).await.unwrap();
}
});
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_unix_sock = None;
config.censorship.mask_proxy_protocol = 2;
let peer: SocketAddr = "203.0.113.18:50004".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let (client_reader, _client_writer) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(2048);
let beobachten = BeobachtenStore::new();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
let mut observed = vec![0u8; backend_reply.len()];
client_visible_reader.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, backend_reply);
accept_task.await.unwrap();
}
#[tokio::test]
async fn proxy_protocol_v1_mixed_family_falls_back_to_unknown_header() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = listener.local_addr().unwrap();
let probe = b"GET /mix HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
let backend_reply = b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n".to_vec();
let accept_task = tokio::spawn({
let probe = probe.clone();
let backend_reply = backend_reply.clone();
async move {
let (stream, _) = listener.accept().await.unwrap();
let mut reader = BufReader::new(stream);
let mut header_line = Vec::new();
reader.read_until(b'\n', &mut header_line).await.unwrap();
let header_text = String::from_utf8(header_line).unwrap();
assert_eq!(header_text, "PROXY UNKNOWN\r\n");
let mut received_probe = vec![0u8; probe.len()];
reader.read_exact(&mut received_probe).await.unwrap();
assert_eq!(received_probe, probe);
let mut stream = reader.into_inner();
stream.write_all(&backend_reply).await.unwrap();
}
});
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_host = Some("127.0.0.1".to_string());
config.censorship.mask_port = backend_addr.port();
config.censorship.mask_unix_sock = None;
config.censorship.mask_proxy_protocol = 1;
let peer: SocketAddr = "203.0.113.20:50006".parse().unwrap();
let local_addr: SocketAddr = "[::1]:443".parse().unwrap();
let (client_reader, _client_writer) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(2048);
let beobachten = BeobachtenStore::new();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
let mut observed = vec![0u8; backend_reply.len()];
client_visible_reader.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, backend_reply);
accept_task.await.unwrap();
}
#[cfg(unix)]
#[tokio::test]
async fn unix_socket_mask_path_forwards_probe_and_response() {
let sock_path = format!("/tmp/telemt-mask-test-{}-{}.sock", std::process::id(), rand::random::<u64>());
let _ = std::fs::remove_file(&sock_path);
let listener = UnixListener::bind(&sock_path).unwrap();
let probe = b"GET /unix HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
let backend_reply = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec();
let accept_task = tokio::spawn({
let probe = probe.clone();
let backend_reply = backend_reply.clone();
async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut received = vec![0u8; probe.len()];
stream.read_exact(&mut received).await.unwrap();
assert_eq!(received, probe);
stream.write_all(&backend_reply).await.unwrap();
}
});
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = true;
config.censorship.mask_unix_sock = Some(sock_path.clone());
config.censorship.mask_proxy_protocol = 0;
let peer: SocketAddr = "203.0.113.30:50010".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let (client_reader, _client_writer) = duplex(256);
let (mut client_visible_reader, client_visible_writer) = duplex(2048);
let beobachten = BeobachtenStore::new();
handle_bad_client(
client_reader,
client_visible_writer,
&probe,
peer,
local_addr,
&config,
&beobachten,
)
.await;
let mut observed = vec![0u8; backend_reply.len()];
client_visible_reader.read_exact(&mut observed).await.unwrap();
assert_eq!(observed, backend_reply);
accept_task.await.unwrap();
let _ = std::fs::remove_file(sock_path);
}
#[tokio::test]
async fn mask_disabled_slowloris_connection_is_closed_by_consume_timeout() {
let mut config = ProxyConfig::default();
config.general.beobachten = false;
config.censorship.mask = false;
let peer: SocketAddr = "198.51.100.33:45455".parse().unwrap();
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
let (_client_reader_side, client_reader) = duplex(256);
let (_client_visible_reader, client_visible_writer) = duplex(256);
let beobachten = BeobachtenStore::new();
let task = tokio::spawn(async move {
handle_bad_client(
client_reader,
client_visible_writer,
b"slowloris",
peer,
local_addr,
&config,
&beobachten,
)
.await;
});
timeout(Duration::from_secs(1), task).await.unwrap().unwrap();
}
struct PendingWriter;
impl tokio::io::AsyncWrite for PendingWriter {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Poll::Pending
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
#[tokio::test]
async fn proxy_header_write_timeout_returns_false() {
let mut writer = PendingWriter;
let ok = write_proxy_header_with_timeout(&mut writer, b"PROXY UNKNOWN\r\n").await;
assert!(!ok, "Proxy header writes that never complete must time out");
}

View File

@ -1,14 +1,17 @@
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
#[cfg(test)]
use std::sync::Mutex;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use dashmap::DashMap;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::timeout;
use tracing::{debug, trace, warn};
use crate::config::ProxyConfig;
@ -30,13 +33,15 @@ enum C2MeCommand {
}
const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60);
const DESYNC_DEDUP_MAX_ENTRIES: usize = 65_536;
const DESYNC_DEDUP_PRUNE_SCAN_LIMIT: usize = 1024;
const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
static DESYNC_DEDUP: OnceLock<DashMap<u64, Instant>> = OnceLock::new();
struct RelayForensicsState {
trace_id: u64,
@ -90,24 +95,46 @@ fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
return true;
}
let dedup = DESYNC_DEDUP.get_or_init(|| Mutex::new(HashMap::new()));
let mut guard = dedup.lock().expect("desync dedup mutex poisoned");
guard.retain(|_, seen_at| now.duration_since(*seen_at) < DESYNC_DEDUP_WINDOW);
let dedup = DESYNC_DEDUP.get_or_init(DashMap::new);
match guard.get_mut(&key) {
Some(seen_at) => {
if now.duration_since(*seen_at) >= DESYNC_DEDUP_WINDOW {
*seen_at = now;
true
} else {
false
if let Some(mut seen_at) = dedup.get_mut(&key) {
if now.duration_since(*seen_at) >= DESYNC_DEDUP_WINDOW {
*seen_at = now;
return true;
}
return false;
}
if dedup.len() >= DESYNC_DEDUP_MAX_ENTRIES {
let mut stale_keys = Vec::new();
for entry in dedup.iter().take(DESYNC_DEDUP_PRUNE_SCAN_LIMIT) {
if now.duration_since(*entry.value()) >= DESYNC_DEDUP_WINDOW {
stale_keys.push(*entry.key());
}
}
None => {
guard.insert(key, now);
true
for stale_key in stale_keys {
dedup.remove(&stale_key);
}
if dedup.len() >= DESYNC_DEDUP_MAX_ENTRIES {
return false;
}
}
dedup.insert(key, now);
true
}
#[cfg(test)]
fn clear_desync_dedup_for_testing() {
if let Some(dedup) = DESYNC_DEDUP.get() {
dedup.clear();
}
}
#[cfg(test)]
fn desync_dedup_test_lock() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
fn report_desync_frame_too_large(
@ -229,7 +256,7 @@ pub(crate) async fn handle_via_middle_proxy<R, W>(
me_pool: Arc<MePool>,
stats: Arc<Stats>,
config: Arc<ProxyConfig>,
_buffer_pool: Arc<BufferPool>,
buffer_pool: Arc<BufferPool>,
local_addr: SocketAddr,
rng: Arc<SecureRandom>,
mut route_rx: watch::Receiver<RouteCutoverState>,
@ -271,7 +298,6 @@ where
};
stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user);
stats.increment_current_connections_me();
if let Some(cutover) = affected_cutover_state(
@ -291,7 +317,6 @@ where
let _ = me_pool.send_close(conn_id).await;
me_pool.registry().unregister(conn_id).await;
stats.decrement_current_connections_me();
stats.decrement_user_curr_connects(&user);
return Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
}
@ -557,6 +582,8 @@ where
&mut crypto_reader,
proto_tag,
frame_limit,
Duration::from_secs(config.timeouts.client_handshake.max(1)),
&buffer_pool,
&forensics,
&mut frame_counter,
&stats,
@ -638,7 +665,6 @@ where
);
me_pool.registry().unregister(conn_id).await;
stats.decrement_current_connections_me();
stats.decrement_user_curr_connects(&user);
result
}
@ -646,6 +672,8 @@ async fn read_client_payload<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
max_frame: usize,
frame_read_timeout: Duration,
buffer_pool: &Arc<BufferPool>,
forensics: &RelayForensicsState,
frame_counter: &mut u64,
stats: &Stats,
@ -653,23 +681,40 @@ async fn read_client_payload<R>(
where
R: AsyncRead + Unpin + Send + 'static,
{
async fn read_exact_with_timeout<R>(
client_reader: &mut CryptoReader<R>,
buf: &mut [u8],
frame_read_timeout: Duration,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
{
match timeout(frame_read_timeout, client_reader.read_exact(buf)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(ProxyError::Io(e)),
Err(_) => Err(ProxyError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"middle-relay client frame read timeout",
))),
}
}
loop {
let (len, quickack, raw_len_bytes) = match proto_tag {
ProtoTag::Abridged => {
let mut first = [0u8; 1];
match client_reader.read_exact(&mut first).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
match read_exact_with_timeout(client_reader, &mut first, frame_read_timeout).await {
Ok(()) => {}
Err(ProxyError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Ok(None);
}
Err(e) => return Err(e),
}
let quickack = (first[0] & 0x80) != 0;
let len_words = if (first[0] & 0x7f) == 0x7f {
let mut ext = [0u8; 3];
client_reader
.read_exact(&mut ext)
.await
.map_err(ProxyError::Io)?;
read_exact_with_timeout(client_reader, &mut ext, frame_read_timeout).await?;
u32::from_le_bytes([ext[0], ext[1], ext[2], 0]) as usize
} else {
(first[0] & 0x7f) as usize
@ -682,10 +727,12 @@ where
}
ProtoTag::Intermediate | ProtoTag::Secure => {
let mut len_buf = [0u8; 4];
match client_reader.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
match read_exact_with_timeout(client_reader, &mut len_buf, frame_read_timeout).await {
Ok(()) => {}
Err(ProxyError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Ok(None);
}
Err(e) => return Err(e),
}
let quickack = (len_buf[3] & 0x80) != 0;
(
@ -737,18 +784,25 @@ where
len
};
let mut payload = vec![0u8; len];
client_reader
.read_exact(&mut payload)
.await
.map_err(ProxyError::Io)?;
let chunk_cap = buffer_pool.buffer_size().max(1024);
let mut payload = BytesMut::with_capacity(len.min(chunk_cap));
let mut remaining = len;
while remaining > 0 {
let chunk_len = remaining.min(chunk_cap);
let mut chunk = buffer_pool.get();
chunk.resize(chunk_len, 0);
read_exact_with_timeout(client_reader, &mut chunk[..chunk_len], frame_read_timeout)
.await?;
payload.extend_from_slice(&chunk[..chunk_len]);
remaining -= chunk_len;
}
// Secure Intermediate: strip validated trailing padding bytes.
if proto_tag == ProtoTag::Secure {
payload.truncate(secure_payload_len);
}
*frame_counter += 1;
return Ok(Some((Bytes::from(payload), quickack)));
return Ok(Some((payload.freeze(), quickack)));
}
}
@ -940,82 +994,5 @@ where
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{Duration as TokioDuration, timeout};
#[test]
fn should_yield_sender_only_on_budget_with_backlog() {
assert!(!should_yield_c2me_sender(0, true));
assert!(!should_yield_c2me_sender(C2ME_SENDER_FAIRNESS_BUDGET - 1, true));
assert!(!should_yield_c2me_sender(C2ME_SENDER_FAIRNESS_BUDGET, false));
assert!(should_yield_c2me_sender(C2ME_SENDER_FAIRNESS_BUDGET, true));
}
#[tokio::test]
async fn enqueue_c2me_command_uses_try_send_fast_path() {
let (tx, mut rx) = mpsc::channel::<C2MeCommand>(2);
enqueue_c2me_command(
&tx,
C2MeCommand::Data {
payload: Bytes::from_static(&[1, 2, 3]),
flags: 0,
},
)
.await
.unwrap();
let recv = timeout(TokioDuration::from_millis(50), rx.recv())
.await
.unwrap()
.unwrap();
match recv {
C2MeCommand::Data { payload, flags } => {
assert_eq!(payload.as_ref(), &[1, 2, 3]);
assert_eq!(flags, 0);
}
C2MeCommand::Close => panic!("unexpected close command"),
}
}
#[tokio::test]
async fn enqueue_c2me_command_falls_back_to_send_when_queue_is_full() {
let (tx, mut rx) = mpsc::channel::<C2MeCommand>(1);
tx.send(C2MeCommand::Data {
payload: Bytes::from_static(&[9]),
flags: 9,
})
.await
.unwrap();
let tx2 = tx.clone();
let producer = tokio::spawn(async move {
enqueue_c2me_command(
&tx2,
C2MeCommand::Data {
payload: Bytes::from_static(&[7, 7]),
flags: 7,
},
)
.await
.unwrap();
});
let _ = timeout(TokioDuration::from_millis(100), rx.recv())
.await
.unwrap();
producer.await.unwrap();
let recv = timeout(TokioDuration::from_millis(100), rx.recv())
.await
.unwrap()
.unwrap();
match recv {
C2MeCommand::Data { payload, flags } => {
assert_eq!(payload.as_ref(), &[7, 7]);
assert_eq!(flags, 7);
}
C2MeCommand::Close => panic!("unexpected close command"),
}
}
}
#[path = "middle_relay_security_tests.rs"]
mod security_tests;

View File

@ -0,0 +1,201 @@
use super::*;
use crate::crypto::AesCtr;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use tokio::io::AsyncWriteExt;
use tokio::io::duplex;
use tokio::time::{Duration as TokioDuration, timeout};
#[test]
fn should_yield_sender_only_on_budget_with_backlog() {
assert!(!should_yield_c2me_sender(0, true));
assert!(!should_yield_c2me_sender(C2ME_SENDER_FAIRNESS_BUDGET - 1, true));
assert!(!should_yield_c2me_sender(C2ME_SENDER_FAIRNESS_BUDGET, false));
assert!(should_yield_c2me_sender(C2ME_SENDER_FAIRNESS_BUDGET, true));
}
#[tokio::test]
async fn enqueue_c2me_command_uses_try_send_fast_path() {
let (tx, mut rx) = mpsc::channel::<C2MeCommand>(2);
enqueue_c2me_command(
&tx,
C2MeCommand::Data {
payload: Bytes::from_static(&[1, 2, 3]),
flags: 0,
},
)
.await
.unwrap();
let recv = timeout(TokioDuration::from_millis(50), rx.recv())
.await
.unwrap()
.unwrap();
match recv {
C2MeCommand::Data { payload, flags } => {
assert_eq!(payload.as_ref(), &[1, 2, 3]);
assert_eq!(flags, 0);
}
C2MeCommand::Close => panic!("unexpected close command"),
}
}
#[tokio::test]
async fn enqueue_c2me_command_falls_back_to_send_when_queue_is_full() {
let (tx, mut rx) = mpsc::channel::<C2MeCommand>(1);
tx.send(C2MeCommand::Data {
payload: Bytes::from_static(&[9]),
flags: 9,
})
.await
.unwrap();
let tx2 = tx.clone();
let producer = tokio::spawn(async move {
enqueue_c2me_command(
&tx2,
C2MeCommand::Data {
payload: Bytes::from_static(&[7, 7]),
flags: 7,
},
)
.await
.unwrap();
});
let _ = timeout(TokioDuration::from_millis(100), rx.recv())
.await
.unwrap();
producer.await.unwrap();
let recv = timeout(TokioDuration::from_millis(100), rx.recv())
.await
.unwrap()
.unwrap();
match recv {
C2MeCommand::Data { payload, flags } => {
assert_eq!(payload.as_ref(), &[7, 7]);
assert_eq!(flags, 7);
}
C2MeCommand::Close => panic!("unexpected close command"),
}
}
#[test]
fn desync_dedup_cache_is_bounded() {
let _guard = desync_dedup_test_lock()
.lock()
.expect("desync dedup test lock must be available");
clear_desync_dedup_for_testing();
let now = Instant::now();
for key in 0..DESYNC_DEDUP_MAX_ENTRIES as u64 {
assert!(
should_emit_full_desync(key, false, now),
"unique keys up to cap must be tracked"
);
}
assert!(
!should_emit_full_desync(u64::MAX, false, now),
"new key above cap must be suppressed to bound memory"
);
assert!(
!should_emit_full_desync(7, false, now),
"already tracked key inside dedup window must stay suppressed"
);
}
fn make_forensics_state() -> RelayForensicsState {
RelayForensicsState {
trace_id: 1,
conn_id: 2,
user: "test-user".to_string(),
peer: "127.0.0.1:50000".parse::<SocketAddr>().unwrap(),
peer_hash: 3,
started_at: Instant::now(),
bytes_c2me: 0,
bytes_me2c: Arc::new(AtomicU64::new(0)),
desync_all_full: false,
}
}
fn make_crypto_reader(reader: tokio::io::DuplexStream) -> CryptoReader<tokio::io::DuplexStream> {
let key = [0u8; 32];
let iv = 0u128;
CryptoReader::new(reader, AesCtr::new(&key, iv))
}
fn encrypt_for_reader(plaintext: &[u8]) -> Vec<u8> {
let key = [0u8; 32];
let iv = 0u128;
let mut cipher = AesCtr::new(&key, iv);
cipher.encrypt(plaintext)
}
#[tokio::test]
async fn read_client_payload_times_out_on_header_stall() {
let _guard = desync_dedup_test_lock()
.lock()
.expect("middle relay test lock must be available");
let (reader, _writer) = duplex(1024);
let mut crypto_reader = make_crypto_reader(reader);
let buffer_pool = Arc::new(BufferPool::new());
let stats = Stats::new();
let forensics = make_forensics_state();
let mut frame_counter = 0;
let result = read_client_payload(
&mut crypto_reader,
ProtoTag::Intermediate,
1024,
TokioDuration::from_millis(25),
&buffer_pool,
&forensics,
&mut frame_counter,
&stats,
)
.await;
assert!(
matches!(result, Err(ProxyError::Io(ref e)) if e.kind() == std::io::ErrorKind::TimedOut),
"stalled header read must time out"
);
}
#[tokio::test]
async fn read_client_payload_times_out_on_payload_stall() {
let _guard = desync_dedup_test_lock()
.lock()
.expect("middle relay test lock must be available");
let (reader, mut writer) = duplex(1024);
let encrypted_len = encrypt_for_reader(&[8, 0, 0, 0]);
writer.write_all(&encrypted_len).await.unwrap();
let mut crypto_reader = make_crypto_reader(reader);
let buffer_pool = Arc::new(BufferPool::new());
let stats = Stats::new();
let forensics = make_forensics_state();
let mut frame_counter = 0;
let result = read_client_payload(
&mut crypto_reader,
ProtoTag::Intermediate,
1024,
TokioDuration::from_millis(25),
&buffer_pool,
&forensics,
&mut frame_counter,
&stats,
)
.await;
assert!(
matches!(result, Err(ProxyError::Io(ref e)) if e.kind() == std::io::ErrorKind::TimedOut),
"stalled payload body read must time out"
);
}

View File

@ -1256,6 +1256,33 @@ impl Stats {
Self::touch_user_stats(stats.value());
stats.curr_connects.fetch_add(1, Ordering::Relaxed);
}
pub fn try_acquire_user_curr_connects(&self, user: &str, limit: Option<u64>) -> bool {
if !self.telemetry_user_enabled() {
return true;
}
self.maybe_cleanup_user_stats();
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed);
loop {
if let Some(max) = limit && current >= max {
return false;
}
match counter.compare_exchange_weak(
current,
current.saturating_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(actual) => current = actual,
}
}
}
pub fn decrement_user_curr_connects(&self, user: &str) {
self.maybe_cleanup_user_stats();

View File

@ -513,6 +513,7 @@ impl FrameCodecTrait for SecureCodec {
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use tokio_util::codec::{FramedRead, FramedWrite};
use tokio::io::duplex;
use futures::{SinkExt, StreamExt};
@ -630,4 +631,31 @@ mod tests {
let result = codec.decode(&mut buf);
assert!(result.is_err());
}
#[test]
fn secure_codec_always_adds_padding_and_jitters_wire_length() {
let codec = SecureCodec::new(Arc::new(SecureRandom::new()));
let payload = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8]);
let mut wire_lens = HashSet::new();
for _ in 0..64 {
let frame = Frame::new(payload.clone());
let mut out = BytesMut::new();
codec.encode(&frame, &mut out).unwrap();
assert!(out.len() >= 4 + payload.len() + 1);
let wire_len = u32::from_le_bytes([out[0], out[1], out[2], out[3]]) as usize;
assert!(
(payload.len() + 1..=payload.len() + 3).contains(&wire_len),
"Secure wire length must be payload+1..3, got {wire_len}"
);
assert_ne!(wire_len % 4, 0, "Secure wire length must be non-4-aligned");
wire_lens.insert(wire_len);
}
assert!(
wire_lens.len() >= 2,
"Secure padding should create observable wire-length jitter"
);
}
}