mirror of
https://github.com/telemt/telemt.git
synced 2026-04-15 17:44:11 +03:00
Merge pull request #693 from telemt/flow-timeouts
Configureable mask timeouts
This commit is contained in:
@@ -2498,6 +2498,8 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
|||||||
| [`mask_shape_above_cap_blur`](#cfg-censorship-mask_shape_above_cap_blur) | `bool` | `false` |
|
| [`mask_shape_above_cap_blur`](#cfg-censorship-mask_shape_above_cap_blur) | `bool` | `false` |
|
||||||
| [`mask_shape_above_cap_blur_max_bytes`](#cfg-censorship-mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
| [`mask_shape_above_cap_blur_max_bytes`](#cfg-censorship-mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
||||||
| [`mask_relay_max_bytes`](#cfg-censorship-mask_relay_max_bytes) | `usize` | `5242880` |
|
| [`mask_relay_max_bytes`](#cfg-censorship-mask_relay_max_bytes) | `usize` | `5242880` |
|
||||||
|
| [`mask_relay_timeout_ms`](#cfg-censorship-mask_relay_timeout_ms) | `u64` | `60_000` |
|
||||||
|
| [`mask_relay_idle_timeout_ms`](#cfg-censorship-mask_relay_idle_timeout_ms) | `u64` | `5_000` |
|
||||||
| [`mask_classifier_prefetch_timeout_ms`](#cfg-censorship-mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
| [`mask_classifier_prefetch_timeout_ms`](#cfg-censorship-mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
||||||
| [`mask_timing_normalization_enabled`](#cfg-censorship-mask_timing_normalization_enabled) | `bool` | `false` |
|
| [`mask_timing_normalization_enabled`](#cfg-censorship-mask_timing_normalization_enabled) | `bool` | `false` |
|
||||||
| [`mask_timing_normalization_floor_ms`](#cfg-censorship-mask_timing_normalization_floor_ms) | `u64` | `0` |
|
| [`mask_timing_normalization_floor_ms`](#cfg-censorship-mask_timing_normalization_floor_ms) | `u64` | `0` |
|
||||||
@@ -2768,6 +2770,26 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
|||||||
[censorship]
|
[censorship]
|
||||||
mask_relay_max_bytes = 5242880
|
mask_relay_max_bytes = 5242880
|
||||||
```
|
```
|
||||||
|
## "cfg-censorship-mask_relay_timeout_ms"
|
||||||
|
- `mask_relay_timeout_ms`
|
||||||
|
- **Constraints / validation**: Should be `>= mask_relay_idle_timeout_ms`.
|
||||||
|
- **Description**: Wall-clock cap for the full masking relay on non-MTProto fallback paths. Raise when the mask target is a long-lived service (e.g. WebSocket). Default: 60 000 ms (1 minute).
|
||||||
|
- **Example**:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[censorship]
|
||||||
|
mask_relay_timeout_ms = 60000
|
||||||
|
```
|
||||||
|
## "cfg-censorship-mask_relay_idle_timeout_ms"
|
||||||
|
- `mask_relay_idle_timeout_ms`
|
||||||
|
- **Constraints / validation**: Should be `<= mask_relay_timeout_ms`.
|
||||||
|
- **Description**: Per-read idle timeout on masking relay and drain paths. Limits resource consumption by slow-loris attacks and port scanners. A read call stalling beyond this value is treated as an abandoned connection. Default: 5 000 ms (5 s).
|
||||||
|
- **Example**:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[censorship]
|
||||||
|
mask_relay_idle_timeout_ms = 5000
|
||||||
|
```
|
||||||
## "cfg-censorship-mask_classifier_prefetch_timeout_ms"
|
## "cfg-censorship-mask_classifier_prefetch_timeout_ms"
|
||||||
- `mask_classifier_prefetch_timeout_ms`
|
- `mask_classifier_prefetch_timeout_ms`
|
||||||
- **Constraints / validation**: Must be within `[5, 50]` (milliseconds).
|
- **Constraints / validation**: Must be within `[5, 50]` (milliseconds).
|
||||||
|
|||||||
@@ -615,6 +615,26 @@ pub(crate) fn default_mask_relay_max_bytes() -> usize {
|
|||||||
32 * 1024
|
32 * 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(test))]
|
||||||
|
pub(crate) fn default_mask_relay_timeout_ms() -> u64 {
|
||||||
|
60_000
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn default_mask_relay_timeout_ms() -> u64 {
|
||||||
|
200
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(test))]
|
||||||
|
pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 {
|
||||||
|
5_000
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 {
|
||||||
|
100
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 {
|
pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 {
|
||||||
5
|
5
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -611,6 +611,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
|||||||
|| old.censorship.mask_shape_above_cap_blur_max_bytes
|
|| old.censorship.mask_shape_above_cap_blur_max_bytes
|
||||||
!= new.censorship.mask_shape_above_cap_blur_max_bytes
|
!= new.censorship.mask_shape_above_cap_blur_max_bytes
|
||||||
|| old.censorship.mask_relay_max_bytes != new.censorship.mask_relay_max_bytes
|
|| old.censorship.mask_relay_max_bytes != new.censorship.mask_relay_max_bytes
|
||||||
|
|| old.censorship.mask_relay_timeout_ms != new.censorship.mask_relay_timeout_ms
|
||||||
|
|| old.censorship.mask_relay_idle_timeout_ms != new.censorship.mask_relay_idle_timeout_ms
|
||||||
|| old.censorship.mask_classifier_prefetch_timeout_ms
|
|| old.censorship.mask_classifier_prefetch_timeout_ms
|
||||||
!= new.censorship.mask_classifier_prefetch_timeout_ms
|
!= new.censorship.mask_classifier_prefetch_timeout_ms
|
||||||
|| old.censorship.mask_timing_normalization_enabled
|
|| old.censorship.mask_timing_normalization_enabled
|
||||||
|
|||||||
@@ -1710,6 +1710,19 @@ pub struct AntiCensorshipConfig {
|
|||||||
#[serde(default = "default_mask_relay_max_bytes")]
|
#[serde(default = "default_mask_relay_max_bytes")]
|
||||||
pub mask_relay_max_bytes: usize,
|
pub mask_relay_max_bytes: usize,
|
||||||
|
|
||||||
|
/// Wall-clock cap for the full masking relay on non-MTProto fallback paths.
|
||||||
|
/// Raise when the mask target is a long-lived service (e.g. WebSocket).
|
||||||
|
/// Default: 60 000 ms (60 s).
|
||||||
|
#[serde(default = "default_mask_relay_timeout_ms")]
|
||||||
|
pub mask_relay_timeout_ms: u64,
|
||||||
|
|
||||||
|
/// Per-read idle timeout on masking relay and drain paths.
|
||||||
|
/// Limits resource consumption by slow-loris attacks and port scanners.
|
||||||
|
/// A read call stalling beyond this is treated as an abandoned connection.
|
||||||
|
/// Default: 5 000 ms (5 s).
|
||||||
|
#[serde(default = "default_mask_relay_idle_timeout_ms")]
|
||||||
|
pub mask_relay_idle_timeout_ms: u64,
|
||||||
|
|
||||||
/// Prefetch timeout (ms) for extending fragmented masking classifier window.
|
/// Prefetch timeout (ms) for extending fragmented masking classifier window.
|
||||||
#[serde(default = "default_mask_classifier_prefetch_timeout_ms")]
|
#[serde(default = "default_mask_classifier_prefetch_timeout_ms")]
|
||||||
pub mask_classifier_prefetch_timeout_ms: u64,
|
pub mask_classifier_prefetch_timeout_ms: u64,
|
||||||
@@ -1755,6 +1768,8 @@ impl Default for AntiCensorshipConfig {
|
|||||||
mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
|
mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
|
||||||
mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
|
mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
|
||||||
mask_relay_max_bytes: default_mask_relay_max_bytes(),
|
mask_relay_max_bytes: default_mask_relay_max_bytes(),
|
||||||
|
mask_relay_timeout_ms: default_mask_relay_timeout_ms(),
|
||||||
|
mask_relay_idle_timeout_ms: default_mask_relay_idle_timeout_ms(),
|
||||||
mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(),
|
mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(),
|
||||||
mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
|
mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
|
||||||
mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
|
mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
|
||||||
|
|||||||
@@ -28,14 +28,10 @@ use tracing::debug;
|
|||||||
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
|
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
|
||||||
/// Maximum duration for the entire masking relay.
|
/// Maximum duration for the entire masking relay under test (replaced by config at runtime).
|
||||||
/// Limits resource consumption from slow-loris attacks and port scanners.
|
|
||||||
#[cfg(not(test))]
|
|
||||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
||||||
#[cfg(not(test))]
|
/// Per-read idle timeout for masking relay and drain paths under test (replaced by config at runtime).
|
||||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
const MASK_BUFFER_SIZE: usize = 8192;
|
const MASK_BUFFER_SIZE: usize = 8192;
|
||||||
@@ -55,6 +51,7 @@ async fn copy_with_idle_timeout<R, W>(
|
|||||||
writer: &mut W,
|
writer: &mut W,
|
||||||
byte_cap: usize,
|
byte_cap: usize,
|
||||||
shutdown_on_eof: bool,
|
shutdown_on_eof: bool,
|
||||||
|
idle_timeout: Duration,
|
||||||
) -> CopyOutcome
|
) -> CopyOutcome
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin,
|
R: AsyncRead + Unpin,
|
||||||
@@ -78,7 +75,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
||||||
let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await;
|
let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await;
|
||||||
let n = match read_res {
|
let n = match read_res {
|
||||||
Ok(Ok(n)) => n,
|
Ok(Ok(n)) => n,
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
@@ -86,13 +83,13 @@ where
|
|||||||
if n == 0 {
|
if n == 0 {
|
||||||
ended_by_eof = true;
|
ended_by_eof = true;
|
||||||
if shutdown_on_eof {
|
if shutdown_on_eof {
|
||||||
let _ = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.shutdown()).await;
|
let _ = timeout(idle_timeout, writer.shutdown()).await;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
total = total.saturating_add(n);
|
total = total.saturating_add(n);
|
||||||
|
|
||||||
let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await;
|
let write_res = timeout(idle_timeout, writer.write_all(&buf[..n])).await;
|
||||||
match write_res {
|
match write_res {
|
||||||
Ok(Ok(())) => {}
|
Ok(Ok(())) => {}
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
@@ -230,11 +227,18 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn consume_client_data_with_timeout_and_cap<R>(reader: R, byte_cap: usize)
|
async fn consume_client_data_with_timeout_and_cap<R>(
|
||||||
where
|
reader: R,
|
||||||
|
byte_cap: usize,
|
||||||
|
relay_timeout: Duration,
|
||||||
|
idle_timeout: Duration,
|
||||||
|
) where
|
||||||
R: AsyncRead + Unpin,
|
R: AsyncRead + Unpin,
|
||||||
{
|
{
|
||||||
if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, byte_cap))
|
if timeout(
|
||||||
|
relay_timeout,
|
||||||
|
consume_client_data(reader, byte_cap, idle_timeout),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
@@ -639,9 +643,17 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
beobachten.record(client_type, peer.ip(), ttl);
|
beobachten.record(client_type, peer.ip(), ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let relay_timeout = Duration::from_millis(config.censorship.mask_relay_timeout_ms);
|
||||||
|
let idle_timeout = Duration::from_millis(config.censorship.mask_relay_idle_timeout_ms);
|
||||||
|
|
||||||
if !config.censorship.mask {
|
if !config.censorship.mask {
|
||||||
// Masking disabled, just consume data
|
// Masking disabled, just consume data
|
||||||
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
|
consume_client_data_with_timeout_and_cap(
|
||||||
|
reader,
|
||||||
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -674,7 +686,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if timeout(
|
if timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
relay_timeout,
|
||||||
relay_to_mask(
|
relay_to_mask(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -688,6 +700,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
||||||
config.censorship.mask_shape_hardening_aggressive_mode,
|
config.censorship.mask_shape_hardening_aggressive_mode,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
idle_timeout,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -703,6 +716,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -712,6 +727,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -742,7 +759,12 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
local = %local_addr,
|
local = %local_addr,
|
||||||
"Mask target resolves to local listener; refusing self-referential masking fallback"
|
"Mask target resolves to local listener; refusing self-referential masking fallback"
|
||||||
);
|
);
|
||||||
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
|
consume_client_data_with_timeout_and_cap(
|
||||||
|
reader,
|
||||||
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
return;
|
return;
|
||||||
@@ -777,7 +799,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if timeout(
|
if timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
relay_timeout,
|
||||||
relay_to_mask(
|
relay_to_mask(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -791,6 +813,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
||||||
config.censorship.mask_shape_hardening_aggressive_mode,
|
config.censorship.mask_shape_hardening_aggressive_mode,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
idle_timeout,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -806,6 +829,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -815,6 +840,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -836,6 +863,7 @@ async fn relay_to_mask<R, W, MR, MW>(
|
|||||||
shape_above_cap_blur_max_bytes: usize,
|
shape_above_cap_blur_max_bytes: usize,
|
||||||
shape_hardening_aggressive_mode: bool,
|
shape_hardening_aggressive_mode: bool,
|
||||||
mask_relay_max_bytes: usize,
|
mask_relay_max_bytes: usize,
|
||||||
|
idle_timeout: Duration,
|
||||||
) where
|
) where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
@@ -857,11 +885,19 @@ async fn relay_to_mask<R, W, MR, MW>(
|
|||||||
&mut mask_write,
|
&mut mask_write,
|
||||||
mask_relay_max_bytes,
|
mask_relay_max_bytes,
|
||||||
!shape_hardening_enabled,
|
!shape_hardening_enabled,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
copy_with_idle_timeout(&mut mask_read, &mut writer, mask_relay_max_bytes, true).await
|
copy_with_idle_timeout(
|
||||||
|
&mut mask_read,
|
||||||
|
&mut writer,
|
||||||
|
mask_relay_max_bytes,
|
||||||
|
true,
|
||||||
|
idle_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -889,7 +925,11 @@ async fn relay_to_mask<R, W, MR, MW>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Just consume all data from client without responding.
|
/// Just consume all data from client without responding.
|
||||||
async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usize) {
|
async fn consume_client_data<R: AsyncRead + Unpin>(
|
||||||
|
mut reader: R,
|
||||||
|
byte_cap: usize,
|
||||||
|
idle_timeout: Duration,
|
||||||
|
) {
|
||||||
if byte_cap == 0 {
|
if byte_cap == 0 {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -905,7 +945,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usiz
|
|||||||
}
|
}
|
||||||
|
|
||||||
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
||||||
let n = match timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await {
|
let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await {
|
||||||
Ok(Ok(n)) => n,
|
Ok(Ok(n)) => n,
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ async fn consume_client_data_stops_after_byte_cap_without_eof() {
|
|||||||
};
|
};
|
||||||
let cap = 10_000usize;
|
let cap = 10_000usize;
|
||||||
|
|
||||||
consume_client_data(reader, cap).await;
|
consume_client_data(reader, cap, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
let total = produced.load(Ordering::Relaxed);
|
let total = produced.load(Ordering::Relaxed);
|
||||||
assert!(
|
assert!(
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ async fn stalling_client_terminates_at_idle_not_relay_timeout() {
|
|||||||
|
|
||||||
let result = tokio::time::timeout(
|
let result = tokio::time::timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
MASK_RELAY_TIMEOUT,
|
||||||
consume_client_data(reader, MASK_BUFFER_SIZE * 4),
|
consume_client_data(reader, MASK_BUFFER_SIZE * 4, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -57,7 +57,10 @@ async fn fast_reader_drains_to_eof() {
|
|||||||
let data = vec![0xAAu8; 32 * 1024];
|
let data = vec![0xAAu8; 32 * 1024];
|
||||||
let reader = std::io::Cursor::new(data);
|
let reader = std::io::Cursor::new(data);
|
||||||
|
|
||||||
tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX))
|
tokio::time::timeout(
|
||||||
|
MASK_RELAY_TIMEOUT,
|
||||||
|
consume_client_data(reader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.expect("consume_client_data did not complete for fast EOF reader");
|
.expect("consume_client_data did not complete for fast EOF reader");
|
||||||
}
|
}
|
||||||
@@ -81,7 +84,7 @@ async fn io_error_terminates_cleanly() {
|
|||||||
|
|
||||||
tokio::time::timeout(
|
tokio::time::timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
MASK_RELAY_TIMEOUT,
|
||||||
consume_client_data(ErrReader, usize::MAX),
|
consume_client_data(ErrReader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect("consume_client_data did not return on I/O error");
|
.expect("consume_client_data did not return on I/O error");
|
||||||
|
|||||||
@@ -34,7 +34,11 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
|
|||||||
set.spawn(async {
|
set.spawn(async {
|
||||||
tokio::time::timeout(
|
tokio::time::timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
MASK_RELAY_TIMEOUT,
|
||||||
consume_client_data(OneByteThenStall { sent: false }, usize::MAX),
|
consume_client_data(
|
||||||
|
OneByteThenStall { sent: false },
|
||||||
|
usize::MAX,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect("consume_client_data exceeded relay timeout under stall load");
|
.expect("consume_client_data exceeded relay timeout under stall load");
|
||||||
@@ -56,7 +60,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn consume_zero_cap_returns_immediately() {
|
async fn consume_zero_cap_returns_immediately() {
|
||||||
let started = Instant::now();
|
let started = Instant::now();
|
||||||
consume_client_data(tokio::io::empty(), 0).await;
|
consume_client_data(tokio::io::empty(), 0, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
assert!(
|
assert!(
|
||||||
started.elapsed() < MASK_RELAY_IDLE_TIMEOUT,
|
started.elapsed() < MASK_RELAY_IDLE_TIMEOUT,
|
||||||
"zero byte cap must return immediately"
|
"zero byte cap must return immediately"
|
||||||
|
|||||||
@@ -127,7 +127,14 @@ async fn positive_copy_with_production_cap_stops_exactly_at_budget() {
|
|||||||
let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls);
|
let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
|
|
||||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
|
let outcome = copy_with_idle_timeout(
|
||||||
|
&mut reader,
|
||||||
|
&mut writer,
|
||||||
|
PROD_CAP_BYTES,
|
||||||
|
true,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
outcome.total, PROD_CAP_BYTES,
|
outcome.total, PROD_CAP_BYTES,
|
||||||
@@ -145,7 +152,13 @@ async fn negative_consume_with_zero_cap_performs_no_reads() {
|
|||||||
let read_calls = Arc::new(AtomicUsize::new(0));
|
let read_calls = Arc::new(AtomicUsize::new(0));
|
||||||
let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls));
|
let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls));
|
||||||
|
|
||||||
consume_client_data_with_timeout_and_cap(reader, 0).await;
|
consume_client_data_with_timeout_and_cap(
|
||||||
|
reader,
|
||||||
|
0,
|
||||||
|
MASK_RELAY_TIMEOUT,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
read_calls.load(Ordering::Relaxed),
|
read_calls.load(Ordering::Relaxed),
|
||||||
@@ -161,7 +174,14 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
|
|||||||
let mut reader = FinitePatternReader::new(payload, 3072, read_calls);
|
let mut reader = FinitePatternReader::new(payload, 3072, read_calls);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
|
|
||||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
|
let outcome = copy_with_idle_timeout(
|
||||||
|
&mut reader,
|
||||||
|
&mut writer,
|
||||||
|
PROD_CAP_BYTES,
|
||||||
|
true,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
assert_eq!(outcome.total, payload);
|
assert_eq!(outcome.total, payload);
|
||||||
assert_eq!(writer.written, payload);
|
assert_eq!(writer.written, payload);
|
||||||
@@ -175,7 +195,13 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
|
|||||||
async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() {
|
async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() {
|
||||||
let started = Instant::now();
|
let started = Instant::now();
|
||||||
|
|
||||||
consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES).await;
|
consume_client_data_with_timeout_and_cap(
|
||||||
|
NeverReadyReader,
|
||||||
|
PROD_CAP_BYTES,
|
||||||
|
MASK_RELAY_TIMEOUT,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
started.elapsed() < Duration::from_millis(350),
|
started.elapsed() < Duration::from_millis(350),
|
||||||
@@ -190,7 +216,12 @@ async fn integration_consume_path_honors_production_cap_for_large_payload() {
|
|||||||
|
|
||||||
let bounded = timeout(
|
let bounded = timeout(
|
||||||
Duration::from_millis(350),
|
Duration::from_millis(350),
|
||||||
consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES),
|
consume_client_data_with_timeout_and_cap(
|
||||||
|
reader,
|
||||||
|
PROD_CAP_BYTES,
|
||||||
|
MASK_RELAY_TIMEOUT,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -206,7 +237,13 @@ async fn adversarial_consume_path_never_reads_beyond_declared_byte_cap() {
|
|||||||
let total_read = Arc::new(AtomicUsize::new(0));
|
let total_read = Arc::new(AtomicUsize::new(0));
|
||||||
let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read));
|
let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read));
|
||||||
|
|
||||||
consume_client_data_with_timeout_and_cap(reader, byte_cap).await;
|
consume_client_data_with_timeout_and_cap(
|
||||||
|
reader,
|
||||||
|
byte_cap,
|
||||||
|
MASK_RELAY_TIMEOUT,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
total_read.load(Ordering::Relaxed) <= byte_cap,
|
total_read.load(Ordering::Relaxed) <= byte_cap,
|
||||||
@@ -231,7 +268,9 @@ async fn light_fuzz_cap_and_payload_matrix_preserves_min_budget_invariant() {
|
|||||||
let mut reader = FinitePatternReader::new(payload, chunk, read_calls);
|
let mut reader = FinitePatternReader::new(payload, chunk, read_calls);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
|
|
||||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true).await;
|
let outcome =
|
||||||
|
copy_with_idle_timeout(&mut reader, &mut writer, cap, true, MASK_RELAY_IDLE_TIMEOUT)
|
||||||
|
.await;
|
||||||
let expected = payload.min(cap);
|
let expected = payload.min(cap);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@@ -261,7 +300,14 @@ async fn stress_parallel_copy_tasks_with_production_cap_complete_without_leaks()
|
|||||||
read_calls,
|
read_calls,
|
||||||
);
|
);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await
|
copy_with_idle_timeout(
|
||||||
|
&mut reader,
|
||||||
|
&mut writer,
|
||||||
|
PROD_CAP_BYTES,
|
||||||
|
true,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ async fn relay_to_mask_enforces_masking_session_byte_cap() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
32 * 1024,
|
32 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
@@ -81,6 +82,7 @@ async fn relay_to_mask_propagates_client_half_close_without_waiting_for_other_di
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
32 * 1024,
|
32 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1377,6 +1377,7 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
@@ -1508,6 +1509,7 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -228,6 +228,7 @@ async fn relay_path_idle_timeout_eviction_remains_effective() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ async fn run_relay_case(
|
|||||||
above_cap_blur_max_bytes,
|
above_cap_blur_max_bytes,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user