mirror of
https://github.com/telemt/telemt.git
synced 2026-04-16 01:54:11 +03:00
Merge branch 'flow' into fix/windows-run-inner-issue-690
This commit is contained in:
@@ -2268,37 +2268,40 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
||||
|
||||
| Key | Type | Default |
|
||||
| --- | ---- | ------- |
|
||||
| [`tls_domain`](#tls_domain) | `String` | `"petrovich.ru"` |
|
||||
| [`tls_domains`](#tls_domains) | `String[]` | `[]` |
|
||||
| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` |
|
||||
| [`tls_fetch_scope`](#tls_fetch_scope) | `String` | `""` |
|
||||
| [`tls_fetch`](#tls_fetch) | `Table` | built-in defaults |
|
||||
| [`mask`](#mask) | `bool` | `true` |
|
||||
| [`mask_host`](#mask_host) | `String` | — |
|
||||
| [`mask_port`](#mask_port) | `u16` | `443` |
|
||||
| [`mask_unix_sock`](#mask_unix_sock) | `String` | — |
|
||||
| [`fake_cert_len`](#fake_cert_len) | `usize` | `2048` |
|
||||
| [`tls_emulation`](#tls_emulation) | `bool` | `true` |
|
||||
| [`tls_front_dir`](#tls_front_dir) | `String` | `"tlsfront"` |
|
||||
| [`server_hello_delay_min_ms`](#server_hello_delay_min_ms) | `u64` | `0` |
|
||||
| [`server_hello_delay_max_ms`](#server_hello_delay_max_ms) | `u64` | `0` |
|
||||
| [`tls_new_session_tickets`](#tls_new_session_tickets) | `u8` | `0` |
|
||||
| [`tls_full_cert_ttl_secs`](#tls_full_cert_ttl_secs) | `u64` | `90` |
|
||||
| [`alpn_enforce`](#alpn_enforce) | `bool` | `true` |
|
||||
| [`mask_proxy_protocol`](#mask_proxy_protocol) | `u8` | `0` |
|
||||
| [`mask_shape_hardening`](#mask_shape_hardening) | `bool` | `true` |
|
||||
| [`mask_shape_hardening_aggressive_mode`](#mask_shape_hardening_aggressive_mode) | `bool` | `false` |
|
||||
| [`mask_shape_bucket_floor_bytes`](#mask_shape_bucket_floor_bytes) | `usize` | `512` |
|
||||
| [`mask_shape_bucket_cap_bytes`](#mask_shape_bucket_cap_bytes) | `usize` | `4096` |
|
||||
| [`mask_shape_above_cap_blur`](#mask_shape_above_cap_blur) | `bool` | `false` |
|
||||
| [`mask_shape_above_cap_blur_max_bytes`](#mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
||||
| [`mask_relay_max_bytes`](#mask_relay_max_bytes) | `usize` | `5242880` |
|
||||
| [`mask_classifier_prefetch_timeout_ms`](#mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
||||
| [`mask_timing_normalization_enabled`](#mask_timing_normalization_enabled) | `bool` | `false` |
|
||||
| [`mask_timing_normalization_floor_ms`](#mask_timing_normalization_floor_ms) | `u64` | `0` |
|
||||
| [`mask_timing_normalization_ceiling_ms`](#mask_timing_normalization_ceiling_ms) | `u64` | `0` |
|
||||
| [`tls_domain`](#cfg-censorship-tls_domain) | `String` | `"petrovich.ru"` |
|
||||
| [`tls_domains`](#cfg-censorship-tls_domains) | `String[]` | `[]` |
|
||||
| [`unknown_sni_action`](#cfg-censorship-unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` |
|
||||
| [`tls_fetch_scope`](#cfg-censorship-tls_fetch_scope) | `String` | `""` |
|
||||
| [`tls_fetch`](#cfg-censorship-tls_fetch) | `Table` | built-in defaults |
|
||||
| [`mask`](#cfg-censorship-mask) | `bool` | `true` |
|
||||
| [`mask_host`](#cfg-censorship-mask_host) | `String` | — |
|
||||
| [`mask_port`](#cfg-censorship-mask_port) | `u16` | `443` |
|
||||
| [`mask_unix_sock`](#cfg-censorship-mask_unix_sock) | `String` | — |
|
||||
| [`fake_cert_len`](#cfg-censorship-fake_cert_len) | `usize` | `2048` |
|
||||
| [`tls_emulation`](#cfg-censorship-tls_emulation) | `bool` | `true` |
|
||||
| [`tls_front_dir`](#cfg-censorship-tls_front_dir) | `String` | `"tlsfront"` |
|
||||
| [`server_hello_delay_min_ms`](#cfg-censorship-server_hello_delay_min_ms) | `u64` | `0` |
|
||||
| [`server_hello_delay_max_ms`](#cfg-censorship-server_hello_delay_max_ms) | `u64` | `0` |
|
||||
| [`tls_new_session_tickets`](#cfg-censorship-tls_new_session_tickets) | `u8` | `0` |
|
||||
| [`tls_full_cert_ttl_secs`](#cfg-censorship-tls_full_cert_ttl_secs) | `u64` | `90` |
|
||||
| [`alpn_enforce`](#cfg-censorship-alpn_enforce) | `bool` | `true` |
|
||||
| [`mask_proxy_protocol`](#cfg-censorship-mask_proxy_protocol) | `u8` | `0` |
|
||||
| [`mask_shape_hardening`](#cfg-censorship-mask_shape_hardening) | `bool` | `true` |
|
||||
| [`mask_shape_hardening_aggressive_mode`](#cfg-censorship-mask_shape_hardening_aggressive_mode) | `bool` | `false` |
|
||||
| [`mask_shape_bucket_floor_bytes`](#cfg-censorship-mask_shape_bucket_floor_bytes) | `usize` | `512` |
|
||||
| [`mask_shape_bucket_cap_bytes`](#cfg-censorship-mask_shape_bucket_cap_bytes) | `usize` | `4096` |
|
||||
| [`mask_shape_above_cap_blur`](#cfg-censorship-mask_shape_above_cap_blur) | `bool` | `false` |
|
||||
| [`mask_shape_above_cap_blur_max_bytes`](#cfg-censorship-mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
||||
| [`mask_relay_max_bytes`](#cfg-censorship-mask_relay_max_bytes) | `usize` | `5242880` |
|
||||
| [`mask_relay_timeout_ms`](#cfg-censorship-mask_relay_timeout_ms) | `u64` | `60_000` |
|
||||
| [`mask_relay_idle_timeout_ms`](#cfg-censorship-mask_relay_idle_timeout_ms) | `u64` | `5_000` |
|
||||
| [`mask_classifier_prefetch_timeout_ms`](#cfg-censorship-mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
||||
| [`mask_timing_normalization_enabled`](#cfg-censorship-mask_timing_normalization_enabled) | `bool` | `false` |
|
||||
| [`mask_timing_normalization_floor_ms`](#cfg-censorship-mask_timing_normalization_floor_ms) | `u64` | `0` |
|
||||
| [`mask_timing_normalization_ceiling_ms`](#cfg-censorship-mask_timing_normalization_ceiling_ms) | `u64` | `0` |
|
||||
|
||||
## tls_domain
|
||||
## "cfg-censorship-tls_domain"
|
||||
- `tls_domain`
|
||||
- **Constraints / validation**: Must be a non-empty domain name. Must not contain spaces or `/`.
|
||||
- **Description**: Primary domain used for Fake-TLS masking / fronting profile and as the default SNI domain presented to clients.
|
||||
This value becomes part of generated `ee` links, and changing it invalidates previously generated links.
|
||||
@@ -2539,7 +2542,28 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
||||
[censorship]
|
||||
mask_relay_max_bytes = 5242880
|
||||
```
|
||||
## mask_classifier_prefetch_timeout_ms
|
||||
## "cfg-censorship-mask_relay_timeout_ms"
|
||||
- `mask_relay_timeout_ms`
|
||||
- **Constraints / validation**: Should be `>= mask_relay_idle_timeout_ms`.
|
||||
- **Description**: Wall-clock cap for the full masking relay on non-MTProto fallback paths. Raise when the mask target is a long-lived service (e.g. WebSocket). Default: 60 000 ms (1 minute).
|
||||
- **Example**:
|
||||
|
||||
```toml
|
||||
[censorship]
|
||||
mask_relay_timeout_ms = 60000
|
||||
```
|
||||
## "cfg-censorship-mask_relay_idle_timeout_ms"
|
||||
- `mask_relay_idle_timeout_ms`
|
||||
- **Constraints / validation**: Should be `<= mask_relay_timeout_ms`.
|
||||
- **Description**: Per-read idle timeout on masking relay and drain paths. Limits resource consumption by slow-loris attacks and port scanners. A read call stalling beyond this value is treated as an abandoned connection. Default: 5 000 ms (5 s).
|
||||
- **Example**:
|
||||
|
||||
```toml
|
||||
[censorship]
|
||||
mask_relay_idle_timeout_ms = 5000
|
||||
```
|
||||
## "cfg-censorship-mask_classifier_prefetch_timeout_ms"
|
||||
- `mask_classifier_prefetch_timeout_ms`
|
||||
- **Constraints / validation**: Must be within `[5, 50]` (milliseconds).
|
||||
- **Description**: Timeout budget (ms) for extending fragmented initial classifier window on masking fallback.
|
||||
- **Example**:
|
||||
|
||||
@@ -615,6 +615,26 @@ pub(crate) fn default_mask_relay_max_bytes() -> usize {
|
||||
32 * 1024
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) fn default_mask_relay_timeout_ms() -> u64 {
|
||||
60_000
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn default_mask_relay_timeout_ms() -> u64 {
|
||||
200
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 {
|
||||
5_000
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 {
|
||||
100
|
||||
}
|
||||
|
||||
pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 {
|
||||
5
|
||||
}
|
||||
|
||||
@@ -611,6 +611,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
||||
|| old.censorship.mask_shape_above_cap_blur_max_bytes
|
||||
!= new.censorship.mask_shape_above_cap_blur_max_bytes
|
||||
|| old.censorship.mask_relay_max_bytes != new.censorship.mask_relay_max_bytes
|
||||
|| old.censorship.mask_relay_timeout_ms != new.censorship.mask_relay_timeout_ms
|
||||
|| old.censorship.mask_relay_idle_timeout_ms != new.censorship.mask_relay_idle_timeout_ms
|
||||
|| old.censorship.mask_classifier_prefetch_timeout_ms
|
||||
!= new.censorship.mask_classifier_prefetch_timeout_ms
|
||||
|| old.censorship.mask_timing_normalization_enabled
|
||||
|
||||
@@ -1710,6 +1710,19 @@ pub struct AntiCensorshipConfig {
|
||||
#[serde(default = "default_mask_relay_max_bytes")]
|
||||
pub mask_relay_max_bytes: usize,
|
||||
|
||||
/// Wall-clock cap for the full masking relay on non-MTProto fallback paths.
|
||||
/// Raise when the mask target is a long-lived service (e.g. WebSocket).
|
||||
/// Default: 60 000 ms (60 s).
|
||||
#[serde(default = "default_mask_relay_timeout_ms")]
|
||||
pub mask_relay_timeout_ms: u64,
|
||||
|
||||
/// Per-read idle timeout on masking relay and drain paths.
|
||||
/// Limits resource consumption by slow-loris attacks and port scanners.
|
||||
/// A read call stalling beyond this is treated as an abandoned connection.
|
||||
/// Default: 5 000 ms (5 s).
|
||||
#[serde(default = "default_mask_relay_idle_timeout_ms")]
|
||||
pub mask_relay_idle_timeout_ms: u64,
|
||||
|
||||
/// Prefetch timeout (ms) for extending fragmented masking classifier window.
|
||||
#[serde(default = "default_mask_classifier_prefetch_timeout_ms")]
|
||||
pub mask_classifier_prefetch_timeout_ms: u64,
|
||||
@@ -1755,6 +1768,8 @@ impl Default for AntiCensorshipConfig {
|
||||
mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
|
||||
mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
|
||||
mask_relay_max_bytes: default_mask_relay_max_bytes(),
|
||||
mask_relay_timeout_ms: default_mask_relay_timeout_ms(),
|
||||
mask_relay_idle_timeout_ms: default_mask_relay_idle_timeout_ms(),
|
||||
mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(),
|
||||
mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
|
||||
mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
|
||||
|
||||
@@ -28,14 +28,10 @@ use tracing::debug;
|
||||
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
#[cfg(test)]
|
||||
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
|
||||
/// Maximum duration for the entire masking relay.
|
||||
/// Limits resource consumption from slow-loris attacks and port scanners.
|
||||
#[cfg(not(test))]
|
||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// Maximum duration for the entire masking relay under test (replaced by config at runtime).
|
||||
#[cfg(test)]
|
||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
||||
#[cfg(not(test))]
|
||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// Per-read idle timeout for masking relay and drain paths under test (replaced by config at runtime).
|
||||
#[cfg(test)]
|
||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
const MASK_BUFFER_SIZE: usize = 8192;
|
||||
@@ -55,6 +51,7 @@ async fn copy_with_idle_timeout<R, W>(
|
||||
writer: &mut W,
|
||||
byte_cap: usize,
|
||||
shutdown_on_eof: bool,
|
||||
idle_timeout: Duration,
|
||||
) -> CopyOutcome
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
@@ -78,7 +75,7 @@ where
|
||||
}
|
||||
|
||||
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
||||
let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await;
|
||||
let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await;
|
||||
let n = match read_res {
|
||||
Ok(Ok(n)) => n,
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
@@ -86,13 +83,13 @@ where
|
||||
if n == 0 {
|
||||
ended_by_eof = true;
|
||||
if shutdown_on_eof {
|
||||
let _ = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.shutdown()).await;
|
||||
let _ = timeout(idle_timeout, writer.shutdown()).await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
total = total.saturating_add(n);
|
||||
|
||||
let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await;
|
||||
let write_res = timeout(idle_timeout, writer.write_all(&buf[..n])).await;
|
||||
match write_res {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
@@ -230,13 +227,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn consume_client_data_with_timeout_and_cap<R>(reader: R, byte_cap: usize)
|
||||
where
|
||||
async fn consume_client_data_with_timeout_and_cap<R>(
|
||||
reader: R,
|
||||
byte_cap: usize,
|
||||
relay_timeout: Duration,
|
||||
idle_timeout: Duration,
|
||||
) where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, byte_cap))
|
||||
.await
|
||||
.is_err()
|
||||
if timeout(
|
||||
relay_timeout,
|
||||
consume_client_data(reader, byte_cap, idle_timeout),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug!("Timed out while consuming client data on masking fallback path");
|
||||
}
|
||||
@@ -639,10 +643,18 @@ pub async fn handle_bad_client<R, W>(
|
||||
beobachten.record(client_type, peer.ip(), ttl);
|
||||
}
|
||||
|
||||
let relay_timeout = Duration::from_millis(config.censorship.mask_relay_timeout_ms);
|
||||
let idle_timeout = Duration::from_millis(config.censorship.mask_relay_idle_timeout_ms);
|
||||
|
||||
if !config.censorship.mask {
|
||||
// Masking disabled, just consume data
|
||||
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
|
||||
.await;
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
relay_timeout,
|
||||
idle_timeout,
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -674,7 +686,7 @@ pub async fn handle_bad_client<R, W>(
|
||||
return;
|
||||
}
|
||||
if timeout(
|
||||
MASK_RELAY_TIMEOUT,
|
||||
relay_timeout,
|
||||
relay_to_mask(
|
||||
reader,
|
||||
writer,
|
||||
@@ -688,6 +700,7 @@ pub async fn handle_bad_client<R, W>(
|
||||
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
||||
config.censorship.mask_shape_hardening_aggressive_mode,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
idle_timeout,
|
||||
),
|
||||
)
|
||||
.await
|
||||
@@ -703,6 +716,8 @@ pub async fn handle_bad_client<R, W>(
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
relay_timeout,
|
||||
idle_timeout,
|
||||
)
|
||||
.await;
|
||||
wait_mask_outcome_budget(outcome_started, config).await;
|
||||
@@ -712,6 +727,8 @@ pub async fn handle_bad_client<R, W>(
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
relay_timeout,
|
||||
idle_timeout,
|
||||
)
|
||||
.await;
|
||||
wait_mask_outcome_budget(outcome_started, config).await;
|
||||
@@ -742,8 +759,13 @@ pub async fn handle_bad_client<R, W>(
|
||||
local = %local_addr,
|
||||
"Mask target resolves to local listener; refusing self-referential masking fallback"
|
||||
);
|
||||
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
|
||||
.await;
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
relay_timeout,
|
||||
idle_timeout,
|
||||
)
|
||||
.await;
|
||||
wait_mask_outcome_budget(outcome_started, config).await;
|
||||
return;
|
||||
}
|
||||
@@ -777,7 +799,7 @@ pub async fn handle_bad_client<R, W>(
|
||||
return;
|
||||
}
|
||||
if timeout(
|
||||
MASK_RELAY_TIMEOUT,
|
||||
relay_timeout,
|
||||
relay_to_mask(
|
||||
reader,
|
||||
writer,
|
||||
@@ -791,6 +813,7 @@ pub async fn handle_bad_client<R, W>(
|
||||
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
||||
config.censorship.mask_shape_hardening_aggressive_mode,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
idle_timeout,
|
||||
),
|
||||
)
|
||||
.await
|
||||
@@ -806,6 +829,8 @@ pub async fn handle_bad_client<R, W>(
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
relay_timeout,
|
||||
idle_timeout,
|
||||
)
|
||||
.await;
|
||||
wait_mask_outcome_budget(outcome_started, config).await;
|
||||
@@ -815,6 +840,8 @@ pub async fn handle_bad_client<R, W>(
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
config.censorship.mask_relay_max_bytes,
|
||||
relay_timeout,
|
||||
idle_timeout,
|
||||
)
|
||||
.await;
|
||||
wait_mask_outcome_budget(outcome_started, config).await;
|
||||
@@ -836,6 +863,7 @@ async fn relay_to_mask<R, W, MR, MW>(
|
||||
shape_above_cap_blur_max_bytes: usize,
|
||||
shape_hardening_aggressive_mode: bool,
|
||||
mask_relay_max_bytes: usize,
|
||||
idle_timeout: Duration,
|
||||
) where
|
||||
R: AsyncRead + Unpin + Send + 'static,
|
||||
W: AsyncWrite + Unpin + Send + 'static,
|
||||
@@ -857,11 +885,19 @@ async fn relay_to_mask<R, W, MR, MW>(
|
||||
&mut mask_write,
|
||||
mask_relay_max_bytes,
|
||||
!shape_hardening_enabled,
|
||||
idle_timeout,
|
||||
)
|
||||
.await
|
||||
},
|
||||
async {
|
||||
copy_with_idle_timeout(&mut mask_read, &mut writer, mask_relay_max_bytes, true).await
|
||||
copy_with_idle_timeout(
|
||||
&mut mask_read,
|
||||
&mut writer,
|
||||
mask_relay_max_bytes,
|
||||
true,
|
||||
idle_timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
);
|
||||
|
||||
@@ -889,7 +925,11 @@ async fn relay_to_mask<R, W, MR, MW>(
|
||||
}
|
||||
|
||||
/// Just consume all data from client without responding.
|
||||
async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usize) {
|
||||
async fn consume_client_data<R: AsyncRead + Unpin>(
|
||||
mut reader: R,
|
||||
byte_cap: usize,
|
||||
idle_timeout: Duration,
|
||||
) {
|
||||
if byte_cap == 0 {
|
||||
return;
|
||||
}
|
||||
@@ -905,7 +945,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usiz
|
||||
}
|
||||
|
||||
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
||||
let n = match timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await {
|
||||
let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await {
|
||||
Ok(Ok(n)) => n,
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
};
|
||||
|
||||
@@ -47,7 +47,7 @@ async fn consume_client_data_stops_after_byte_cap_without_eof() {
|
||||
};
|
||||
let cap = 10_000usize;
|
||||
|
||||
consume_client_data(reader, cap).await;
|
||||
consume_client_data(reader, cap, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||
|
||||
let total = produced.load(Ordering::Relaxed);
|
||||
assert!(
|
||||
|
||||
@@ -31,7 +31,7 @@ async fn stalling_client_terminates_at_idle_not_relay_timeout() {
|
||||
|
||||
let result = tokio::time::timeout(
|
||||
MASK_RELAY_TIMEOUT,
|
||||
consume_client_data(reader, MASK_BUFFER_SIZE * 4),
|
||||
consume_client_data(reader, MASK_BUFFER_SIZE * 4, MASK_RELAY_IDLE_TIMEOUT),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -57,9 +57,12 @@ async fn fast_reader_drains_to_eof() {
|
||||
let data = vec![0xAAu8; 32 * 1024];
|
||||
let reader = std::io::Cursor::new(data);
|
||||
|
||||
tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX))
|
||||
.await
|
||||
.expect("consume_client_data did not complete for fast EOF reader");
|
||||
tokio::time::timeout(
|
||||
MASK_RELAY_TIMEOUT,
|
||||
consume_client_data(reader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
|
||||
)
|
||||
.await
|
||||
.expect("consume_client_data did not complete for fast EOF reader");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -81,7 +84,7 @@ async fn io_error_terminates_cleanly() {
|
||||
|
||||
tokio::time::timeout(
|
||||
MASK_RELAY_TIMEOUT,
|
||||
consume_client_data(ErrReader, usize::MAX),
|
||||
consume_client_data(ErrReader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
|
||||
)
|
||||
.await
|
||||
.expect("consume_client_data did not return on I/O error");
|
||||
|
||||
@@ -34,7 +34,11 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
|
||||
set.spawn(async {
|
||||
tokio::time::timeout(
|
||||
MASK_RELAY_TIMEOUT,
|
||||
consume_client_data(OneByteThenStall { sent: false }, usize::MAX),
|
||||
consume_client_data(
|
||||
OneByteThenStall { sent: false },
|
||||
usize::MAX,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("consume_client_data exceeded relay timeout under stall load");
|
||||
@@ -56,7 +60,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
|
||||
#[tokio::test]
|
||||
async fn consume_zero_cap_returns_immediately() {
|
||||
let started = Instant::now();
|
||||
consume_client_data(tokio::io::empty(), 0).await;
|
||||
consume_client_data(tokio::io::empty(), 0, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||
assert!(
|
||||
started.elapsed() < MASK_RELAY_IDLE_TIMEOUT,
|
||||
"zero byte cap must return immediately"
|
||||
|
||||
@@ -127,7 +127,14 @@ async fn positive_copy_with_production_cap_stops_exactly_at_budget() {
|
||||
let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls);
|
||||
let mut writer = CountingWriter::default();
|
||||
|
||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
|
||||
let outcome = copy_with_idle_timeout(
|
||||
&mut reader,
|
||||
&mut writer,
|
||||
PROD_CAP_BYTES,
|
||||
true,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
outcome.total, PROD_CAP_BYTES,
|
||||
@@ -145,7 +152,13 @@ async fn negative_consume_with_zero_cap_performs_no_reads() {
|
||||
let read_calls = Arc::new(AtomicUsize::new(0));
|
||||
let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls));
|
||||
|
||||
consume_client_data_with_timeout_and_cap(reader, 0).await;
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
0,
|
||||
MASK_RELAY_TIMEOUT,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
read_calls.load(Ordering::Relaxed),
|
||||
@@ -161,7 +174,14 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
|
||||
let mut reader = FinitePatternReader::new(payload, 3072, read_calls);
|
||||
let mut writer = CountingWriter::default();
|
||||
|
||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
|
||||
let outcome = copy_with_idle_timeout(
|
||||
&mut reader,
|
||||
&mut writer,
|
||||
PROD_CAP_BYTES,
|
||||
true,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(outcome.total, payload);
|
||||
assert_eq!(writer.written, payload);
|
||||
@@ -175,7 +195,13 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
|
||||
async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() {
|
||||
let started = Instant::now();
|
||||
|
||||
consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES).await;
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
NeverReadyReader,
|
||||
PROD_CAP_BYTES,
|
||||
MASK_RELAY_TIMEOUT,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
started.elapsed() < Duration::from_millis(350),
|
||||
@@ -190,7 +216,12 @@ async fn integration_consume_path_honors_production_cap_for_large_payload() {
|
||||
|
||||
let bounded = timeout(
|
||||
Duration::from_millis(350),
|
||||
consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES),
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
PROD_CAP_BYTES,
|
||||
MASK_RELAY_TIMEOUT,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -206,7 +237,13 @@ async fn adversarial_consume_path_never_reads_beyond_declared_byte_cap() {
|
||||
let total_read = Arc::new(AtomicUsize::new(0));
|
||||
let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read));
|
||||
|
||||
consume_client_data_with_timeout_and_cap(reader, byte_cap).await;
|
||||
consume_client_data_with_timeout_and_cap(
|
||||
reader,
|
||||
byte_cap,
|
||||
MASK_RELAY_TIMEOUT,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
total_read.load(Ordering::Relaxed) <= byte_cap,
|
||||
@@ -231,7 +268,9 @@ async fn light_fuzz_cap_and_payload_matrix_preserves_min_budget_invariant() {
|
||||
let mut reader = FinitePatternReader::new(payload, chunk, read_calls);
|
||||
let mut writer = CountingWriter::default();
|
||||
|
||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true).await;
|
||||
let outcome =
|
||||
copy_with_idle_timeout(&mut reader, &mut writer, cap, true, MASK_RELAY_IDLE_TIMEOUT)
|
||||
.await;
|
||||
let expected = payload.min(cap);
|
||||
|
||||
assert_eq!(
|
||||
@@ -261,7 +300,14 @@ async fn stress_parallel_copy_tasks_with_production_cap_complete_without_leaks()
|
||||
read_calls,
|
||||
);
|
||||
let mut writer = CountingWriter::default();
|
||||
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await
|
||||
copy_with_idle_timeout(
|
||||
&mut reader,
|
||||
&mut writer,
|
||||
PROD_CAP_BYTES,
|
||||
true,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ async fn relay_to_mask_enforces_masking_session_byte_cap() {
|
||||
0,
|
||||
false,
|
||||
32 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
@@ -81,6 +82,7 @@ async fn relay_to_mask_propagates_client_half_close_without_waiting_for_other_di
|
||||
0,
|
||||
false,
|
||||
32 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
@@ -1377,6 +1377,7 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall
|
||||
0,
|
||||
false,
|
||||
5 * 1024 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
@@ -1508,6 +1509,7 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() {
|
||||
0,
|
||||
false,
|
||||
5 * 1024 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -228,6 +228,7 @@ async fn relay_path_idle_timeout_eviction_remains_effective() {
|
||||
0,
|
||||
false,
|
||||
5 * 1024 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ async fn run_relay_case(
|
||||
above_cap_blur_max_bytes,
|
||||
false,
|
||||
5 * 1024 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
@@ -89,6 +89,7 @@ async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() {
|
||||
0,
|
||||
false,
|
||||
5 * 1024 * 1024,
|
||||
MASK_RELAY_IDLE_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
@@ -67,10 +67,8 @@ struct FamilyReconnectOutcome {
|
||||
key: (i32, IpFamily),
|
||||
dc: i32,
|
||||
family: IpFamily,
|
||||
alive: usize,
|
||||
required: usize,
|
||||
endpoint_count: usize,
|
||||
restored: usize,
|
||||
}
|
||||
|
||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||
@@ -82,8 +80,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
|
||||
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
|
||||
let mut degraded_interval = true;
|
||||
@@ -109,8 +105,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
&mut single_endpoint_outage,
|
||||
&mut shadow_rotate_deadline,
|
||||
&mut idle_refresh_next_attempt,
|
||||
&mut adaptive_idle_since,
|
||||
&mut adaptive_recover_until,
|
||||
&mut floor_warn_next_allowed,
|
||||
)
|
||||
.await;
|
||||
@@ -126,8 +120,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
&mut single_endpoint_outage,
|
||||
&mut shadow_rotate_deadline,
|
||||
&mut idle_refresh_next_attempt,
|
||||
&mut adaptive_idle_since,
|
||||
&mut adaptive_recover_until,
|
||||
&mut floor_warn_next_allowed,
|
||||
)
|
||||
.await;
|
||||
@@ -360,8 +352,6 @@ async fn check_family(
|
||||
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
|
||||
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) -> bool {
|
||||
let enabled = match family {
|
||||
@@ -393,10 +383,7 @@ async fn check_family(
|
||||
let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len());
|
||||
let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget));
|
||||
|
||||
if pool.floor_mode() == MeFloorMode::Static {
|
||||
adaptive_idle_since.clear();
|
||||
adaptive_recover_until.clear();
|
||||
}
|
||||
if pool.floor_mode() == MeFloorMode::Static {}
|
||||
|
||||
let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new();
|
||||
let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec<u64>>::new();
|
||||
@@ -435,8 +422,6 @@ async fn check_family(
|
||||
&live_addr_counts,
|
||||
&live_writer_ids_by_addr,
|
||||
&bound_clients_by_writer,
|
||||
adaptive_idle_since,
|
||||
adaptive_recover_until,
|
||||
)
|
||||
.await;
|
||||
pool.set_adaptive_floor_runtime_caps(
|
||||
@@ -503,8 +488,6 @@ async fn check_family(
|
||||
outage_next_attempt.remove(&key);
|
||||
shadow_rotate_deadline.remove(&key);
|
||||
idle_refresh_next_attempt.remove(&key);
|
||||
adaptive_idle_since.remove(&key);
|
||||
adaptive_recover_until.remove(&key);
|
||||
info!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
@@ -632,22 +615,28 @@ async fn check_family(
|
||||
restored += 1;
|
||||
continue;
|
||||
}
|
||||
pool_for_reconnect
|
||||
.stats
|
||||
.increment_me_floor_cap_block_total();
|
||||
pool_for_reconnect
|
||||
.stats
|
||||
.increment_me_floor_swap_idle_failed_total();
|
||||
debug!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
alive,
|
||||
required,
|
||||
active_cap_effective_total,
|
||||
"Adaptive floor cap reached, reconnect attempt blocked"
|
||||
);
|
||||
break;
|
||||
|
||||
let base_req = pool_for_reconnect
|
||||
.required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false);
|
||||
if alive + restored >= base_req {
|
||||
pool_for_reconnect
|
||||
.stats
|
||||
.increment_me_floor_cap_block_total();
|
||||
pool_for_reconnect
|
||||
.stats
|
||||
.increment_me_floor_swap_idle_failed_total();
|
||||
debug!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
alive,
|
||||
required,
|
||||
active_cap_effective_total,
|
||||
"Adaptive floor cap reached, reconnect attempt blocked"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
pool_for_reconnect.stats.increment_me_reconnect_attempt();
|
||||
let res = tokio::time::timeout(
|
||||
pool_for_reconnect.reconnect_runtime.me_one_timeout,
|
||||
pool_for_reconnect.connect_endpoints_round_robin(
|
||||
@@ -663,11 +652,9 @@ async fn check_family(
|
||||
pool_for_reconnect.stats.increment_me_reconnect_success();
|
||||
}
|
||||
Ok(false) => {
|
||||
pool_for_reconnect.stats.increment_me_reconnect_attempt();
|
||||
debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
|
||||
}
|
||||
Err(_) => {
|
||||
pool_for_reconnect.stats.increment_me_reconnect_attempt();
|
||||
debug!(dc = %dc, ?family, "ME reconnect timed out");
|
||||
}
|
||||
}
|
||||
@@ -678,10 +665,8 @@ async fn check_family(
|
||||
key,
|
||||
dc,
|
||||
family,
|
||||
alive,
|
||||
required,
|
||||
endpoint_count: endpoints_for_dc.len(),
|
||||
restored,
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -695,7 +680,7 @@ async fn check_family(
|
||||
}
|
||||
};
|
||||
let now = Instant::now();
|
||||
let now_alive = outcome.alive + outcome.restored;
|
||||
let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await;
|
||||
if now_alive >= outcome.required {
|
||||
info!(
|
||||
dc = %outcome.dc,
|
||||
@@ -851,6 +836,33 @@ fn should_emit_rate_limited_warn(
|
||||
false
|
||||
}
|
||||
|
||||
async fn live_active_writers_for_dc_family(pool: &Arc<MePool>, dc: i32, family: IpFamily) -> usize {
|
||||
let writers = pool.writers.read().await;
|
||||
writers
|
||||
.iter()
|
||||
.filter(|writer| {
|
||||
if writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
return false;
|
||||
}
|
||||
if writer.writer_dc != dc {
|
||||
return false;
|
||||
}
|
||||
if !matches!(
|
||||
super::pool::WriterContour::from_u8(
|
||||
writer.contour.load(std::sync::atomic::Ordering::Relaxed),
|
||||
),
|
||||
super::pool::WriterContour::Active
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
match family {
|
||||
IpFamily::V4 => writer.addr.is_ipv4(),
|
||||
IpFamily::V6 => writer.addr.is_ipv6(),
|
||||
}
|
||||
})
|
||||
.count()
|
||||
}
|
||||
|
||||
fn adaptive_floor_class_min(
|
||||
pool: &Arc<MePool>,
|
||||
endpoint_count: usize,
|
||||
@@ -904,8 +916,6 @@ async fn build_family_floor_plan(
|
||||
live_addr_counts: &HashMap<(i32, SocketAddr), usize>,
|
||||
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
|
||||
bound_clients_by_writer: &HashMap<u64, usize>,
|
||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) -> FamilyFloorPlan {
|
||||
let mut entries = Vec::<DcFloorPlanEntry>::new();
|
||||
let mut by_dc = HashMap::<i32, DcFloorPlanEntry>::new();
|
||||
@@ -921,18 +931,7 @@ async fn build_family_floor_plan(
|
||||
if endpoints.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let key = (*dc, family);
|
||||
let reduce_for_idle = should_reduce_floor_for_idle(
|
||||
pool,
|
||||
key,
|
||||
*dc,
|
||||
endpoints,
|
||||
live_writer_ids_by_addr,
|
||||
bound_clients_by_writer,
|
||||
adaptive_idle_since,
|
||||
adaptive_recover_until,
|
||||
)
|
||||
.await;
|
||||
let _key = (*dc, family);
|
||||
let base_required = pool.required_writers_for_dc(endpoints.len()).max(1);
|
||||
let min_required = if is_adaptive {
|
||||
adaptive_floor_class_min(pool, endpoints.len(), base_required)
|
||||
@@ -947,11 +946,11 @@ async fn build_family_floor_plan(
|
||||
if max_required < min_required {
|
||||
max_required = min_required;
|
||||
}
|
||||
let desired_raw = if is_adaptive && reduce_for_idle {
|
||||
min_required
|
||||
} else {
|
||||
base_required
|
||||
};
|
||||
// We initialize target_required at base_required to prevent 0-writer blackouts
|
||||
// caused by proactively dropping an idle DC to a single fragile connection.
|
||||
// The Adaptive Floor constraint loop below will gracefully compress idle DCs
|
||||
// (prioritized via has_bound_clients = false) to min_required only when global capacity is reached.
|
||||
let desired_raw = base_required;
|
||||
let target_required = desired_raw.clamp(min_required, max_required);
|
||||
let alive = endpoints
|
||||
.iter()
|
||||
@@ -1278,43 +1277,6 @@ async fn maybe_refresh_idle_writer_for_dc(
|
||||
);
|
||||
}
|
||||
|
||||
async fn should_reduce_floor_for_idle(
|
||||
pool: &Arc<MePool>,
|
||||
key: (i32, IpFamily),
|
||||
dc: i32,
|
||||
endpoints: &[SocketAddr],
|
||||
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
|
||||
bound_clients_by_writer: &HashMap<u64, usize>,
|
||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) -> bool {
|
||||
if pool.floor_mode() != MeFloorMode::Adaptive {
|
||||
adaptive_idle_since.remove(&key);
|
||||
adaptive_recover_until.remove(&key);
|
||||
return false;
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr);
|
||||
let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer);
|
||||
if has_bound_clients {
|
||||
adaptive_idle_since.remove(&key);
|
||||
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(recover_until) = adaptive_recover_until.get(&key)
|
||||
&& now < *recover_until
|
||||
{
|
||||
adaptive_idle_since.remove(&key);
|
||||
return false;
|
||||
}
|
||||
adaptive_recover_until.remove(&key);
|
||||
|
||||
let idle_since = adaptive_idle_since.entry(key).or_insert(now);
|
||||
now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
|
||||
}
|
||||
|
||||
fn has_bound_clients_on_endpoint(
|
||||
writer_ids: &[u64],
|
||||
bound_clients_by_writer: &HashMap<u64, usize>,
|
||||
@@ -1364,6 +1326,7 @@ async fn recover_single_endpoint_outage(
|
||||
);
|
||||
return;
|
||||
};
|
||||
pool.stats.increment_me_reconnect_attempt();
|
||||
pool.stats
|
||||
.increment_me_single_endpoint_outage_reconnect_attempt_total();
|
||||
|
||||
@@ -1439,7 +1402,6 @@ async fn recover_single_endpoint_outage(
|
||||
return;
|
||||
}
|
||||
|
||||
pool.stats.increment_me_reconnect_attempt();
|
||||
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
|
||||
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
|
||||
outage_backoff.insert(key, next_ms);
|
||||
|
||||
@@ -1422,22 +1422,6 @@ impl MePool {
|
||||
MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub(super) fn adaptive_floor_idle_duration(&self) -> Duration {
|
||||
Duration::from_secs(
|
||||
self.floor_runtime
|
||||
.me_adaptive_floor_idle_secs
|
||||
.load(Ordering::Relaxed),
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration {
|
||||
Duration::from_secs(
|
||||
self.floor_runtime
|
||||
.me_adaptive_floor_recover_grace_secs
|
||||
.load(Ordering::Relaxed),
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
|
||||
(self
|
||||
.floor_runtime
|
||||
@@ -1659,6 +1643,7 @@ impl MePool {
|
||||
&self,
|
||||
contour: WriterContour,
|
||||
allow_coverage_override: bool,
|
||||
writer_dc: i32,
|
||||
) -> bool {
|
||||
let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await;
|
||||
match contour {
|
||||
@@ -1670,6 +1655,43 @@ impl MePool {
|
||||
if !allow_coverage_override {
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut endpoints_len = 0;
|
||||
let now_epoch = Self::now_epoch_secs();
|
||||
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) {
|
||||
if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) {
|
||||
endpoints_len += addrs.len();
|
||||
}
|
||||
}
|
||||
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) {
|
||||
if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) {
|
||||
endpoints_len += addrs.len();
|
||||
}
|
||||
}
|
||||
|
||||
if endpoints_len > 0 {
|
||||
let base_req =
|
||||
self.required_writers_for_dc_with_floor_mode(endpoints_len, false);
|
||||
let active_for_dc = {
|
||||
let ws = self.writers.read().await;
|
||||
ws.iter()
|
||||
.filter(|w| {
|
||||
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
|
||||
&& w.writer_dc == writer_dc
|
||||
&& matches!(
|
||||
WriterContour::from_u8(
|
||||
w.contour.load(std::sync::atomic::Ordering::Relaxed),
|
||||
),
|
||||
WriterContour::Active
|
||||
)
|
||||
})
|
||||
.count()
|
||||
};
|
||||
if active_for_dc < base_req {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
let coverage_required = self.active_coverage_required_total().await;
|
||||
active_writers < coverage_required
|
||||
}
|
||||
|
||||
@@ -77,6 +77,12 @@ impl MePool {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
if endpoints.len() == 1 && self.single_endpoint_outage_disable_quarantine() {
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
guard.retain(|_, expiry| *expiry > Instant::now());
|
||||
return endpoints.to_vec();
|
||||
}
|
||||
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
let now = Instant::now();
|
||||
guard.retain(|_, expiry| *expiry > now);
|
||||
@@ -236,8 +242,18 @@ impl MePool {
|
||||
let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1);
|
||||
let mut total_attempts = 0u32;
|
||||
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
|
||||
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
|
||||
let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr;
|
||||
let bypass_quarantine_for_single_endpoint =
|
||||
single_endpoint_dc && self.single_endpoint_outage_disable_quarantine();
|
||||
|
||||
if !same_endpoint_quarantined {
|
||||
if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint {
|
||||
if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint {
|
||||
debug!(
|
||||
%addr,
|
||||
"Bypassing quarantine for immediate reconnect on single-endpoint DC"
|
||||
);
|
||||
}
|
||||
for attempt in 0..fast_retries {
|
||||
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
|
||||
break;
|
||||
@@ -276,7 +292,6 @@ impl MePool {
|
||||
);
|
||||
}
|
||||
|
||||
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
|
||||
if dc_endpoints.is_empty() {
|
||||
self.stats.increment_me_refill_failed_total();
|
||||
return false;
|
||||
|
||||
@@ -342,7 +342,7 @@ impl MePool {
|
||||
allow_coverage_override: bool,
|
||||
) -> Result<()> {
|
||||
if !self
|
||||
.can_open_writer_for_contour(contour, allow_coverage_override)
|
||||
.can_open_writer_for_contour(contour, allow_coverage_override, writer_dc)
|
||||
.await
|
||||
{
|
||||
return Err(ProxyError::Proxy(format!(
|
||||
|
||||
@@ -109,18 +109,16 @@ async fn connectable_endpoints_waits_until_quarantine_expires() {
|
||||
|
||||
{
|
||||
let mut guard = pool.endpoint_quarantine.lock().await;
|
||||
guard.insert(addr, Instant::now() + Duration::from_millis(80));
|
||||
guard.insert(addr, Instant::now() + Duration::from_millis(500));
|
||||
}
|
||||
|
||||
let started = Instant::now();
|
||||
let endpoints = pool.connectable_endpoints_for_test(&[addr]).await;
|
||||
let elapsed = started.elapsed();
|
||||
|
||||
let endpoints = tokio::time::timeout(
|
||||
Duration::from_millis(120),
|
||||
pool.connectable_endpoints_for_test(&[addr]),
|
||||
)
|
||||
.await
|
||||
.expect("single-endpoint outage mode should bypass quarantine delay");
|
||||
assert_eq!(endpoints, vec![addr]);
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(50),
|
||||
"single-endpoint DC should honor quarantine before retry"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user