mirror of
https://github.com/telemt/telemt.git
synced 2026-04-17 10:34:11 +03:00
Implement idle timeout for masking relay and add corresponding tests
- Introduced `copy_with_idle_timeout` function to handle reading and writing with an idle timeout. - Updated the proxy masking logic to use the new idle timeout function. - Added tests to verify that idle relays are closed by the idle timeout before the global relay timeout. - Ensured that connect refusal paths respect the masking budget and that responses followed by silence are cut off by the idle timeout. - Added tests for adversarial scenarios where clients may attempt to drip-feed data beyond the idle timeout.
This commit is contained in:
@@ -36,6 +36,7 @@ const AUTH_PROBE_TRACK_MAX_ENTRIES: usize = 256;
|
||||
const AUTH_PROBE_TRACK_MAX_ENTRIES: usize = 65_536;
|
||||
const AUTH_PROBE_PRUNE_SCAN_LIMIT: usize = 1_024;
|
||||
const AUTH_PROBE_BACKOFF_START_FAILS: u32 = 4;
|
||||
const AUTH_PROBE_SATURATION_GRACE_FAILS: u32 = 2;
|
||||
|
||||
#[cfg(test)]
|
||||
const AUTH_PROBE_BACKOFF_BASE_MS: u64 = 1;
|
||||
@@ -54,12 +55,24 @@ struct AuthProbeState {
|
||||
last_seen: Instant,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct AuthProbeSaturationState {
|
||||
fail_streak: u32,
|
||||
blocked_until: Instant,
|
||||
last_seen: Instant,
|
||||
}
|
||||
|
||||
static AUTH_PROBE_STATE: OnceLock<DashMap<IpAddr, AuthProbeState>> = OnceLock::new();
|
||||
static AUTH_PROBE_SATURATION_STATE: OnceLock<Mutex<Option<AuthProbeSaturationState>>> = OnceLock::new();
|
||||
|
||||
fn auth_probe_state_map() -> &'static DashMap<IpAddr, AuthProbeState> {
|
||||
AUTH_PROBE_STATE.get_or_init(DashMap::new)
|
||||
}
|
||||
|
||||
fn auth_probe_saturation_state() -> &'static Mutex<Option<AuthProbeSaturationState>> {
|
||||
AUTH_PROBE_SATURATION_STATE.get_or_init(|| Mutex::new(None))
|
||||
}
|
||||
|
||||
fn normalize_auth_probe_ip(peer_ip: IpAddr) -> IpAddr {
|
||||
match peer_ip {
|
||||
IpAddr::V4(ip) => IpAddr::V4(ip),
|
||||
@@ -108,6 +121,83 @@ fn auth_probe_is_throttled(peer_ip: IpAddr, now: Instant) -> bool {
|
||||
now < entry.blocked_until
|
||||
}
|
||||
|
||||
fn auth_probe_saturation_grace_exhausted(peer_ip: IpAddr, now: Instant) -> bool {
|
||||
let peer_ip = normalize_auth_probe_ip(peer_ip);
|
||||
let state = auth_probe_state_map();
|
||||
let Some(entry) = state.get(&peer_ip) else {
|
||||
return false;
|
||||
};
|
||||
if auth_probe_state_expired(&entry, now) {
|
||||
drop(entry);
|
||||
state.remove(&peer_ip);
|
||||
return false;
|
||||
}
|
||||
|
||||
entry.fail_streak >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS
|
||||
}
|
||||
|
||||
fn auth_probe_should_apply_preauth_throttle(peer_ip: IpAddr, now: Instant) -> bool {
|
||||
if !auth_probe_is_throttled(peer_ip, now) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if !auth_probe_saturation_is_throttled(now) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auth_probe_saturation_grace_exhausted(peer_ip, now)
|
||||
}
|
||||
|
||||
fn auth_probe_saturation_is_throttled(now: Instant) -> bool {
|
||||
let saturation = auth_probe_saturation_state();
|
||||
let mut guard = match saturation.lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
let Some(state) = guard.as_mut() else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if now.duration_since(state.last_seen) > Duration::from_secs(AUTH_PROBE_TRACK_RETENTION_SECS) {
|
||||
*guard = None;
|
||||
return false;
|
||||
}
|
||||
|
||||
if now < state.blocked_until {
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn auth_probe_note_saturation(now: Instant) {
|
||||
let saturation = auth_probe_saturation_state();
|
||||
let mut guard = match saturation.lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
match guard.as_mut() {
|
||||
Some(state)
|
||||
if now.duration_since(state.last_seen)
|
||||
<= Duration::from_secs(AUTH_PROBE_TRACK_RETENTION_SECS) =>
|
||||
{
|
||||
state.fail_streak = state.fail_streak.saturating_add(1);
|
||||
state.last_seen = now;
|
||||
state.blocked_until = now + auth_probe_backoff(state.fail_streak);
|
||||
}
|
||||
_ => {
|
||||
let fail_streak = AUTH_PROBE_BACKOFF_START_FAILS;
|
||||
*guard = Some(AuthProbeSaturationState {
|
||||
fail_streak,
|
||||
blocked_until: now + auth_probe_backoff(fail_streak),
|
||||
last_seen: now,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_probe_record_failure(peer_ip: IpAddr, now: Instant) {
|
||||
let peer_ip = normalize_auth_probe_ip(peer_ip);
|
||||
let state = auth_probe_state_map();
|
||||
@@ -157,11 +247,11 @@ fn auth_probe_record_failure_with_state(
|
||||
}
|
||||
if state.len() >= AUTH_PROBE_TRACK_MAX_ENTRIES {
|
||||
if eviction_candidates.is_empty() {
|
||||
auth_probe_note_saturation(now);
|
||||
return;
|
||||
}
|
||||
let idx = auth_probe_eviction_offset(peer_ip, now) % eviction_candidates.len();
|
||||
let evict_key = eviction_candidates[idx];
|
||||
state.remove(&evict_key);
|
||||
auth_probe_note_saturation(now);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,6 +276,11 @@ fn clear_auth_probe_state_for_testing() {
|
||||
if let Some(state) = AUTH_PROBE_STATE.get() {
|
||||
state.clear();
|
||||
}
|
||||
if let Some(saturation) = AUTH_PROBE_SATURATION_STATE.get()
|
||||
&& let Ok(mut guard) = saturation.lock()
|
||||
{
|
||||
*guard = None;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -200,6 +295,11 @@ fn auth_probe_is_throttled_for_testing(peer_ip: IpAddr) -> bool {
|
||||
auth_probe_is_throttled(peer_ip, Instant::now())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn auth_probe_saturation_is_throttled_for_testing() -> bool {
|
||||
auth_probe_saturation_is_throttled(Instant::now())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn auth_probe_test_lock() -> &'static Mutex<()> {
|
||||
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
||||
@@ -385,7 +485,8 @@ where
|
||||
{
|
||||
debug!(peer = %peer, handshake_len = handshake.len(), "Processing TLS handshake");
|
||||
|
||||
if auth_probe_is_throttled(peer.ip(), Instant::now()) {
|
||||
let throttle_now = Instant::now();
|
||||
if auth_probe_should_apply_preauth_throttle(peer.ip(), throttle_now) {
|
||||
maybe_apply_server_hello_delay(config).await;
|
||||
debug!(peer = %peer, "TLS handshake rejected by pre-auth probe throttle");
|
||||
return HandshakeResult::BadClient { reader, writer };
|
||||
@@ -554,7 +655,8 @@ where
|
||||
{
|
||||
trace!(peer = %peer, handshake = ?hex::encode(handshake), "MTProto handshake bytes");
|
||||
|
||||
if auth_probe_is_throttled(peer.ip(), Instant::now()) {
|
||||
let throttle_now = Instant::now();
|
||||
if auth_probe_should_apply_preauth_throttle(peer.ip(), throttle_now) {
|
||||
maybe_apply_server_hello_delay(config).await;
|
||||
debug!(peer = %peer, "MTProto handshake rejected by pre-auth probe throttle");
|
||||
return HandshakeResult::BadClient { reader, writer };
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -24,8 +24,36 @@ const MASK_TIMEOUT: Duration = Duration::from_millis(50);
|
||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
#[cfg(test)]
|
||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
||||
#[cfg(not(test))]
|
||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
#[cfg(test)]
|
||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
const MASK_BUFFER_SIZE: usize = 8192;
|
||||
|
||||
async fn copy_with_idle_timeout<R, W>(reader: &mut R, writer: &mut W)
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
W: AsyncWrite + Unpin,
|
||||
{
|
||||
let mut buf = vec![0u8; MASK_BUFFER_SIZE];
|
||||
loop {
|
||||
let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf)).await;
|
||||
let n = match read_res {
|
||||
Ok(Ok(n)) => n,
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
};
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await;
|
||||
match write_res {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_proxy_header_with_timeout<W>(mask_write: &mut W, header: &[u8]) -> bool
|
||||
where
|
||||
W: AsyncWrite + Unpin,
|
||||
@@ -264,11 +292,11 @@ where
|
||||
|
||||
let _ = tokio::join!(
|
||||
async {
|
||||
let _ = tokio::io::copy(&mut reader, &mut mask_write).await;
|
||||
copy_with_idle_timeout(&mut reader, &mut mask_write).await;
|
||||
let _ = mask_write.shutdown().await;
|
||||
},
|
||||
async {
|
||||
let _ = tokio::io::copy(&mut mask_read, &mut writer).await;
|
||||
copy_with_idle_timeout(&mut mask_read, &mut writer).await;
|
||||
let _ = writer.shutdown().await;
|
||||
}
|
||||
);
|
||||
|
||||
@@ -234,8 +234,9 @@ async fn backend_connect_refusal_waits_mask_connect_budget_before_fallback() {
|
||||
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||
let probe = b"GET /probe HTTP/1.1\r\nHost: x\r\n\r\n";
|
||||
|
||||
// Keep reader open so fallback path does not terminate immediately on EOF.
|
||||
let (_client_reader_side, client_reader) = duplex(256);
|
||||
// Close client reader immediately to force the refusal path to rely on masking budget timing.
|
||||
let (client_reader_side, client_reader) = duplex(256);
|
||||
drop(client_reader_side);
|
||||
let (_client_visible_reader, client_visible_writer) = duplex(256);
|
||||
let beobachten = BeobachtenStore::new();
|
||||
|
||||
@@ -890,6 +891,59 @@ async fn mask_disabled_slowloris_connection_is_closed_by_consume_timeout() {
|
||||
timeout(Duration::from_secs(1), task).await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mask_enabled_idle_relay_is_closed_by_idle_timeout_before_global_relay_timeout() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let backend_addr = listener.local_addr().unwrap();
|
||||
let probe = b"GET /idle HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
|
||||
|
||||
let accept_task = tokio::spawn({
|
||||
let probe = probe.clone();
|
||||
async move {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
let mut received = vec![0u8; probe.len()];
|
||||
stream.read_exact(&mut received).await.unwrap();
|
||||
assert_eq!(received, probe);
|
||||
sleep(Duration::from_millis(300)).await;
|
||||
}
|
||||
});
|
||||
|
||||
let mut config = ProxyConfig::default();
|
||||
config.general.beobachten = false;
|
||||
config.censorship.mask = true;
|
||||
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||
config.censorship.mask_port = backend_addr.port();
|
||||
config.censorship.mask_unix_sock = None;
|
||||
config.censorship.mask_proxy_protocol = 0;
|
||||
|
||||
let peer: SocketAddr = "198.51.100.34:45456".parse().unwrap();
|
||||
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||
|
||||
let (_client_reader_side, client_reader) = duplex(512);
|
||||
let (_client_visible_reader, client_visible_writer) = duplex(512);
|
||||
let beobachten = BeobachtenStore::new();
|
||||
|
||||
let started = Instant::now();
|
||||
handle_bad_client(
|
||||
client_reader,
|
||||
client_visible_writer,
|
||||
&probe,
|
||||
peer,
|
||||
local_addr,
|
||||
&config,
|
||||
&beobachten,
|
||||
)
|
||||
.await;
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(150),
|
||||
"idle unauth relay must terminate on idle timeout instead of waiting for full relay timeout"
|
||||
);
|
||||
|
||||
accept_task.await.unwrap();
|
||||
}
|
||||
|
||||
struct PendingWriter;
|
||||
|
||||
impl tokio::io::AsyncWrite for PendingWriter {
|
||||
@@ -1250,3 +1304,166 @@ async fn timing_matrix_masking_classes_under_controlled_inputs() {
|
||||
(reachable_mean as u128) / BUCKET_MS
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn backend_connect_refusal_completes_within_bounded_mask_budget() {
|
||||
let temp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let unused_port = temp_listener.local_addr().unwrap().port();
|
||||
drop(temp_listener);
|
||||
|
||||
let mut config = ProxyConfig::default();
|
||||
config.general.beobachten = false;
|
||||
config.censorship.mask = true;
|
||||
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||
config.censorship.mask_port = unused_port;
|
||||
config.censorship.mask_unix_sock = None;
|
||||
config.censorship.mask_proxy_protocol = 0;
|
||||
|
||||
let peer: SocketAddr = "203.0.113.41:51001".parse().unwrap();
|
||||
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||
let probe = b"GET /bounded HTTP/1.1\r\nHost: x\r\n\r\n";
|
||||
|
||||
let (_client_reader_side, client_reader) = duplex(256);
|
||||
let (_client_visible_reader, client_visible_writer) = duplex(256);
|
||||
let beobachten = BeobachtenStore::new();
|
||||
|
||||
let started = Instant::now();
|
||||
handle_bad_client(
|
||||
client_reader,
|
||||
client_visible_writer,
|
||||
probe,
|
||||
peer,
|
||||
local_addr,
|
||||
&config,
|
||||
&beobachten,
|
||||
)
|
||||
.await;
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(45),
|
||||
"connect refusal path must respect minimum masking budget"
|
||||
);
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(500),
|
||||
"connect refusal path must stay bounded and avoid unbounded stall"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reachable_backend_one_response_then_silence_is_cut_by_idle_timeout() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let backend_addr = listener.local_addr().unwrap();
|
||||
let probe = b"GET /oneshot HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
|
||||
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec();
|
||||
|
||||
let accept_task = tokio::spawn({
|
||||
let probe = probe.clone();
|
||||
let response = response.clone();
|
||||
async move {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
let mut received = vec![0u8; probe.len()];
|
||||
stream.read_exact(&mut received).await.unwrap();
|
||||
assert_eq!(received, probe);
|
||||
stream.write_all(&response).await.unwrap();
|
||||
sleep(Duration::from_millis(300)).await;
|
||||
}
|
||||
});
|
||||
|
||||
let mut config = ProxyConfig::default();
|
||||
config.general.beobachten = false;
|
||||
config.censorship.mask = true;
|
||||
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||
config.censorship.mask_port = backend_addr.port();
|
||||
config.censorship.mask_unix_sock = None;
|
||||
config.censorship.mask_proxy_protocol = 0;
|
||||
|
||||
let peer: SocketAddr = "203.0.113.42:51002".parse().unwrap();
|
||||
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||
|
||||
let (_client_reader_side, client_reader) = duplex(256);
|
||||
let (mut client_visible_reader, client_visible_writer) = duplex(512);
|
||||
let beobachten = BeobachtenStore::new();
|
||||
|
||||
let started = Instant::now();
|
||||
handle_bad_client(
|
||||
client_reader,
|
||||
client_visible_writer,
|
||||
&probe,
|
||||
peer,
|
||||
local_addr,
|
||||
&config,
|
||||
&beobachten,
|
||||
)
|
||||
.await;
|
||||
let elapsed = started.elapsed();
|
||||
|
||||
let mut observed = vec![0u8; response.len()];
|
||||
client_visible_reader.read_exact(&mut observed).await.unwrap();
|
||||
assert_eq!(observed, response);
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(190),
|
||||
"idle backend silence after first response must be cut by relay idle timeout"
|
||||
);
|
||||
|
||||
accept_task.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn adversarial_client_drip_feed_longer_than_idle_timeout_is_cut_off() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let backend_addr = listener.local_addr().unwrap();
|
||||
let initial = b"GET /drip HTTP/1.1\r\nHost: front.example\r\n\r\n".to_vec();
|
||||
|
||||
let accept_task = tokio::spawn({
|
||||
let initial = initial.clone();
|
||||
async move {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
let mut observed = vec![0u8; initial.len()];
|
||||
stream.read_exact(&mut observed).await.unwrap();
|
||||
assert_eq!(observed, initial);
|
||||
|
||||
let mut extra = [0u8; 1];
|
||||
let read_res = timeout(Duration::from_millis(220), stream.read_exact(&mut extra)).await;
|
||||
assert!(
|
||||
read_res.is_err() || read_res.unwrap().is_err(),
|
||||
"drip-fed post-probe byte arriving after idle timeout should not be forwarded"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
let mut config = ProxyConfig::default();
|
||||
config.general.beobachten = false;
|
||||
config.censorship.mask = true;
|
||||
config.censorship.mask_host = Some("127.0.0.1".to_string());
|
||||
config.censorship.mask_port = backend_addr.port();
|
||||
config.censorship.mask_unix_sock = None;
|
||||
config.censorship.mask_proxy_protocol = 0;
|
||||
|
||||
let peer: SocketAddr = "203.0.113.43:51003".parse().unwrap();
|
||||
let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap();
|
||||
|
||||
let (mut client_writer_side, client_reader) = duplex(256);
|
||||
let (_client_visible_reader, client_visible_writer) = duplex(256);
|
||||
let beobachten = BeobachtenStore::new();
|
||||
|
||||
let relay_task = tokio::spawn(async move {
|
||||
handle_bad_client(
|
||||
client_reader,
|
||||
client_visible_writer,
|
||||
&initial,
|
||||
peer,
|
||||
local_addr,
|
||||
&config,
|
||||
&beobachten,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
sleep(Duration::from_millis(160)).await;
|
||||
let _ = client_writer_side.write_all(b"X").await;
|
||||
drop(client_writer_side);
|
||||
|
||||
timeout(Duration::from_secs(1), relay_task).await.unwrap().unwrap();
|
||||
accept_task.await.unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user