mirror of
https://github.com/telemt/telemt.git
synced 2026-04-15 17:44:11 +03:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
982bfd20b9 | ||
|
|
0bcc3bf935 | ||
|
|
f7913721e2 |
@@ -2268,40 +2268,39 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
|||||||
|
|
||||||
| Key | Type | Default |
|
| Key | Type | Default |
|
||||||
| --- | ---- | ------- |
|
| --- | ---- | ------- |
|
||||||
| [`tls_domain`](#cfg-censorship-tls_domain) | `String` | `"petrovich.ru"` |
|
| [`tls_domain`](#tls_domain) | `String` | `"petrovich.ru"` |
|
||||||
| [`tls_domains`](#cfg-censorship-tls_domains) | `String[]` | `[]` |
|
| [`tls_domains`](#tls_domains) | `String[]` | `[]` |
|
||||||
| [`unknown_sni_action`](#cfg-censorship-unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` |
|
| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` |
|
||||||
| [`tls_fetch_scope`](#cfg-censorship-tls_fetch_scope) | `String` | `""` |
|
| [`tls_fetch_scope`](#tls_fetch_scope) | `String` | `""` |
|
||||||
| [`tls_fetch`](#cfg-censorship-tls_fetch) | `Table` | built-in defaults |
|
| [`tls_fetch`](#tls_fetch) | `Table` | built-in defaults |
|
||||||
| [`mask`](#cfg-censorship-mask) | `bool` | `true` |
|
| [`mask`](#mask) | `bool` | `true` |
|
||||||
| [`mask_host`](#cfg-censorship-mask_host) | `String` | — |
|
| [`mask_host`](#mask_host) | `String` | — |
|
||||||
| [`mask_port`](#cfg-censorship-mask_port) | `u16` | `443` |
|
| [`mask_port`](#mask_port) | `u16` | `443` |
|
||||||
| [`mask_unix_sock`](#cfg-censorship-mask_unix_sock) | `String` | — |
|
| [`mask_unix_sock`](#mask_unix_sock) | `String` | — |
|
||||||
| [`fake_cert_len`](#cfg-censorship-fake_cert_len) | `usize` | `2048` |
|
| [`fake_cert_len`](#fake_cert_len) | `usize` | `2048` |
|
||||||
| [`tls_emulation`](#cfg-censorship-tls_emulation) | `bool` | `true` |
|
| [`tls_emulation`](#tls_emulation) | `bool` | `true` |
|
||||||
| [`tls_front_dir`](#cfg-censorship-tls_front_dir) | `String` | `"tlsfront"` |
|
| [`tls_front_dir`](#tls_front_dir) | `String` | `"tlsfront"` |
|
||||||
| [`server_hello_delay_min_ms`](#cfg-censorship-server_hello_delay_min_ms) | `u64` | `0` |
|
| [`server_hello_delay_min_ms`](#server_hello_delay_min_ms) | `u64` | `0` |
|
||||||
| [`server_hello_delay_max_ms`](#cfg-censorship-server_hello_delay_max_ms) | `u64` | `0` |
|
| [`server_hello_delay_max_ms`](#server_hello_delay_max_ms) | `u64` | `0` |
|
||||||
| [`tls_new_session_tickets`](#cfg-censorship-tls_new_session_tickets) | `u8` | `0` |
|
| [`tls_new_session_tickets`](#tls_new_session_tickets) | `u8` | `0` |
|
||||||
| [`tls_full_cert_ttl_secs`](#cfg-censorship-tls_full_cert_ttl_secs) | `u64` | `90` |
|
| [`tls_full_cert_ttl_secs`](#tls_full_cert_ttl_secs) | `u64` | `90` |
|
||||||
| [`alpn_enforce`](#cfg-censorship-alpn_enforce) | `bool` | `true` |
|
| [`alpn_enforce`](#alpn_enforce) | `bool` | `true` |
|
||||||
| [`mask_proxy_protocol`](#cfg-censorship-mask_proxy_protocol) | `u8` | `0` |
|
| [`mask_proxy_protocol`](#mask_proxy_protocol) | `u8` | `0` |
|
||||||
| [`mask_shape_hardening`](#cfg-censorship-mask_shape_hardening) | `bool` | `true` |
|
| [`mask_shape_hardening`](#mask_shape_hardening) | `bool` | `true` |
|
||||||
| [`mask_shape_hardening_aggressive_mode`](#cfg-censorship-mask_shape_hardening_aggressive_mode) | `bool` | `false` |
|
| [`mask_shape_hardening_aggressive_mode`](#mask_shape_hardening_aggressive_mode) | `bool` | `false` |
|
||||||
| [`mask_shape_bucket_floor_bytes`](#cfg-censorship-mask_shape_bucket_floor_bytes) | `usize` | `512` |
|
| [`mask_shape_bucket_floor_bytes`](#mask_shape_bucket_floor_bytes) | `usize` | `512` |
|
||||||
| [`mask_shape_bucket_cap_bytes`](#cfg-censorship-mask_shape_bucket_cap_bytes) | `usize` | `4096` |
|
| [`mask_shape_bucket_cap_bytes`](#mask_shape_bucket_cap_bytes) | `usize` | `4096` |
|
||||||
| [`mask_shape_above_cap_blur`](#cfg-censorship-mask_shape_above_cap_blur) | `bool` | `false` |
|
| [`mask_shape_above_cap_blur`](#mask_shape_above_cap_blur) | `bool` | `false` |
|
||||||
| [`mask_shape_above_cap_blur_max_bytes`](#cfg-censorship-mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
| [`mask_shape_above_cap_blur_max_bytes`](#mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
||||||
| [`mask_relay_max_bytes`](#cfg-censorship-mask_relay_max_bytes) | `usize` | `5242880` |
|
| [`mask_relay_max_bytes`](#mask_relay_max_bytes) | `usize` | `5242880` |
|
||||||
| [`mask_relay_timeout_ms`](#cfg-censorship-mask_relay_timeout_ms) | `u64` | `60_000` |
|
| [`mask_relay_timeout_ms`](#mask_relay_timeout_ms) | `u64` | `60_000` |
|
||||||
| [`mask_relay_idle_timeout_ms`](#cfg-censorship-mask_relay_idle_timeout_ms) | `u64` | `5_000` |
|
| [`mask_relay_idle_timeout_ms`](#mask_relay_idle_timeout_ms) | `u64` | `5_000` |
|
||||||
| [`mask_classifier_prefetch_timeout_ms`](#cfg-censorship-mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
| [`mask_classifier_prefetch_timeout_ms`](#mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
||||||
| [`mask_timing_normalization_enabled`](#cfg-censorship-mask_timing_normalization_enabled) | `bool` | `false` |
|
| [`mask_timing_normalization_enabled`](#mask_timing_normalization_enabled) | `bool` | `false` |
|
||||||
| [`mask_timing_normalization_floor_ms`](#cfg-censorship-mask_timing_normalization_floor_ms) | `u64` | `0` |
|
| [`mask_timing_normalization_floor_ms`](#mask_timing_normalization_floor_ms) | `u64` | `0` |
|
||||||
| [`mask_timing_normalization_ceiling_ms`](#cfg-censorship-mask_timing_normalization_ceiling_ms) | `u64` | `0` |
|
| [`mask_timing_normalization_ceiling_ms`](#mask_timing_normalization_ceiling_ms) | `u64` | `0` |
|
||||||
|
|
||||||
## "cfg-censorship-tls_domain"
|
## tls_domain
|
||||||
- `tls_domain`
|
|
||||||
- **Constraints / validation**: Must be a non-empty domain name. Must not contain spaces or `/`.
|
- **Constraints / validation**: Must be a non-empty domain name. Must not contain spaces or `/`.
|
||||||
- **Description**: Primary domain used for Fake-TLS masking / fronting profile and as the default SNI domain presented to clients.
|
- **Description**: Primary domain used for Fake-TLS masking / fronting profile and as the default SNI domain presented to clients.
|
||||||
This value becomes part of generated `ee` links, and changing it invalidates previously generated links.
|
This value becomes part of generated `ee` links, and changing it invalidates previously generated links.
|
||||||
@@ -2542,8 +2541,7 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
|||||||
[censorship]
|
[censorship]
|
||||||
mask_relay_max_bytes = 5242880
|
mask_relay_max_bytes = 5242880
|
||||||
```
|
```
|
||||||
## "cfg-censorship-mask_relay_timeout_ms"
|
## mask_relay_timeout_ms
|
||||||
- `mask_relay_timeout_ms`
|
|
||||||
- **Constraints / validation**: Should be `>= mask_relay_idle_timeout_ms`.
|
- **Constraints / validation**: Should be `>= mask_relay_idle_timeout_ms`.
|
||||||
- **Description**: Wall-clock cap for the full masking relay on non-MTProto fallback paths. Raise when the mask target is a long-lived service (e.g. WebSocket). Default: 60 000 ms (1 minute).
|
- **Description**: Wall-clock cap for the full masking relay on non-MTProto fallback paths. Raise when the mask target is a long-lived service (e.g. WebSocket). Default: 60 000 ms (1 minute).
|
||||||
- **Example**:
|
- **Example**:
|
||||||
@@ -2552,8 +2550,7 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
|||||||
[censorship]
|
[censorship]
|
||||||
mask_relay_timeout_ms = 60000
|
mask_relay_timeout_ms = 60000
|
||||||
```
|
```
|
||||||
## "cfg-censorship-mask_relay_idle_timeout_ms"
|
## mask_relay_idle_timeout_ms
|
||||||
- `mask_relay_idle_timeout_ms`
|
|
||||||
- **Constraints / validation**: Should be `<= mask_relay_timeout_ms`.
|
- **Constraints / validation**: Should be `<= mask_relay_timeout_ms`.
|
||||||
- **Description**: Per-read idle timeout on masking relay and drain paths. Limits resource consumption by slow-loris attacks and port scanners. A read call stalling beyond this value is treated as an abandoned connection. Default: 5 000 ms (5 s).
|
- **Description**: Per-read idle timeout on masking relay and drain paths. Limits resource consumption by slow-loris attacks and port scanners. A read call stalling beyond this value is treated as an abandoned connection. Default: 5 000 ms (5 s).
|
||||||
- **Example**:
|
- **Example**:
|
||||||
@@ -2562,8 +2559,7 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche
|
|||||||
[censorship]
|
[censorship]
|
||||||
mask_relay_idle_timeout_ms = 5000
|
mask_relay_idle_timeout_ms = 5000
|
||||||
```
|
```
|
||||||
## "cfg-censorship-mask_classifier_prefetch_timeout_ms"
|
## mask_classifier_prefetch_timeout_ms
|
||||||
- `mask_classifier_prefetch_timeout_ms`
|
|
||||||
- **Constraints / validation**: Must be within `[5, 50]` (milliseconds).
|
- **Constraints / validation**: Must be within `[5, 50]` (milliseconds).
|
||||||
- **Description**: Timeout budget (ms) for extending fragmented initial classifier window on masking fallback.
|
- **Description**: Timeout budget (ms) for extending fragmented initial classifier window on masking fallback.
|
||||||
- **Example**:
|
- **Example**:
|
||||||
|
|||||||
@@ -2299,6 +2299,8 @@
|
|||||||
| [`mask_shape_above_cap_blur`](#mask_shape_above_cap_blur) | `bool` | `false` |
|
| [`mask_shape_above_cap_blur`](#mask_shape_above_cap_blur) | `bool` | `false` |
|
||||||
| [`mask_shape_above_cap_blur_max_bytes`](#mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
| [`mask_shape_above_cap_blur_max_bytes`](#mask_shape_above_cap_blur_max_bytes) | `usize` | `512` |
|
||||||
| [`mask_relay_max_bytes`](#mask_relay_max_bytes) | `usize` | `5242880` |
|
| [`mask_relay_max_bytes`](#mask_relay_max_bytes) | `usize` | `5242880` |
|
||||||
|
| [`mask_relay_timeout_ms`](mask_relay_timeout_ms) | `u64` | `60_000` |
|
||||||
|
| [`mask_relay_idle_timeout_ms`](mask_relay_idle_timeout_ms) | `u64` | `5_000` |
|
||||||
| [`mask_classifier_prefetch_timeout_ms`](#mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
| [`mask_classifier_prefetch_timeout_ms`](#mask_classifier_prefetch_timeout_ms) | `u64` | `5` |
|
||||||
| [`mask_timing_normalization_enabled`](#mask_timing_normalization_enabled) | `bool` | `false` |
|
| [`mask_timing_normalization_enabled`](#mask_timing_normalization_enabled) | `bool` | `false` |
|
||||||
| [`mask_timing_normalization_floor_ms`](#mask_timing_normalization_floor_ms) | `u64` | `0` |
|
| [`mask_timing_normalization_floor_ms`](#mask_timing_normalization_floor_ms) | `u64` | `0` |
|
||||||
@@ -2544,6 +2546,26 @@
|
|||||||
[censorship]
|
[censorship]
|
||||||
mask_relay_max_bytes = 5242880
|
mask_relay_max_bytes = 5242880
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## mask_relay_timeout_ms
|
||||||
|
- **Constraints / validation**: Должно быть больше или равно `mask_relay_idle_timeout_ms`.
|
||||||
|
- **Description**: Жёсткий лимит по реальному времени (wall-clock) для полного маскирующего проксирования на fallback-путях без MTProto. Увеличивайте значение, если целевой сервис маскирования является долгоживущим (например, WebSocket-соединение). Значение по умолчанию: 60 000 мс (1 минута).
|
||||||
|
- **Example**:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[censorship]
|
||||||
|
mask_relay_timeout_ms = 60000
|
||||||
|
```
|
||||||
|
## mask_relay_idle_timeout_ms
|
||||||
|
- **Constraints / validation**: Должно быть меньше или равно `mask_relay_timeout_ms`.
|
||||||
|
- **Description**: Тайм-аут простоя на каждую операцию чтения (per-read idle timeout) в маскирующем прокси и drain-пайплайнах. Ограничивает потребление ресурсов при атаках типа slow-loris и сканировании портов. Если операция чтения блокируется дольше заданного времени, соединение считается заброшенным и закрывается. Значение по умолчанию: 5 000 мс (5 с).
|
||||||
|
- **Example**:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[censorship]
|
||||||
|
mask_relay_idle_timeout_ms = 5000
|
||||||
|
```
|
||||||
|
|
||||||
## mask_classifier_prefetch_timeout_ms
|
## mask_classifier_prefetch_timeout_ms
|
||||||
- **Ограничения / валидация**: Должно быть в пределах `[5, 50]` (миллисекунд).
|
- **Ограничения / валидация**: Должно быть в пределах `[5, 50]` (миллисекунд).
|
||||||
- **Описание**: Лимит времени ожидания (в миллисекундах) для расширения первых входящих данных в режиме fallback-маскировки.
|
- **Описание**: Лимит времени ожидания (в миллисекундах) для расширения первых входящих данных в режиме fallback-маскировки.
|
||||||
|
|||||||
@@ -121,8 +121,6 @@ pub struct HotFields {
|
|||||||
pub user_max_tcp_conns_global_each: usize,
|
pub user_max_tcp_conns_global_each: usize,
|
||||||
pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
|
pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
|
||||||
pub user_data_quota: std::collections::HashMap<String, u64>,
|
pub user_data_quota: std::collections::HashMap<String, u64>,
|
||||||
pub user_rate_limits: std::collections::HashMap<String, crate::config::RateLimitBps>,
|
|
||||||
pub cidr_rate_limits: std::collections::HashMap<ipnetwork::IpNetwork, crate::config::RateLimitBps>,
|
|
||||||
pub user_max_unique_ips: std::collections::HashMap<String, usize>,
|
pub user_max_unique_ips: std::collections::HashMap<String, usize>,
|
||||||
pub user_max_unique_ips_global_each: usize,
|
pub user_max_unique_ips_global_each: usize,
|
||||||
pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode,
|
pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode,
|
||||||
@@ -247,8 +245,6 @@ impl HotFields {
|
|||||||
user_max_tcp_conns_global_each: cfg.access.user_max_tcp_conns_global_each,
|
user_max_tcp_conns_global_each: cfg.access.user_max_tcp_conns_global_each,
|
||||||
user_expirations: cfg.access.user_expirations.clone(),
|
user_expirations: cfg.access.user_expirations.clone(),
|
||||||
user_data_quota: cfg.access.user_data_quota.clone(),
|
user_data_quota: cfg.access.user_data_quota.clone(),
|
||||||
user_rate_limits: cfg.access.user_rate_limits.clone(),
|
|
||||||
cidr_rate_limits: cfg.access.cidr_rate_limits.clone(),
|
|
||||||
user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
|
user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
|
||||||
user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each,
|
user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each,
|
||||||
user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode,
|
user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode,
|
||||||
@@ -549,8 +545,6 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
|||||||
cfg.access.user_max_tcp_conns_global_each = new.access.user_max_tcp_conns_global_each;
|
cfg.access.user_max_tcp_conns_global_each = new.access.user_max_tcp_conns_global_each;
|
||||||
cfg.access.user_expirations = new.access.user_expirations.clone();
|
cfg.access.user_expirations = new.access.user_expirations.clone();
|
||||||
cfg.access.user_data_quota = new.access.user_data_quota.clone();
|
cfg.access.user_data_quota = new.access.user_data_quota.clone();
|
||||||
cfg.access.user_rate_limits = new.access.user_rate_limits.clone();
|
|
||||||
cfg.access.cidr_rate_limits = new.access.cidr_rate_limits.clone();
|
|
||||||
cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
|
cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
|
||||||
cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each;
|
cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each;
|
||||||
cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode;
|
cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode;
|
||||||
@@ -1189,18 +1183,6 @@ fn log_changes(
|
|||||||
new_hot.user_data_quota.len()
|
new_hot.user_data_quota.len()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if old_hot.user_rate_limits != new_hot.user_rate_limits {
|
|
||||||
info!(
|
|
||||||
"config reload: user_rate_limits updated ({} entries)",
|
|
||||||
new_hot.user_rate_limits.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if old_hot.cidr_rate_limits != new_hot.cidr_rate_limits {
|
|
||||||
info!(
|
|
||||||
"config reload: cidr_rate_limits updated ({} entries)",
|
|
||||||
new_hot.cidr_rate_limits.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if old_hot.user_max_unique_ips != new_hot.user_max_unique_ips {
|
if old_hot.user_max_unique_ips != new_hot.user_max_unique_ips {
|
||||||
info!(
|
info!(
|
||||||
"config reload: user_max_unique_ips updated ({} entries)",
|
"config reload: user_max_unique_ips updated ({} entries)",
|
||||||
|
|||||||
@@ -861,22 +861,6 @@ impl ProxyConfig {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (user, limit) in &config.access.user_rate_limits {
|
|
||||||
if limit.up_bps == 0 && limit.down_bps == 0 {
|
|
||||||
return Err(ProxyError::Config(format!(
|
|
||||||
"access.user_rate_limits.{user} must set at least one non-zero direction"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (cidr, limit) in &config.access.cidr_rate_limits {
|
|
||||||
if limit.up_bps == 0 && limit.down_bps == 0 {
|
|
||||||
return Err(ProxyError::Config(format!(
|
|
||||||
"access.cidr_rate_limits.{cidr} must set at least one non-zero direction"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.general.me_reinit_every_secs == 0 {
|
if config.general.me_reinit_every_secs == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_reinit_every_secs must be > 0".to_string(),
|
"general.me_reinit_every_secs must be > 0".to_string(),
|
||||||
|
|||||||
@@ -1826,21 +1826,6 @@ pub struct AccessConfig {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub user_data_quota: HashMap<String, u64>,
|
pub user_data_quota: HashMap<String, u64>,
|
||||||
|
|
||||||
/// Per-user transport rate limits in bits-per-second.
|
|
||||||
///
|
|
||||||
/// Each entry supports independent upload (`up_bps`) and download
|
|
||||||
/// (`down_bps`) ceilings. A value of `0` in one direction means
|
|
||||||
/// "unlimited" for that direction.
|
|
||||||
#[serde(default)]
|
|
||||||
pub user_rate_limits: HashMap<String, RateLimitBps>,
|
|
||||||
|
|
||||||
/// Per-CIDR aggregate transport rate limits in bits-per-second.
|
|
||||||
///
|
|
||||||
/// Matching uses longest-prefix-wins semantics. A value of `0` in one
|
|
||||||
/// direction means "unlimited" for that direction.
|
|
||||||
#[serde(default)]
|
|
||||||
pub cidr_rate_limits: HashMap<IpNetwork, RateLimitBps>,
|
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub user_max_unique_ips: HashMap<String, usize>,
|
pub user_max_unique_ips: HashMap<String, usize>,
|
||||||
|
|
||||||
@@ -1874,8 +1859,6 @@ impl Default for AccessConfig {
|
|||||||
user_max_tcp_conns_global_each: default_user_max_tcp_conns_global_each(),
|
user_max_tcp_conns_global_each: default_user_max_tcp_conns_global_each(),
|
||||||
user_expirations: HashMap::new(),
|
user_expirations: HashMap::new(),
|
||||||
user_data_quota: HashMap::new(),
|
user_data_quota: HashMap::new(),
|
||||||
user_rate_limits: HashMap::new(),
|
|
||||||
cidr_rate_limits: HashMap::new(),
|
|
||||||
user_max_unique_ips: HashMap::new(),
|
user_max_unique_ips: HashMap::new(),
|
||||||
user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(),
|
user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(),
|
||||||
user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(),
|
user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(),
|
||||||
@@ -1887,14 +1870,6 @@ impl Default for AccessConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
|
|
||||||
pub struct RateLimitBps {
|
|
||||||
#[serde(default)]
|
|
||||||
pub up_bps: u64,
|
|
||||||
#[serde(default)]
|
|
||||||
pub down_bps: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============= Aux Structures =============
|
// ============= Aux Structures =============
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
|||||||
@@ -664,11 +664,6 @@ async fn run_telemt_core(
|
|||||||
));
|
));
|
||||||
|
|
||||||
let buffer_pool = Arc::new(BufferPool::with_config(64 * 1024, 4096));
|
let buffer_pool = Arc::new(BufferPool::with_config(64 * 1024, 4096));
|
||||||
let shared_state = ProxySharedState::new();
|
|
||||||
shared_state.traffic_limiter.apply_policy(
|
|
||||||
config.access.user_rate_limits.clone(),
|
|
||||||
config.access.cidr_rate_limits.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
connectivity::run_startup_connectivity(
|
connectivity::run_startup_connectivity(
|
||||||
&config,
|
&config,
|
||||||
@@ -700,7 +695,6 @@ async fn run_telemt_core(
|
|||||||
beobachten.clone(),
|
beobachten.clone(),
|
||||||
api_config_tx.clone(),
|
api_config_tx.clone(),
|
||||||
me_pool.clone(),
|
me_pool.clone(),
|
||||||
shared_state.clone(),
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let config_rx = runtime_watches.config_rx;
|
let config_rx = runtime_watches.config_rx;
|
||||||
@@ -717,6 +711,7 @@ async fn run_telemt_core(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let _admission_tx_hold = admission_tx;
|
let _admission_tx_hold = admission_tx;
|
||||||
|
let shared_state = ProxySharedState::new();
|
||||||
conntrack_control::spawn_conntrack_controller(
|
conntrack_control::spawn_conntrack_controller(
|
||||||
config_rx.clone(),
|
config_rx.clone(),
|
||||||
stats.clone(),
|
stats.clone(),
|
||||||
|
|||||||
@@ -51,7 +51,6 @@ pub(crate) async fn spawn_runtime_tasks(
|
|||||||
beobachten: Arc<BeobachtenStore>,
|
beobachten: Arc<BeobachtenStore>,
|
||||||
api_config_tx: watch::Sender<Arc<ProxyConfig>>,
|
api_config_tx: watch::Sender<Arc<ProxyConfig>>,
|
||||||
me_pool_for_policy: Option<Arc<MePool>>,
|
me_pool_for_policy: Option<Arc<MePool>>,
|
||||||
shared_state: Arc<ProxySharedState>,
|
|
||||||
) -> RuntimeWatches {
|
) -> RuntimeWatches {
|
||||||
let um_clone = upstream_manager.clone();
|
let um_clone = upstream_manager.clone();
|
||||||
let dc_overrides_for_health = config.dc_overrides.clone();
|
let dc_overrides_for_health = config.dc_overrides.clone();
|
||||||
@@ -183,33 +182,6 @@ pub(crate) async fn spawn_runtime_tasks(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let limiter = shared_state.traffic_limiter.clone();
|
|
||||||
limiter.apply_policy(
|
|
||||||
config.access.user_rate_limits.clone(),
|
|
||||||
config.access.cidr_rate_limits.clone(),
|
|
||||||
);
|
|
||||||
let mut config_rx_rate_limits = config_rx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut prev_user_limits = config_rx_rate_limits.borrow().access.user_rate_limits.clone();
|
|
||||||
let mut prev_cidr_limits = config_rx_rate_limits.borrow().access.cidr_rate_limits.clone();
|
|
||||||
loop {
|
|
||||||
if config_rx_rate_limits.changed().await.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let cfg = config_rx_rate_limits.borrow_and_update().clone();
|
|
||||||
if prev_user_limits != cfg.access.user_rate_limits
|
|
||||||
|| prev_cidr_limits != cfg.access.cidr_rate_limits
|
|
||||||
{
|
|
||||||
limiter.apply_policy(
|
|
||||||
cfg.access.user_rate_limits.clone(),
|
|
||||||
cfg.access.cidr_rate_limits.clone(),
|
|
||||||
);
|
|
||||||
prev_user_limits = cfg.access.user_rate_limits.clone();
|
|
||||||
prev_cidr_limits = cfg.access.cidr_rate_limits.clone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let beobachten_writer = beobachten.clone();
|
let beobachten_writer = beobachten.clone();
|
||||||
let config_rx_beobachten = config_rx.clone();
|
let config_rx_beobachten = config_rx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|||||||
133
src/metrics.rs
133
src/metrics.rs
@@ -575,139 +575,6 @@ async fn render_metrics(
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let limiter_metrics = shared_state.traffic_limiter.metrics_snapshot();
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_rate_limiter_throttle_total Traffic limiter throttle events by scope and direction"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_rate_limiter_throttle_total counter");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_throttle_total{{scope=\"user\",direction=\"up\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.user_throttle_up_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_throttle_total{{scope=\"user\",direction=\"down\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.user_throttle_down_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_throttle_total{{scope=\"cidr\",direction=\"up\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.cidr_throttle_up_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_throttle_total{{scope=\"cidr\",direction=\"down\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.cidr_throttle_down_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_rate_limiter_wait_ms_total Traffic limiter accumulated wait time in milliseconds by scope and direction"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_rate_limiter_wait_ms_total counter");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_wait_ms_total{{scope=\"user\",direction=\"up\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.user_wait_up_ms_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_wait_ms_total{{scope=\"user\",direction=\"down\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.user_wait_down_ms_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_wait_ms_total{{scope=\"cidr\",direction=\"up\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.cidr_wait_up_ms_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_wait_ms_total{{scope=\"cidr\",direction=\"down\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.cidr_wait_down_ms_total
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_rate_limiter_active_leases Active relay leases under rate limiting by scope"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_rate_limiter_active_leases gauge");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_active_leases{{scope=\"user\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.user_active_leases
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_active_leases{{scope=\"cidr\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.cidr_active_leases
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_rate_limiter_policy_entries Active rate-limit policy entries by scope"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_rate_limiter_policy_entries gauge");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_policy_entries{{scope=\"user\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.user_policy_entries
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_rate_limiter_policy_entries{{scope=\"cidr\"}} {}",
|
|
||||||
if core_enabled {
|
|
||||||
limiter_metrics.cidr_policy_entries
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"
|
"# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"
|
||||||
|
|||||||
@@ -316,7 +316,6 @@ where
|
|||||||
|
|
||||||
stats.increment_user_connects(user);
|
stats.increment_user_connects(user);
|
||||||
let _direct_connection_lease = stats.acquire_direct_connection_lease();
|
let _direct_connection_lease = stats.acquire_direct_connection_lease();
|
||||||
let traffic_lease = shared.traffic_limiter.acquire_lease(user, success.peer.ip());
|
|
||||||
|
|
||||||
let buffer_pool_trim = Arc::clone(&buffer_pool);
|
let buffer_pool_trim = Arc::clone(&buffer_pool);
|
||||||
let relay_activity_timeout = if shared.conntrack_pressure_active() {
|
let relay_activity_timeout = if shared.conntrack_pressure_active() {
|
||||||
@@ -330,7 +329,7 @@ where
|
|||||||
} else {
|
} else {
|
||||||
Duration::from_secs(1800)
|
Duration::from_secs(1800)
|
||||||
};
|
};
|
||||||
let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout_and_lease(
|
let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout(
|
||||||
client_reader,
|
client_reader,
|
||||||
client_writer,
|
client_writer,
|
||||||
tg_reader,
|
tg_reader,
|
||||||
@@ -341,7 +340,6 @@ where
|
|||||||
Arc::clone(&stats),
|
Arc::clone(&stats),
|
||||||
config.access.user_data_quota.get(user).copied(),
|
config.access.user_data_quota.get(user).copied(),
|
||||||
buffer_pool,
|
buffer_pool,
|
||||||
traffic_lease,
|
|
||||||
relay_activity_timeout,
|
relay_activity_timeout,
|
||||||
);
|
);
|
||||||
tokio::pin!(relay_result);
|
tokio::pin!(relay_result);
|
||||||
|
|||||||
@@ -28,7 +28,6 @@ use crate::proxy::route_mode::{
|
|||||||
use crate::proxy::shared_state::{
|
use crate::proxy::shared_state::{
|
||||||
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
|
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
|
||||||
};
|
};
|
||||||
use crate::proxy::traffic_limiter::{RateDirection, TrafficLease, next_refill_delay};
|
|
||||||
use crate::stats::{
|
use crate::stats::{
|
||||||
MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats,
|
MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats,
|
||||||
};
|
};
|
||||||
@@ -287,10 +286,6 @@ impl RelayClientIdleState {
|
|||||||
self.last_client_frame_at = now;
|
self.last_client_frame_at = now;
|
||||||
self.soft_idle_marked = false;
|
self.soft_idle_marked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_client_tiny_frame(&mut self, now: Instant) {
|
|
||||||
self.last_client_frame_at = now;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MeD2cFlushPolicy {
|
impl MeD2cFlushPolicy {
|
||||||
@@ -600,41 +595,6 @@ async fn reserve_user_quota_with_yield(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_traffic_budget(
|
|
||||||
lease: Option<&Arc<TrafficLease>>,
|
|
||||||
direction: RateDirection,
|
|
||||||
bytes: u64,
|
|
||||||
) {
|
|
||||||
if bytes == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let Some(lease) = lease else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut remaining = bytes;
|
|
||||||
while remaining > 0 {
|
|
||||||
let consume = lease.try_consume(direction, remaining);
|
|
||||||
if consume.granted > 0 {
|
|
||||||
remaining = remaining.saturating_sub(consume.granted);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let wait_started_at = Instant::now();
|
|
||||||
tokio::time::sleep(next_refill_delay()).await;
|
|
||||||
let wait_ms = wait_started_at
|
|
||||||
.elapsed()
|
|
||||||
.as_millis()
|
|
||||||
.min(u128::from(u64::MAX)) as u64;
|
|
||||||
lease.observe_wait_ms(
|
|
||||||
direction,
|
|
||||||
consume.blocked_user,
|
|
||||||
consume.blocked_cidr,
|
|
||||||
wait_ms,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn classify_me_d2c_flush_reason(
|
fn classify_me_d2c_flush_reason(
|
||||||
flush_immediately: bool,
|
flush_immediately: bool,
|
||||||
batch_frames: usize,
|
batch_frames: usize,
|
||||||
@@ -1025,7 +985,6 @@ where
|
|||||||
let quota_limit = config.access.user_data_quota.get(&user).copied();
|
let quota_limit = config.access.user_data_quota.get(&user).copied();
|
||||||
let quota_user_stats = quota_limit.map(|_| stats.get_or_create_user_stats_handle(&user));
|
let quota_user_stats = quota_limit.map(|_| stats.get_or_create_user_stats_handle(&user));
|
||||||
let peer = success.peer;
|
let peer = success.peer;
|
||||||
let traffic_lease = shared.traffic_limiter.acquire_lease(&user, peer.ip());
|
|
||||||
let proto_tag = success.proto_tag;
|
let proto_tag = success.proto_tag;
|
||||||
let pool_generation = me_pool.current_generation();
|
let pool_generation = me_pool.current_generation();
|
||||||
|
|
||||||
@@ -1161,7 +1120,6 @@ where
|
|||||||
let rng_clone = rng.clone();
|
let rng_clone = rng.clone();
|
||||||
let user_clone = user.clone();
|
let user_clone = user.clone();
|
||||||
let quota_user_stats_me_writer = quota_user_stats.clone();
|
let quota_user_stats_me_writer = quota_user_stats.clone();
|
||||||
let traffic_lease_me_writer = traffic_lease.clone();
|
|
||||||
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
|
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
|
||||||
let bytes_me2c_clone = bytes_me2c.clone();
|
let bytes_me2c_clone = bytes_me2c.clone();
|
||||||
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
|
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
|
||||||
@@ -1195,7 +1153,7 @@ where
|
|||||||
|
|
||||||
let first_is_downstream_activity =
|
let first_is_downstream_activity =
|
||||||
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
|
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
|
||||||
match process_me_writer_response_with_traffic_lease(
|
match process_me_writer_response(
|
||||||
first,
|
first,
|
||||||
&mut writer,
|
&mut writer,
|
||||||
proto_tag,
|
proto_tag,
|
||||||
@@ -1206,7 +1164,6 @@ where
|
|||||||
quota_user_stats_me_writer.as_deref(),
|
quota_user_stats_me_writer.as_deref(),
|
||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1256,7 +1213,7 @@ where
|
|||||||
|
|
||||||
let next_is_downstream_activity =
|
let next_is_downstream_activity =
|
||||||
matches!(&next, MeResponse::Data { .. } | MeResponse::Ack(_));
|
matches!(&next, MeResponse::Data { .. } | MeResponse::Ack(_));
|
||||||
match process_me_writer_response_with_traffic_lease(
|
match process_me_writer_response(
|
||||||
next,
|
next,
|
||||||
&mut writer,
|
&mut writer,
|
||||||
proto_tag,
|
proto_tag,
|
||||||
@@ -1267,7 +1224,6 @@ where
|
|||||||
quota_user_stats_me_writer.as_deref(),
|
quota_user_stats_me_writer.as_deref(),
|
||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1320,7 +1276,7 @@ where
|
|||||||
Ok(Some(next)) => {
|
Ok(Some(next)) => {
|
||||||
let next_is_downstream_activity =
|
let next_is_downstream_activity =
|
||||||
matches!(&next, MeResponse::Data { .. } | MeResponse::Ack(_));
|
matches!(&next, MeResponse::Data { .. } | MeResponse::Ack(_));
|
||||||
match process_me_writer_response_with_traffic_lease(
|
match process_me_writer_response(
|
||||||
next,
|
next,
|
||||||
&mut writer,
|
&mut writer,
|
||||||
proto_tag,
|
proto_tag,
|
||||||
@@ -1331,7 +1287,6 @@ where
|
|||||||
quota_user_stats_me_writer.as_deref(),
|
quota_user_stats_me_writer.as_deref(),
|
||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1386,7 +1341,7 @@ where
|
|||||||
|
|
||||||
let extra_is_downstream_activity =
|
let extra_is_downstream_activity =
|
||||||
matches!(&extra, MeResponse::Data { .. } | MeResponse::Ack(_));
|
matches!(&extra, MeResponse::Data { .. } | MeResponse::Ack(_));
|
||||||
match process_me_writer_response_with_traffic_lease(
|
match process_me_writer_response(
|
||||||
extra,
|
extra,
|
||||||
&mut writer,
|
&mut writer,
|
||||||
proto_tag,
|
proto_tag,
|
||||||
@@ -1397,7 +1352,6 @@ where
|
|||||||
quota_user_stats_me_writer.as_deref(),
|
quota_user_stats_me_writer.as_deref(),
|
||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1588,12 +1542,6 @@ where
|
|||||||
match payload_result {
|
match payload_result {
|
||||||
Ok(Some((payload, quickack))) => {
|
Ok(Some((payload, quickack))) => {
|
||||||
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
||||||
wait_for_traffic_budget(
|
|
||||||
traffic_lease.as_ref(),
|
|
||||||
RateDirection::Up,
|
|
||||||
payload.len() as u64,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
forensics.bytes_c2me = forensics
|
forensics.bytes_c2me = forensics
|
||||||
.bytes_c2me
|
.bytes_c2me
|
||||||
.saturating_add(payload.len() as u64);
|
.saturating_add(payload.len() as u64);
|
||||||
@@ -1814,6 +1762,40 @@ where
|
|||||||
let downstream_ms = last_downstream_activity_ms.load(Ordering::Relaxed);
|
let downstream_ms = last_downstream_activity_ms.load(Ordering::Relaxed);
|
||||||
let hard_deadline =
|
let hard_deadline =
|
||||||
hard_deadline(idle_policy, idle_state, session_started_at, downstream_ms);
|
hard_deadline(idle_policy, idle_state, session_started_at, downstream_ms);
|
||||||
|
if now >= hard_deadline {
|
||||||
|
clear_relay_idle_candidate_in(shared, forensics.conn_id);
|
||||||
|
stats.increment_relay_idle_hard_close_total();
|
||||||
|
let client_idle_secs = now
|
||||||
|
.saturating_duration_since(idle_state.last_client_frame_at)
|
||||||
|
.as_secs();
|
||||||
|
let downstream_idle_secs = now
|
||||||
|
.saturating_duration_since(
|
||||||
|
session_started_at + Duration::from_millis(downstream_ms),
|
||||||
|
)
|
||||||
|
.as_secs();
|
||||||
|
warn!(
|
||||||
|
trace_id = format_args!("0x{:016x}", forensics.trace_id),
|
||||||
|
conn_id = forensics.conn_id,
|
||||||
|
user = %forensics.user,
|
||||||
|
read_label,
|
||||||
|
client_idle_secs,
|
||||||
|
downstream_idle_secs,
|
||||||
|
soft_idle_secs = idle_policy.soft_idle.as_secs(),
|
||||||
|
hard_idle_secs = idle_policy.hard_idle.as_secs(),
|
||||||
|
grace_secs = idle_policy.grace_after_downstream_activity.as_secs(),
|
||||||
|
"Middle-relay hard idle close"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::Io(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::TimedOut,
|
||||||
|
format!(
|
||||||
|
"middle-relay hard idle timeout while reading {read_label}: client_idle_secs={client_idle_secs}, downstream_idle_secs={downstream_idle_secs}, soft_idle_secs={}, hard_idle_secs={}, grace_secs={}",
|
||||||
|
idle_policy.soft_idle.as_secs(),
|
||||||
|
idle_policy.hard_idle.as_secs(),
|
||||||
|
idle_policy.grace_after_downstream_activity.as_secs(),
|
||||||
|
),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
if !idle_state.soft_idle_marked
|
if !idle_state.soft_idle_marked
|
||||||
&& now.saturating_duration_since(idle_state.last_client_frame_at)
|
&& now.saturating_duration_since(idle_state.last_client_frame_at)
|
||||||
>= idle_policy.soft_idle
|
>= idle_policy.soft_idle
|
||||||
@@ -1868,45 +1850,7 @@ where
|
|||||||
),
|
),
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {}
|
||||||
let now = Instant::now();
|
|
||||||
let downstream_ms = last_downstream_activity_ms.load(Ordering::Relaxed);
|
|
||||||
let hard_deadline =
|
|
||||||
hard_deadline(idle_policy, idle_state, session_started_at, downstream_ms);
|
|
||||||
if now >= hard_deadline {
|
|
||||||
clear_relay_idle_candidate_in(shared, forensics.conn_id);
|
|
||||||
stats.increment_relay_idle_hard_close_total();
|
|
||||||
let client_idle_secs = now
|
|
||||||
.saturating_duration_since(idle_state.last_client_frame_at)
|
|
||||||
.as_secs();
|
|
||||||
let downstream_idle_secs = now
|
|
||||||
.saturating_duration_since(
|
|
||||||
session_started_at + Duration::from_millis(downstream_ms),
|
|
||||||
)
|
|
||||||
.as_secs();
|
|
||||||
warn!(
|
|
||||||
trace_id = format_args!("0x{:016x}", forensics.trace_id),
|
|
||||||
conn_id = forensics.conn_id,
|
|
||||||
user = %forensics.user,
|
|
||||||
read_label,
|
|
||||||
client_idle_secs,
|
|
||||||
downstream_idle_secs,
|
|
||||||
soft_idle_secs = idle_policy.soft_idle.as_secs(),
|
|
||||||
hard_idle_secs = idle_policy.hard_idle.as_secs(),
|
|
||||||
grace_secs = idle_policy.grace_after_downstream_activity.as_secs(),
|
|
||||||
"Middle-relay hard idle close"
|
|
||||||
);
|
|
||||||
return Err(ProxyError::Io(std::io::Error::new(
|
|
||||||
std::io::ErrorKind::TimedOut,
|
|
||||||
format!(
|
|
||||||
"middle-relay hard idle timeout while reading {read_label}: client_idle_secs={client_idle_secs}, downstream_idle_secs={downstream_idle_secs}, soft_idle_secs={}, hard_idle_secs={}, grace_secs={}",
|
|
||||||
idle_policy.soft_idle.as_secs(),
|
|
||||||
idle_policy.hard_idle.as_secs(),
|
|
||||||
idle_policy.grace_after_downstream_activity.as_secs(),
|
|
||||||
),
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1997,7 +1941,6 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
if len == 0 {
|
if len == 0 {
|
||||||
idle_state.on_client_tiny_frame(Instant::now());
|
|
||||||
idle_state.tiny_frame_debt = idle_state
|
idle_state.tiny_frame_debt = idle_state
|
||||||
.tiny_frame_debt
|
.tiny_frame_debt
|
||||||
.saturating_add(TINY_FRAME_DEBT_PER_TINY);
|
.saturating_add(TINY_FRAME_DEBT_PER_TINY);
|
||||||
@@ -2217,46 +2160,6 @@ async fn process_me_writer_response<W>(
|
|||||||
ack_flush_immediate: bool,
|
ack_flush_immediate: bool,
|
||||||
batched: bool,
|
batched: bool,
|
||||||
) -> Result<MeWriterResponseOutcome>
|
) -> Result<MeWriterResponseOutcome>
|
||||||
where
|
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
|
||||||
process_me_writer_response_with_traffic_lease(
|
|
||||||
response,
|
|
||||||
client_writer,
|
|
||||||
proto_tag,
|
|
||||||
rng,
|
|
||||||
frame_buf,
|
|
||||||
stats,
|
|
||||||
user,
|
|
||||||
quota_user_stats,
|
|
||||||
quota_limit,
|
|
||||||
quota_soft_overshoot_bytes,
|
|
||||||
None,
|
|
||||||
bytes_me2c,
|
|
||||||
conn_id,
|
|
||||||
ack_flush_immediate,
|
|
||||||
batched,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn process_me_writer_response_with_traffic_lease<W>(
|
|
||||||
response: MeResponse,
|
|
||||||
client_writer: &mut CryptoWriter<W>,
|
|
||||||
proto_tag: ProtoTag,
|
|
||||||
rng: &SecureRandom,
|
|
||||||
frame_buf: &mut Vec<u8>,
|
|
||||||
stats: &Stats,
|
|
||||||
user: &str,
|
|
||||||
quota_user_stats: Option<&UserStats>,
|
|
||||||
quota_limit: Option<u64>,
|
|
||||||
quota_soft_overshoot_bytes: u64,
|
|
||||||
traffic_lease: Option<&Arc<TrafficLease>>,
|
|
||||||
bytes_me2c: &AtomicU64,
|
|
||||||
conn_id: u64,
|
|
||||||
ack_flush_immediate: bool,
|
|
||||||
batched: bool,
|
|
||||||
) -> Result<MeWriterResponseOutcome>
|
|
||||||
where
|
where
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
@@ -2280,7 +2183,6 @@ where
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await;
|
|
||||||
|
|
||||||
let write_mode =
|
let write_mode =
|
||||||
match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
||||||
@@ -2318,7 +2220,6 @@ where
|
|||||||
} else {
|
} else {
|
||||||
trace!(conn_id, confirm, "ME->C quickack");
|
trace!(conn_id, confirm, "ME->C quickack");
|
||||||
}
|
}
|
||||||
wait_for_traffic_budget(traffic_lease, RateDirection::Down, 4).await;
|
|
||||||
write_client_ack(client_writer, proto_tag, confirm).await?;
|
write_client_ack(client_writer, proto_tag, confirm).await?;
|
||||||
stats.increment_me_d2c_ack_frames_total();
|
stats.increment_me_d2c_ack_frames_total();
|
||||||
|
|
||||||
|
|||||||
@@ -68,7 +68,6 @@ pub mod relay;
|
|||||||
pub mod route_mode;
|
pub mod route_mode;
|
||||||
pub mod session_eviction;
|
pub mod session_eviction;
|
||||||
pub mod shared_state;
|
pub mod shared_state;
|
||||||
pub mod traffic_limiter;
|
|
||||||
|
|
||||||
pub use client::ClientHandler;
|
pub use client::ClientHandler;
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
|
|||||||
@@ -52,7 +52,6 @@
|
|||||||
//! - `SharedCounters` (atomics) let the watchdog read stats without locking
|
//! - `SharedCounters` (atomics) let the watchdog read stats without locking
|
||||||
|
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::proxy::traffic_limiter::{RateDirection, TrafficLease, next_refill_delay};
|
|
||||||
use crate::stats::{Stats, UserStats};
|
use crate::stats::{Stats, UserStats};
|
||||||
use crate::stream::BufferPool;
|
use crate::stream::BufferPool;
|
||||||
use std::io;
|
use std::io;
|
||||||
@@ -62,7 +61,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes};
|
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes};
|
||||||
use tokio::time::{Instant, Sleep};
|
use tokio::time::Instant;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
// ============= Constants =============
|
// ============= Constants =============
|
||||||
@@ -211,24 +210,12 @@ struct StatsIo<S> {
|
|||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
user: String,
|
user: String,
|
||||||
user_stats: Arc<UserStats>,
|
user_stats: Arc<UserStats>,
|
||||||
traffic_lease: Option<Arc<TrafficLease>>,
|
|
||||||
c2s_rate_debt_bytes: u64,
|
|
||||||
c2s_wait: RateWaitState,
|
|
||||||
s2c_wait: RateWaitState,
|
|
||||||
quota_limit: Option<u64>,
|
quota_limit: Option<u64>,
|
||||||
quota_exceeded: Arc<AtomicBool>,
|
quota_exceeded: Arc<AtomicBool>,
|
||||||
quota_bytes_since_check: u64,
|
quota_bytes_since_check: u64,
|
||||||
epoch: Instant,
|
epoch: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct RateWaitState {
|
|
||||||
sleep: Option<Pin<Box<Sleep>>>,
|
|
||||||
started_at: Option<Instant>,
|
|
||||||
blocked_user: bool,
|
|
||||||
blocked_cidr: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> StatsIo<S> {
|
impl<S> StatsIo<S> {
|
||||||
fn new(
|
fn new(
|
||||||
inner: S,
|
inner: S,
|
||||||
@@ -238,28 +225,6 @@ impl<S> StatsIo<S> {
|
|||||||
quota_limit: Option<u64>,
|
quota_limit: Option<u64>,
|
||||||
quota_exceeded: Arc<AtomicBool>,
|
quota_exceeded: Arc<AtomicBool>,
|
||||||
epoch: Instant,
|
epoch: Instant,
|
||||||
) -> Self {
|
|
||||||
Self::new_with_traffic_lease(
|
|
||||||
inner,
|
|
||||||
counters,
|
|
||||||
stats,
|
|
||||||
user,
|
|
||||||
None,
|
|
||||||
quota_limit,
|
|
||||||
quota_exceeded,
|
|
||||||
epoch,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_with_traffic_lease(
|
|
||||||
inner: S,
|
|
||||||
counters: Arc<SharedCounters>,
|
|
||||||
stats: Arc<Stats>,
|
|
||||||
user: String,
|
|
||||||
traffic_lease: Option<Arc<TrafficLease>>,
|
|
||||||
quota_limit: Option<u64>,
|
|
||||||
quota_exceeded: Arc<AtomicBool>,
|
|
||||||
epoch: Instant,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
// Mark initial activity so the watchdog doesn't fire before data flows
|
// Mark initial activity so the watchdog doesn't fire before data flows
|
||||||
counters.touch(Instant::now(), epoch);
|
counters.touch(Instant::now(), epoch);
|
||||||
@@ -270,97 +235,12 @@ impl<S> StatsIo<S> {
|
|||||||
stats,
|
stats,
|
||||||
user,
|
user,
|
||||||
user_stats,
|
user_stats,
|
||||||
traffic_lease,
|
|
||||||
c2s_rate_debt_bytes: 0,
|
|
||||||
c2s_wait: RateWaitState::default(),
|
|
||||||
s2c_wait: RateWaitState::default(),
|
|
||||||
quota_limit,
|
quota_limit,
|
||||||
quota_exceeded,
|
quota_exceeded,
|
||||||
quota_bytes_since_check: 0,
|
quota_bytes_since_check: 0,
|
||||||
epoch,
|
epoch,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn record_wait(
|
|
||||||
wait: &mut RateWaitState,
|
|
||||||
lease: Option<&Arc<TrafficLease>>,
|
|
||||||
direction: RateDirection,
|
|
||||||
) {
|
|
||||||
let Some(started_at) = wait.started_at.take() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let wait_ms = started_at
|
|
||||||
.elapsed()
|
|
||||||
.as_millis()
|
|
||||||
.min(u128::from(u64::MAX)) as u64;
|
|
||||||
if let Some(lease) = lease {
|
|
||||||
lease.observe_wait_ms(
|
|
||||||
direction,
|
|
||||||
wait.blocked_user,
|
|
||||||
wait.blocked_cidr,
|
|
||||||
wait_ms,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
wait.blocked_user = false;
|
|
||||||
wait.blocked_cidr = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn arm_wait(wait: &mut RateWaitState, blocked_user: bool, blocked_cidr: bool) {
|
|
||||||
if wait.sleep.is_none() {
|
|
||||||
wait.sleep = Some(Box::pin(tokio::time::sleep(next_refill_delay())));
|
|
||||||
wait.started_at = Some(Instant::now());
|
|
||||||
}
|
|
||||||
wait.blocked_user |= blocked_user;
|
|
||||||
wait.blocked_cidr |= blocked_cidr;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_wait(
|
|
||||||
wait: &mut RateWaitState,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
lease: Option<&Arc<TrafficLease>>,
|
|
||||||
direction: RateDirection,
|
|
||||||
) -> Poll<()> {
|
|
||||||
let Some(sleep) = wait.sleep.as_mut() else {
|
|
||||||
return Poll::Ready(());
|
|
||||||
};
|
|
||||||
if sleep.as_mut().poll(cx).is_pending() {
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
wait.sleep = None;
|
|
||||||
Self::record_wait(wait, lease, direction);
|
|
||||||
Poll::Ready(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn settle_c2s_rate_debt(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
|
||||||
let Some(lease) = self.traffic_lease.as_ref() else {
|
|
||||||
self.c2s_rate_debt_bytes = 0;
|
|
||||||
return Poll::Ready(());
|
|
||||||
};
|
|
||||||
|
|
||||||
while self.c2s_rate_debt_bytes > 0 {
|
|
||||||
let consume = lease.try_consume(RateDirection::Up, self.c2s_rate_debt_bytes);
|
|
||||||
if consume.granted > 0 {
|
|
||||||
self.c2s_rate_debt_bytes =
|
|
||||||
self.c2s_rate_debt_bytes.saturating_sub(consume.granted);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Self::arm_wait(
|
|
||||||
&mut self.c2s_wait,
|
|
||||||
consume.blocked_user,
|
|
||||||
consume.blocked_cidr,
|
|
||||||
);
|
|
||||||
if Self::poll_wait(&mut self.c2s_wait, cx, Some(lease), RateDirection::Up).is_pending()
|
|
||||||
{
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if Self::poll_wait(&mut self.c2s_wait, cx, Some(lease), RateDirection::Up).is_pending() {
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Ready(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -406,25 +286,6 @@ fn should_immediate_quota_check(remaining_before: u64, charge_bytes: u64) -> boo
|
|||||||
remaining_before <= QUOTA_NEAR_LIMIT_BYTES || charge_bytes >= QUOTA_LARGE_CHARGE_BYTES
|
remaining_before <= QUOTA_NEAR_LIMIT_BYTES || charge_bytes >= QUOTA_LARGE_CHARGE_BYTES
|
||||||
}
|
}
|
||||||
|
|
||||||
fn refund_reserved_quota_bytes(user_stats: &UserStats, reserved_bytes: u64) {
|
|
||||||
if reserved_bytes == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mut current = user_stats.quota_used.load(Ordering::Relaxed);
|
|
||||||
loop {
|
|
||||||
let next = current.saturating_sub(reserved_bytes);
|
|
||||||
match user_stats.quota_used.compare_exchange_weak(
|
|
||||||
current,
|
|
||||||
next,
|
|
||||||
Ordering::Relaxed,
|
|
||||||
Ordering::Relaxed,
|
|
||||||
) {
|
|
||||||
Ok(_) => return,
|
|
||||||
Err(observed) => current = observed,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
@@ -435,9 +296,6 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
|||||||
if this.quota_exceeded.load(Ordering::Acquire) {
|
if this.quota_exceeded.load(Ordering::Acquire) {
|
||||||
return Poll::Ready(Err(quota_io_error()));
|
return Poll::Ready(Err(quota_io_error()));
|
||||||
}
|
}
|
||||||
if this.settle_c2s_rate_debt(cx).is_pending() {
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut remaining_before = None;
|
let mut remaining_before = None;
|
||||||
if let Some(limit) = this.quota_limit {
|
if let Some(limit) = this.quota_limit {
|
||||||
@@ -519,11 +377,6 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
|||||||
.add_user_octets_from_handle(this.user_stats.as_ref(), n_to_charge);
|
.add_user_octets_from_handle(this.user_stats.as_ref(), n_to_charge);
|
||||||
this.stats
|
this.stats
|
||||||
.increment_user_msgs_from_handle(this.user_stats.as_ref());
|
.increment_user_msgs_from_handle(this.user_stats.as_ref());
|
||||||
if this.traffic_lease.is_some() {
|
|
||||||
this.c2s_rate_debt_bytes =
|
|
||||||
this.c2s_rate_debt_bytes.saturating_add(n_to_charge);
|
|
||||||
let _ = this.settle_c2s_rate_debt(cx);
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(user = %this.user, bytes = n, "C->S");
|
trace!(user = %this.user, bytes = n, "C->S");
|
||||||
}
|
}
|
||||||
@@ -545,66 +398,28 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
|||||||
return Poll::Ready(Err(quota_io_error()));
|
return Poll::Ready(Err(quota_io_error()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut shaper_reserved_bytes = 0u64;
|
|
||||||
let mut write_buf = buf;
|
|
||||||
if let Some(lease) = this.traffic_lease.as_ref() {
|
|
||||||
if !buf.is_empty() {
|
|
||||||
loop {
|
|
||||||
let consume = lease.try_consume(RateDirection::Down, buf.len() as u64);
|
|
||||||
if consume.granted > 0 {
|
|
||||||
shaper_reserved_bytes = consume.granted;
|
|
||||||
if consume.granted < buf.len() as u64 {
|
|
||||||
write_buf = &buf[..consume.granted as usize];
|
|
||||||
}
|
|
||||||
let _ = Self::poll_wait(
|
|
||||||
&mut this.s2c_wait,
|
|
||||||
cx,
|
|
||||||
Some(lease),
|
|
||||||
RateDirection::Down,
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
Self::arm_wait(
|
|
||||||
&mut this.s2c_wait,
|
|
||||||
consume.blocked_user,
|
|
||||||
consume.blocked_cidr,
|
|
||||||
);
|
|
||||||
if Self::poll_wait(&mut this.s2c_wait, cx, Some(lease), RateDirection::Down)
|
|
||||||
.is_pending()
|
|
||||||
{
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let _ = Self::poll_wait(&mut this.s2c_wait, cx, Some(lease), RateDirection::Down);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut remaining_before = None;
|
let mut remaining_before = None;
|
||||||
let mut reserved_bytes = 0u64;
|
let mut reserved_bytes = 0u64;
|
||||||
|
let mut write_buf = buf;
|
||||||
if let Some(limit) = this.quota_limit {
|
if let Some(limit) = this.quota_limit {
|
||||||
if !write_buf.is_empty() {
|
if !buf.is_empty() {
|
||||||
let mut reserve_rounds = 0usize;
|
let mut reserve_rounds = 0usize;
|
||||||
while reserved_bytes == 0 {
|
while reserved_bytes == 0 {
|
||||||
let used_before = this.user_stats.quota_used();
|
let used_before = this.user_stats.quota_used();
|
||||||
let remaining = limit.saturating_sub(used_before);
|
let remaining = limit.saturating_sub(used_before);
|
||||||
if remaining == 0 {
|
if remaining == 0 {
|
||||||
if let Some(lease) = this.traffic_lease.as_ref() {
|
|
||||||
lease.refund(RateDirection::Down, shaper_reserved_bytes);
|
|
||||||
}
|
|
||||||
this.quota_exceeded.store(true, Ordering::Release);
|
this.quota_exceeded.store(true, Ordering::Release);
|
||||||
return Poll::Ready(Err(quota_io_error()));
|
return Poll::Ready(Err(quota_io_error()));
|
||||||
}
|
}
|
||||||
remaining_before = Some(remaining);
|
remaining_before = Some(remaining);
|
||||||
|
|
||||||
let desired = remaining.min(write_buf.len() as u64);
|
let desired = remaining.min(buf.len() as u64);
|
||||||
let mut saw_contention = false;
|
let mut saw_contention = false;
|
||||||
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
|
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
|
||||||
match this.user_stats.quota_try_reserve(desired, limit) {
|
match this.user_stats.quota_try_reserve(desired, limit) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
reserved_bytes = desired;
|
reserved_bytes = desired;
|
||||||
write_buf = &write_buf[..desired as usize];
|
write_buf = &buf[..desired as usize];
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
|
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
|
||||||
@@ -619,9 +434,6 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
|||||||
if reserved_bytes == 0 {
|
if reserved_bytes == 0 {
|
||||||
reserve_rounds = reserve_rounds.saturating_add(1);
|
reserve_rounds = reserve_rounds.saturating_add(1);
|
||||||
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
|
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
|
||||||
if let Some(lease) = this.traffic_lease.as_ref() {
|
|
||||||
lease.refund(RateDirection::Down, shaper_reserved_bytes);
|
|
||||||
}
|
|
||||||
this.quota_exceeded.store(true, Ordering::Release);
|
this.quota_exceeded.store(true, Ordering::Release);
|
||||||
return Poll::Ready(Err(quota_io_error()));
|
return Poll::Ready(Err(quota_io_error()));
|
||||||
}
|
}
|
||||||
@@ -634,9 +446,6 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
|||||||
let used_before = this.user_stats.quota_used();
|
let used_before = this.user_stats.quota_used();
|
||||||
let remaining = limit.saturating_sub(used_before);
|
let remaining = limit.saturating_sub(used_before);
|
||||||
if remaining == 0 {
|
if remaining == 0 {
|
||||||
if let Some(lease) = this.traffic_lease.as_ref() {
|
|
||||||
lease.refund(RateDirection::Down, shaper_reserved_bytes);
|
|
||||||
}
|
|
||||||
this.quota_exceeded.store(true, Ordering::Release);
|
this.quota_exceeded.store(true, Ordering::Release);
|
||||||
return Poll::Ready(Err(quota_io_error()));
|
return Poll::Ready(Err(quota_io_error()));
|
||||||
}
|
}
|
||||||
@@ -647,17 +456,23 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
|||||||
match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
|
match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(Ok(n)) => {
|
||||||
if reserved_bytes > n as u64 {
|
if reserved_bytes > n as u64 {
|
||||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes - n as u64);
|
let refund = reserved_bytes - n as u64;
|
||||||
}
|
let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
|
||||||
if shaper_reserved_bytes > n as u64
|
loop {
|
||||||
&& let Some(lease) = this.traffic_lease.as_ref()
|
let next = current.saturating_sub(refund);
|
||||||
{
|
match this.user_stats.quota_used.compare_exchange_weak(
|
||||||
lease.refund(RateDirection::Down, shaper_reserved_bytes - n as u64);
|
current,
|
||||||
}
|
next,
|
||||||
if n > 0 {
|
Ordering::Relaxed,
|
||||||
if let Some(lease) = this.traffic_lease.as_ref() {
|
Ordering::Relaxed,
|
||||||
Self::record_wait(&mut this.s2c_wait, Some(lease), RateDirection::Down);
|
) {
|
||||||
|
Ok(_) => break,
|
||||||
|
Err(observed) => current = observed,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if n > 0 {
|
||||||
let n_to_charge = n as u64;
|
let n_to_charge = n as u64;
|
||||||
|
|
||||||
// S→C: data written to client
|
// S→C: data written to client
|
||||||
@@ -697,23 +512,37 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
|||||||
}
|
}
|
||||||
Poll::Ready(Err(err)) => {
|
Poll::Ready(Err(err)) => {
|
||||||
if reserved_bytes > 0 {
|
if reserved_bytes > 0 {
|
||||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
|
let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
|
||||||
}
|
loop {
|
||||||
if shaper_reserved_bytes > 0
|
let next = current.saturating_sub(reserved_bytes);
|
||||||
&& let Some(lease) = this.traffic_lease.as_ref()
|
match this.user_stats.quota_used.compare_exchange_weak(
|
||||||
{
|
current,
|
||||||
lease.refund(RateDirection::Down, shaper_reserved_bytes);
|
next,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
) {
|
||||||
|
Ok(_) => break,
|
||||||
|
Err(observed) => current = observed,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(err))
|
Poll::Ready(Err(err))
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
if reserved_bytes > 0 {
|
if reserved_bytes > 0 {
|
||||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
|
let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
|
||||||
}
|
loop {
|
||||||
if shaper_reserved_bytes > 0
|
let next = current.saturating_sub(reserved_bytes);
|
||||||
&& let Some(lease) = this.traffic_lease.as_ref()
|
match this.user_stats.quota_used.compare_exchange_weak(
|
||||||
{
|
current,
|
||||||
lease.refund(RateDirection::Down, shaper_reserved_bytes);
|
next,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
) {
|
||||||
|
Ok(_) => break,
|
||||||
|
Err(observed) => current = observed,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
@@ -798,43 +627,6 @@ pub async fn relay_bidirectional_with_activity_timeout<CR, CW, SR, SW>(
|
|||||||
_buffer_pool: Arc<BufferPool>,
|
_buffer_pool: Arc<BufferPool>,
|
||||||
activity_timeout: Duration,
|
activity_timeout: Duration,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
|
||||||
CR: AsyncRead + Unpin + Send + 'static,
|
|
||||||
CW: AsyncWrite + Unpin + Send + 'static,
|
|
||||||
SR: AsyncRead + Unpin + Send + 'static,
|
|
||||||
SW: AsyncWrite + Unpin + Send + 'static,
|
|
||||||
{
|
|
||||||
relay_bidirectional_with_activity_timeout_and_lease(
|
|
||||||
client_reader,
|
|
||||||
client_writer,
|
|
||||||
server_reader,
|
|
||||||
server_writer,
|
|
||||||
c2s_buf_size,
|
|
||||||
s2c_buf_size,
|
|
||||||
user,
|
|
||||||
stats,
|
|
||||||
quota_limit,
|
|
||||||
_buffer_pool,
|
|
||||||
None,
|
|
||||||
activity_timeout,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn relay_bidirectional_with_activity_timeout_and_lease<CR, CW, SR, SW>(
|
|
||||||
client_reader: CR,
|
|
||||||
client_writer: CW,
|
|
||||||
server_reader: SR,
|
|
||||||
server_writer: SW,
|
|
||||||
c2s_buf_size: usize,
|
|
||||||
s2c_buf_size: usize,
|
|
||||||
user: &str,
|
|
||||||
stats: Arc<Stats>,
|
|
||||||
quota_limit: Option<u64>,
|
|
||||||
_buffer_pool: Arc<BufferPool>,
|
|
||||||
traffic_lease: Option<Arc<TrafficLease>>,
|
|
||||||
activity_timeout: Duration,
|
|
||||||
) -> Result<()>
|
|
||||||
where
|
where
|
||||||
CR: AsyncRead + Unpin + Send + 'static,
|
CR: AsyncRead + Unpin + Send + 'static,
|
||||||
CW: AsyncWrite + Unpin + Send + 'static,
|
CW: AsyncWrite + Unpin + Send + 'static,
|
||||||
@@ -852,12 +644,11 @@ where
|
|||||||
let mut server = CombinedStream::new(server_reader, server_writer);
|
let mut server = CombinedStream::new(server_reader, server_writer);
|
||||||
|
|
||||||
// Wrap client with stats/activity tracking
|
// Wrap client with stats/activity tracking
|
||||||
let mut client = StatsIo::new_with_traffic_lease(
|
let mut client = StatsIo::new(
|
||||||
client_combined,
|
client_combined,
|
||||||
Arc::clone(&counters),
|
Arc::clone(&counters),
|
||||||
Arc::clone(&stats),
|
Arc::clone(&stats),
|
||||||
user_owned.clone(),
|
user_owned.clone(),
|
||||||
traffic_lease,
|
|
||||||
quota_limit,
|
quota_limit,
|
||||||
Arc::clone("a_exceeded),
|
Arc::clone("a_exceeded),
|
||||||
epoch,
|
epoch,
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ use tokio::sync::mpsc;
|
|||||||
|
|
||||||
use crate::proxy::handshake::{AuthProbeSaturationState, AuthProbeState};
|
use crate::proxy::handshake::{AuthProbeSaturationState, AuthProbeState};
|
||||||
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
|
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
|
||||||
use crate::proxy::traffic_limiter::TrafficLimiter;
|
|
||||||
|
|
||||||
const HANDSHAKE_RECENT_USER_RING_LEN: usize = 64;
|
const HANDSHAKE_RECENT_USER_RING_LEN: usize = 64;
|
||||||
|
|
||||||
@@ -66,7 +65,6 @@ pub(crate) struct MiddleRelaySharedState {
|
|||||||
pub(crate) struct ProxySharedState {
|
pub(crate) struct ProxySharedState {
|
||||||
pub(crate) handshake: HandshakeSharedState,
|
pub(crate) handshake: HandshakeSharedState,
|
||||||
pub(crate) middle_relay: MiddleRelaySharedState,
|
pub(crate) middle_relay: MiddleRelaySharedState,
|
||||||
pub(crate) traffic_limiter: Arc<TrafficLimiter>,
|
|
||||||
pub(crate) conntrack_pressure_active: AtomicBool,
|
pub(crate) conntrack_pressure_active: AtomicBool,
|
||||||
pub(crate) conntrack_close_tx: Mutex<Option<mpsc::Sender<ConntrackCloseEvent>>>,
|
pub(crate) conntrack_close_tx: Mutex<Option<mpsc::Sender<ConntrackCloseEvent>>>,
|
||||||
}
|
}
|
||||||
@@ -100,7 +98,6 @@ impl ProxySharedState {
|
|||||||
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
|
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
|
||||||
relay_idle_mark_seq: AtomicU64::new(0),
|
relay_idle_mark_seq: AtomicU64::new(0),
|
||||||
},
|
},
|
||||||
traffic_limiter: TrafficLimiter::new(),
|
|
||||||
conntrack_pressure_active: AtomicBool::new(false),
|
conntrack_pressure_active: AtomicBool::new(false),
|
||||||
conntrack_close_tx: Mutex::new(None),
|
conntrack_close_tx: Mutex::new(None),
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,847 +0,0 @@
|
|||||||
use std::collections::{HashMap, HashSet};
|
|
||||||
use std::hash::{Hash, Hasher};
|
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::OnceLock;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
|
||||||
use dashmap::DashMap;
|
|
||||||
use ipnetwork::IpNetwork;
|
|
||||||
|
|
||||||
use crate::config::RateLimitBps;
|
|
||||||
|
|
||||||
const REGISTRY_SHARDS: usize = 64;
|
|
||||||
const FAIR_EPOCH_MS: u64 = 20;
|
|
||||||
const MAX_BORROW_CHUNK_BYTES: u64 = 32 * 1024;
|
|
||||||
const CLEANUP_INTERVAL_SECS: u64 = 60;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
||||||
pub enum RateDirection {
|
|
||||||
Up,
|
|
||||||
Down,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
||||||
pub struct TrafficConsumeResult {
|
|
||||||
pub granted: u64,
|
|
||||||
pub blocked_user: bool,
|
|
||||||
pub blocked_cidr: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
pub struct TrafficLimiterMetricsSnapshot {
|
|
||||||
pub user_throttle_up_total: u64,
|
|
||||||
pub user_throttle_down_total: u64,
|
|
||||||
pub cidr_throttle_up_total: u64,
|
|
||||||
pub cidr_throttle_down_total: u64,
|
|
||||||
pub user_wait_up_ms_total: u64,
|
|
||||||
pub user_wait_down_ms_total: u64,
|
|
||||||
pub cidr_wait_up_ms_total: u64,
|
|
||||||
pub cidr_wait_down_ms_total: u64,
|
|
||||||
pub user_active_leases: u64,
|
|
||||||
pub cidr_active_leases: u64,
|
|
||||||
pub user_policy_entries: u64,
|
|
||||||
pub cidr_policy_entries: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct ScopeMetrics {
|
|
||||||
throttle_up_total: AtomicU64,
|
|
||||||
throttle_down_total: AtomicU64,
|
|
||||||
wait_up_ms_total: AtomicU64,
|
|
||||||
wait_down_ms_total: AtomicU64,
|
|
||||||
active_leases: AtomicU64,
|
|
||||||
policy_entries: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ScopeMetrics {
|
|
||||||
fn throttle(&self, direction: RateDirection) {
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => {
|
|
||||||
self.throttle_up_total.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
RateDirection::Down => {
|
|
||||||
self.throttle_down_total.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn wait_ms(&self, direction: RateDirection, wait_ms: u64) {
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => {
|
|
||||||
self.wait_up_ms_total.fetch_add(wait_ms, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
RateDirection::Down => {
|
|
||||||
self.wait_down_ms_total.fetch_add(wait_ms, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct AtomicRatePair {
|
|
||||||
up_bps: AtomicU64,
|
|
||||||
down_bps: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AtomicRatePair {
|
|
||||||
fn set(&self, limits: RateLimitBps) {
|
|
||||||
self.up_bps.store(limits.up_bps, Ordering::Relaxed);
|
|
||||||
self.down_bps.store(limits.down_bps, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get(&self, direction: RateDirection) -> u64 {
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => self.up_bps.load(Ordering::Relaxed),
|
|
||||||
RateDirection::Down => self.down_bps.load(Ordering::Relaxed),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct DirectionBucket {
|
|
||||||
epoch: AtomicU64,
|
|
||||||
used: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DirectionBucket {
|
|
||||||
fn sync_epoch(&self, epoch: u64) {
|
|
||||||
let current = self.epoch.load(Ordering::Relaxed);
|
|
||||||
if current == epoch {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if current < epoch
|
|
||||||
&& self
|
|
||||||
.epoch
|
|
||||||
.compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
self.used.store(0, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_consume(&self, cap_bps: u64, requested: u64) -> u64 {
|
|
||||||
if requested == 0 {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if cap_bps == 0 {
|
|
||||||
return requested;
|
|
||||||
}
|
|
||||||
|
|
||||||
let epoch = current_epoch();
|
|
||||||
self.sync_epoch(epoch);
|
|
||||||
let cap_epoch = bytes_per_epoch(cap_bps);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let used = self.used.load(Ordering::Relaxed);
|
|
||||||
if used >= cap_epoch {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
let remaining = cap_epoch.saturating_sub(used);
|
|
||||||
let grant = requested.min(remaining);
|
|
||||||
if grant == 0 {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
let next = used.saturating_add(grant);
|
|
||||||
if self
|
|
||||||
.used
|
|
||||||
.compare_exchange_weak(used, next, Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
return grant;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn refund(&self, bytes: u64) {
|
|
||||||
if bytes == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
decrement_atomic_saturating(&self.used, bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct UserBucket {
|
|
||||||
rates: AtomicRatePair,
|
|
||||||
up: DirectionBucket,
|
|
||||||
down: DirectionBucket,
|
|
||||||
active_leases: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UserBucket {
|
|
||||||
fn new(limits: RateLimitBps) -> Self {
|
|
||||||
let rates = AtomicRatePair::default();
|
|
||||||
rates.set(limits);
|
|
||||||
Self {
|
|
||||||
rates,
|
|
||||||
up: DirectionBucket::default(),
|
|
||||||
down: DirectionBucket::default(),
|
|
||||||
active_leases: AtomicU64::new(0),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_rates(&self, limits: RateLimitBps) {
|
|
||||||
self.rates.set(limits);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_consume(&self, direction: RateDirection, requested: u64) -> u64 {
|
|
||||||
let cap_bps = self.rates.get(direction);
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => self.up.try_consume(cap_bps, requested),
|
|
||||||
RateDirection::Down => self.down.try_consume(cap_bps, requested),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn refund(&self, direction: RateDirection, bytes: u64) {
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => self.up.refund(bytes),
|
|
||||||
RateDirection::Down => self.down.refund(bytes),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct CidrDirectionBucket {
|
|
||||||
epoch: AtomicU64,
|
|
||||||
used: AtomicU64,
|
|
||||||
active_users: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CidrDirectionBucket {
|
|
||||||
fn sync_epoch(&self, epoch: u64) {
|
|
||||||
let current = self.epoch.load(Ordering::Relaxed);
|
|
||||||
if current == epoch {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if current < epoch
|
|
||||||
&& self
|
|
||||||
.epoch
|
|
||||||
.compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
self.used.store(0, Ordering::Relaxed);
|
|
||||||
self.active_users.store(0, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_consume(
|
|
||||||
&self,
|
|
||||||
user_state: &CidrUserDirectionState,
|
|
||||||
cap_epoch: u64,
|
|
||||||
requested: u64,
|
|
||||||
) -> u64 {
|
|
||||||
if requested == 0 || cap_epoch == 0 {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
let epoch = current_epoch();
|
|
||||||
self.sync_epoch(epoch);
|
|
||||||
user_state.sync_epoch_and_mark_active(epoch, &self.active_users);
|
|
||||||
let active_users = self.active_users.load(Ordering::Relaxed).max(1);
|
|
||||||
let fair_share = cap_epoch.saturating_div(active_users).max(1);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let total_used = self.used.load(Ordering::Relaxed);
|
|
||||||
if total_used >= cap_epoch {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
let total_remaining = cap_epoch.saturating_sub(total_used);
|
|
||||||
let user_used = user_state.used.load(Ordering::Relaxed);
|
|
||||||
let guaranteed_remaining = fair_share.saturating_sub(user_used);
|
|
||||||
|
|
||||||
let grant = if guaranteed_remaining > 0 {
|
|
||||||
requested.min(guaranteed_remaining).min(total_remaining)
|
|
||||||
} else {
|
|
||||||
requested
|
|
||||||
.min(total_remaining)
|
|
||||||
.min(MAX_BORROW_CHUNK_BYTES)
|
|
||||||
};
|
|
||||||
|
|
||||||
if grant == 0 {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
let next_total = total_used.saturating_add(grant);
|
|
||||||
if self
|
|
||||||
.used
|
|
||||||
.compare_exchange_weak(
|
|
||||||
total_used,
|
|
||||||
next_total,
|
|
||||||
Ordering::Relaxed,
|
|
||||||
Ordering::Relaxed,
|
|
||||||
)
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
user_state.used.fetch_add(grant, Ordering::Relaxed);
|
|
||||||
return grant;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn refund(&self, bytes: u64) {
|
|
||||||
if bytes == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
decrement_atomic_saturating(&self.used, bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct CidrUserDirectionState {
|
|
||||||
epoch: AtomicU64,
|
|
||||||
used: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CidrUserDirectionState {
|
|
||||||
fn sync_epoch_and_mark_active(&self, epoch: u64, active_users: &AtomicU64) {
|
|
||||||
let current = self.epoch.load(Ordering::Relaxed);
|
|
||||||
if current == epoch {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if current < epoch
|
|
||||||
&& self
|
|
||||||
.epoch
|
|
||||||
.compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
self.used.store(0, Ordering::Relaxed);
|
|
||||||
active_users.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn refund(&self, bytes: u64) {
|
|
||||||
if bytes == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
decrement_atomic_saturating(&self.used, bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct CidrUserShare {
|
|
||||||
active_conns: AtomicU64,
|
|
||||||
up: CidrUserDirectionState,
|
|
||||||
down: CidrUserDirectionState,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CidrUserShare {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
active_conns: AtomicU64::new(0),
|
|
||||||
up: CidrUserDirectionState::default(),
|
|
||||||
down: CidrUserDirectionState::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct CidrBucket {
|
|
||||||
rates: AtomicRatePair,
|
|
||||||
up: CidrDirectionBucket,
|
|
||||||
down: CidrDirectionBucket,
|
|
||||||
users: ShardedRegistry<CidrUserShare>,
|
|
||||||
active_leases: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CidrBucket {
|
|
||||||
fn new(limits: RateLimitBps) -> Self {
|
|
||||||
let rates = AtomicRatePair::default();
|
|
||||||
rates.set(limits);
|
|
||||||
Self {
|
|
||||||
rates,
|
|
||||||
up: CidrDirectionBucket::default(),
|
|
||||||
down: CidrDirectionBucket::default(),
|
|
||||||
users: ShardedRegistry::new(REGISTRY_SHARDS),
|
|
||||||
active_leases: AtomicU64::new(0),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_rates(&self, limits: RateLimitBps) {
|
|
||||||
self.rates.set(limits);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn acquire_user_share(&self, user: &str) -> Arc<CidrUserShare> {
|
|
||||||
let share = self.users.get_or_insert_with(user, CidrUserShare::new);
|
|
||||||
share.active_conns.fetch_add(1, Ordering::Relaxed);
|
|
||||||
share
|
|
||||||
}
|
|
||||||
|
|
||||||
fn release_user_share(&self, user: &str, share: &Arc<CidrUserShare>) {
|
|
||||||
decrement_atomic_saturating(&share.active_conns, 1);
|
|
||||||
let share_for_remove = Arc::clone(share);
|
|
||||||
let _ = self.users.remove_if(user, |candidate| {
|
|
||||||
Arc::ptr_eq(candidate, &share_for_remove)
|
|
||||||
&& candidate.active_conns.load(Ordering::Relaxed) == 0
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_consume_for_user(
|
|
||||||
&self,
|
|
||||||
direction: RateDirection,
|
|
||||||
share: &CidrUserShare,
|
|
||||||
requested: u64,
|
|
||||||
) -> u64 {
|
|
||||||
let cap_bps = self.rates.get(direction);
|
|
||||||
if cap_bps == 0 {
|
|
||||||
return requested;
|
|
||||||
}
|
|
||||||
let cap_epoch = bytes_per_epoch(cap_bps);
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => self.up.try_consume(&share.up, cap_epoch, requested),
|
|
||||||
RateDirection::Down => self.down.try_consume(&share.down, cap_epoch, requested),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn refund_for_user(&self, direction: RateDirection, share: &CidrUserShare, bytes: u64) {
|
|
||||||
match direction {
|
|
||||||
RateDirection::Up => {
|
|
||||||
self.up.refund(bytes);
|
|
||||||
share.up.refund(bytes);
|
|
||||||
}
|
|
||||||
RateDirection::Down => {
|
|
||||||
self.down.refund(bytes);
|
|
||||||
share.down.refund(bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cleanup_idle_users(&self) {
|
|
||||||
self.users
|
|
||||||
.retain(|_, share| share.active_conns.load(Ordering::Relaxed) > 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct CidrRule {
|
|
||||||
key: String,
|
|
||||||
cidr: IpNetwork,
|
|
||||||
limits: RateLimitBps,
|
|
||||||
prefix_len: u8,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct PolicySnapshot {
|
|
||||||
user_limits: HashMap<String, RateLimitBps>,
|
|
||||||
cidr_rules_v4: Vec<CidrRule>,
|
|
||||||
cidr_rules_v6: Vec<CidrRule>,
|
|
||||||
cidr_rule_keys: HashSet<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PolicySnapshot {
|
|
||||||
fn match_cidr(&self, ip: IpAddr) -> Option<&CidrRule> {
|
|
||||||
match ip {
|
|
||||||
IpAddr::V4(_) => self.cidr_rules_v4.iter().find(|rule| rule.cidr.contains(ip)),
|
|
||||||
IpAddr::V6(_) => self.cidr_rules_v6.iter().find(|rule| rule.cidr.contains(ip)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ShardedRegistry<T> {
|
|
||||||
shards: Box<[DashMap<String, Arc<T>>]>,
|
|
||||||
mask: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> ShardedRegistry<T> {
|
|
||||||
fn new(shards: usize) -> Self {
|
|
||||||
let shard_count = shards.max(1).next_power_of_two();
|
|
||||||
let mut items = Vec::with_capacity(shard_count);
|
|
||||||
for _ in 0..shard_count {
|
|
||||||
items.push(DashMap::<String, Arc<T>>::new());
|
|
||||||
}
|
|
||||||
Self {
|
|
||||||
shards: items.into_boxed_slice(),
|
|
||||||
mask: shard_count.saturating_sub(1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn shard_index(&self, key: &str) -> usize {
|
|
||||||
let mut hasher = std::collections::hash_map::DefaultHasher::new();
|
|
||||||
key.hash(&mut hasher);
|
|
||||||
(hasher.finish() as usize) & self.mask
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_or_insert_with<F>(&self, key: &str, make: F) -> Arc<T>
|
|
||||||
where
|
|
||||||
F: FnOnce() -> T,
|
|
||||||
{
|
|
||||||
let shard = &self.shards[self.shard_index(key)];
|
|
||||||
match shard.entry(key.to_string()) {
|
|
||||||
dashmap::mapref::entry::Entry::Occupied(entry) => Arc::clone(entry.get()),
|
|
||||||
dashmap::mapref::entry::Entry::Vacant(slot) => {
|
|
||||||
let value = Arc::new(make());
|
|
||||||
slot.insert(Arc::clone(&value));
|
|
||||||
value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn retain<F>(&self, predicate: F)
|
|
||||||
where
|
|
||||||
F: Fn(&String, &Arc<T>) -> bool + Copy,
|
|
||||||
{
|
|
||||||
for shard in &*self.shards {
|
|
||||||
shard.retain(|key, value| predicate(key, value));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_if<F>(&self, key: &str, predicate: F) -> bool
|
|
||||||
where
|
|
||||||
F: Fn(&Arc<T>) -> bool,
|
|
||||||
{
|
|
||||||
let shard = &self.shards[self.shard_index(key)];
|
|
||||||
let should_remove = match shard.get(key) {
|
|
||||||
Some(entry) => predicate(entry.value()),
|
|
||||||
None => false,
|
|
||||||
};
|
|
||||||
if !should_remove {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
shard.remove(key).is_some()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct TrafficLease {
|
|
||||||
limiter: Arc<TrafficLimiter>,
|
|
||||||
user_bucket: Option<Arc<UserBucket>>,
|
|
||||||
cidr_bucket: Option<Arc<CidrBucket>>,
|
|
||||||
cidr_user_key: Option<String>,
|
|
||||||
cidr_user_share: Option<Arc<CidrUserShare>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TrafficLease {
|
|
||||||
pub fn try_consume(&self, direction: RateDirection, requested: u64) -> TrafficConsumeResult {
|
|
||||||
if requested == 0 {
|
|
||||||
return TrafficConsumeResult {
|
|
||||||
granted: 0,
|
|
||||||
blocked_user: false,
|
|
||||||
blocked_cidr: false,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut granted = requested;
|
|
||||||
if let Some(user_bucket) = self.user_bucket.as_ref() {
|
|
||||||
let user_granted = user_bucket.try_consume(direction, granted);
|
|
||||||
if user_granted == 0 {
|
|
||||||
self.limiter.observe_throttle(direction, true, false);
|
|
||||||
return TrafficConsumeResult {
|
|
||||||
granted: 0,
|
|
||||||
blocked_user: true,
|
|
||||||
blocked_cidr: false,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
granted = user_granted;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let (Some(cidr_bucket), Some(cidr_user_share)) =
|
|
||||||
(self.cidr_bucket.as_ref(), self.cidr_user_share.as_ref())
|
|
||||||
{
|
|
||||||
let cidr_granted = cidr_bucket.try_consume_for_user(direction, cidr_user_share, granted);
|
|
||||||
if cidr_granted < granted
|
|
||||||
&& let Some(user_bucket) = self.user_bucket.as_ref()
|
|
||||||
{
|
|
||||||
user_bucket.refund(direction, granted.saturating_sub(cidr_granted));
|
|
||||||
}
|
|
||||||
if cidr_granted == 0 {
|
|
||||||
self.limiter.observe_throttle(direction, false, true);
|
|
||||||
return TrafficConsumeResult {
|
|
||||||
granted: 0,
|
|
||||||
blocked_user: false,
|
|
||||||
blocked_cidr: true,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
granted = cidr_granted;
|
|
||||||
}
|
|
||||||
|
|
||||||
TrafficConsumeResult {
|
|
||||||
granted,
|
|
||||||
blocked_user: false,
|
|
||||||
blocked_cidr: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn refund(&self, direction: RateDirection, bytes: u64) {
|
|
||||||
if bytes == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(user_bucket) = self.user_bucket.as_ref() {
|
|
||||||
user_bucket.refund(direction, bytes);
|
|
||||||
}
|
|
||||||
if let (Some(cidr_bucket), Some(cidr_user_share)) =
|
|
||||||
(self.cidr_bucket.as_ref(), self.cidr_user_share.as_ref())
|
|
||||||
{
|
|
||||||
cidr_bucket.refund_for_user(direction, cidr_user_share, bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn observe_wait_ms(
|
|
||||||
&self,
|
|
||||||
direction: RateDirection,
|
|
||||||
blocked_user: bool,
|
|
||||||
blocked_cidr: bool,
|
|
||||||
wait_ms: u64,
|
|
||||||
) {
|
|
||||||
if wait_ms == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
self.limiter
|
|
||||||
.observe_wait(direction, blocked_user, blocked_cidr, wait_ms);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for TrafficLease {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Some(bucket) = self.user_bucket.as_ref() {
|
|
||||||
decrement_atomic_saturating(&bucket.active_leases, 1);
|
|
||||||
decrement_atomic_saturating(&self.limiter.user_scope.active_leases, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(bucket) = self.cidr_bucket.as_ref() {
|
|
||||||
if let (Some(user_key), Some(share)) =
|
|
||||||
(self.cidr_user_key.as_ref(), self.cidr_user_share.as_ref())
|
|
||||||
{
|
|
||||||
bucket.release_user_share(user_key, share);
|
|
||||||
}
|
|
||||||
decrement_atomic_saturating(&bucket.active_leases, 1);
|
|
||||||
decrement_atomic_saturating(&self.limiter.cidr_scope.active_leases, 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct TrafficLimiter {
|
|
||||||
policy: ArcSwap<PolicySnapshot>,
|
|
||||||
user_buckets: ShardedRegistry<UserBucket>,
|
|
||||||
cidr_buckets: ShardedRegistry<CidrBucket>,
|
|
||||||
user_scope: ScopeMetrics,
|
|
||||||
cidr_scope: ScopeMetrics,
|
|
||||||
last_cleanup_epoch_secs: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TrafficLimiter {
|
|
||||||
pub fn new() -> Arc<Self> {
|
|
||||||
Arc::new(Self {
|
|
||||||
policy: ArcSwap::from_pointee(PolicySnapshot::default()),
|
|
||||||
user_buckets: ShardedRegistry::new(REGISTRY_SHARDS),
|
|
||||||
cidr_buckets: ShardedRegistry::new(REGISTRY_SHARDS),
|
|
||||||
user_scope: ScopeMetrics::default(),
|
|
||||||
cidr_scope: ScopeMetrics::default(),
|
|
||||||
last_cleanup_epoch_secs: AtomicU64::new(0),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn apply_policy(
|
|
||||||
&self,
|
|
||||||
user_limits: HashMap<String, RateLimitBps>,
|
|
||||||
cidr_limits: HashMap<IpNetwork, RateLimitBps>,
|
|
||||||
) {
|
|
||||||
let filtered_users = user_limits
|
|
||||||
.into_iter()
|
|
||||||
.filter(|(_, limit)| limit.up_bps > 0 || limit.down_bps > 0)
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
|
|
||||||
let mut cidr_rules_v4 = Vec::new();
|
|
||||||
let mut cidr_rules_v6 = Vec::new();
|
|
||||||
let mut cidr_rule_keys = HashSet::new();
|
|
||||||
for (cidr, limits) in cidr_limits {
|
|
||||||
if limits.up_bps == 0 && limits.down_bps == 0 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let key = cidr.to_string();
|
|
||||||
let rule = CidrRule {
|
|
||||||
key: key.clone(),
|
|
||||||
cidr,
|
|
||||||
limits,
|
|
||||||
prefix_len: cidr.prefix(),
|
|
||||||
};
|
|
||||||
cidr_rule_keys.insert(key);
|
|
||||||
match rule.cidr {
|
|
||||||
IpNetwork::V4(_) => cidr_rules_v4.push(rule),
|
|
||||||
IpNetwork::V6(_) => cidr_rules_v6.push(rule),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cidr_rules_v4.sort_by(|a, b| b.prefix_len.cmp(&a.prefix_len));
|
|
||||||
cidr_rules_v6.sort_by(|a, b| b.prefix_len.cmp(&a.prefix_len));
|
|
||||||
|
|
||||||
self.user_scope
|
|
||||||
.policy_entries
|
|
||||||
.store(filtered_users.len() as u64, Ordering::Relaxed);
|
|
||||||
self.cidr_scope
|
|
||||||
.policy_entries
|
|
||||||
.store(cidr_rule_keys.len() as u64, Ordering::Relaxed);
|
|
||||||
|
|
||||||
self.policy.store(Arc::new(PolicySnapshot {
|
|
||||||
user_limits: filtered_users,
|
|
||||||
cidr_rules_v4,
|
|
||||||
cidr_rules_v6,
|
|
||||||
cidr_rule_keys,
|
|
||||||
}));
|
|
||||||
|
|
||||||
self.maybe_cleanup();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn acquire_lease(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
user: &str,
|
|
||||||
client_ip: IpAddr,
|
|
||||||
) -> Option<Arc<TrafficLease>> {
|
|
||||||
let policy = self.policy.load_full();
|
|
||||||
let mut user_bucket = None;
|
|
||||||
if let Some(limit) = policy.user_limits.get(user).copied() {
|
|
||||||
let bucket = self
|
|
||||||
.user_buckets
|
|
||||||
.get_or_insert_with(user, || UserBucket::new(limit));
|
|
||||||
bucket.set_rates(limit);
|
|
||||||
bucket.active_leases.fetch_add(1, Ordering::Relaxed);
|
|
||||||
self.user_scope.active_leases.fetch_add(1, Ordering::Relaxed);
|
|
||||||
user_bucket = Some(bucket);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut cidr_bucket = None;
|
|
||||||
let mut cidr_user_key = None;
|
|
||||||
let mut cidr_user_share = None;
|
|
||||||
if let Some(rule) = policy.match_cidr(client_ip) {
|
|
||||||
let bucket = self
|
|
||||||
.cidr_buckets
|
|
||||||
.get_or_insert_with(rule.key.as_str(), || CidrBucket::new(rule.limits));
|
|
||||||
bucket.set_rates(rule.limits);
|
|
||||||
bucket.active_leases.fetch_add(1, Ordering::Relaxed);
|
|
||||||
self.cidr_scope.active_leases.fetch_add(1, Ordering::Relaxed);
|
|
||||||
let share = bucket.acquire_user_share(user);
|
|
||||||
cidr_user_key = Some(user.to_string());
|
|
||||||
cidr_user_share = Some(share);
|
|
||||||
cidr_bucket = Some(bucket);
|
|
||||||
}
|
|
||||||
|
|
||||||
if user_bucket.is_none() && cidr_bucket.is_none() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.maybe_cleanup();
|
|
||||||
Some(Arc::new(TrafficLease {
|
|
||||||
limiter: Arc::clone(self),
|
|
||||||
user_bucket,
|
|
||||||
cidr_bucket,
|
|
||||||
cidr_user_key,
|
|
||||||
cidr_user_share,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn metrics_snapshot(&self) -> TrafficLimiterMetricsSnapshot {
|
|
||||||
TrafficLimiterMetricsSnapshot {
|
|
||||||
user_throttle_up_total: self.user_scope.throttle_up_total.load(Ordering::Relaxed),
|
|
||||||
user_throttle_down_total: self.user_scope.throttle_down_total.load(Ordering::Relaxed),
|
|
||||||
cidr_throttle_up_total: self.cidr_scope.throttle_up_total.load(Ordering::Relaxed),
|
|
||||||
cidr_throttle_down_total: self.cidr_scope.throttle_down_total.load(Ordering::Relaxed),
|
|
||||||
user_wait_up_ms_total: self.user_scope.wait_up_ms_total.load(Ordering::Relaxed),
|
|
||||||
user_wait_down_ms_total: self.user_scope.wait_down_ms_total.load(Ordering::Relaxed),
|
|
||||||
cidr_wait_up_ms_total: self.cidr_scope.wait_up_ms_total.load(Ordering::Relaxed),
|
|
||||||
cidr_wait_down_ms_total: self.cidr_scope.wait_down_ms_total.load(Ordering::Relaxed),
|
|
||||||
user_active_leases: self.user_scope.active_leases.load(Ordering::Relaxed),
|
|
||||||
cidr_active_leases: self.cidr_scope.active_leases.load(Ordering::Relaxed),
|
|
||||||
user_policy_entries: self.user_scope.policy_entries.load(Ordering::Relaxed),
|
|
||||||
cidr_policy_entries: self.cidr_scope.policy_entries.load(Ordering::Relaxed),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn observe_throttle(&self, direction: RateDirection, blocked_user: bool, blocked_cidr: bool) {
|
|
||||||
if blocked_user {
|
|
||||||
self.user_scope.throttle(direction);
|
|
||||||
}
|
|
||||||
if blocked_cidr {
|
|
||||||
self.cidr_scope.throttle(direction);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn observe_wait(
|
|
||||||
&self,
|
|
||||||
direction: RateDirection,
|
|
||||||
blocked_user: bool,
|
|
||||||
blocked_cidr: bool,
|
|
||||||
wait_ms: u64,
|
|
||||||
) {
|
|
||||||
if blocked_user {
|
|
||||||
self.user_scope.wait_ms(direction, wait_ms);
|
|
||||||
}
|
|
||||||
if blocked_cidr {
|
|
||||||
self.cidr_scope.wait_ms(direction, wait_ms);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn maybe_cleanup(&self) {
|
|
||||||
let now_epoch_secs = now_epoch_secs();
|
|
||||||
let last = self.last_cleanup_epoch_secs.load(Ordering::Relaxed);
|
|
||||||
if now_epoch_secs.saturating_sub(last) < CLEANUP_INTERVAL_SECS {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if self
|
|
||||||
.last_cleanup_epoch_secs
|
|
||||||
.compare_exchange(last, now_epoch_secs, Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let policy = self.policy.load_full();
|
|
||||||
self.user_buckets.retain(|user, bucket| {
|
|
||||||
bucket.active_leases.load(Ordering::Relaxed) > 0 || policy.user_limits.contains_key(user)
|
|
||||||
});
|
|
||||||
self.cidr_buckets.retain(|cidr_key, bucket| {
|
|
||||||
bucket.cleanup_idle_users();
|
|
||||||
bucket.active_leases.load(Ordering::Relaxed) > 0
|
|
||||||
|| policy.cidr_rule_keys.contains(cidr_key)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn next_refill_delay() -> Duration {
|
|
||||||
let start = limiter_epoch_start();
|
|
||||||
let elapsed_ms = start.elapsed().as_millis() as u64;
|
|
||||||
let epoch_pos = elapsed_ms % FAIR_EPOCH_MS;
|
|
||||||
let wait_ms = FAIR_EPOCH_MS.saturating_sub(epoch_pos).max(1);
|
|
||||||
Duration::from_millis(wait_ms)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decrement_atomic_saturating(counter: &AtomicU64, by: u64) {
|
|
||||||
if by == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mut current = counter.load(Ordering::Relaxed);
|
|
||||||
loop {
|
|
||||||
if current == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let next = current.saturating_sub(by);
|
|
||||||
match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
|
|
||||||
Ok(_) => return,
|
|
||||||
Err(actual) => current = actual,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn now_epoch_secs() -> u64 {
|
|
||||||
SystemTime::now()
|
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.as_secs()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn bytes_per_epoch(bps: u64) -> u64 {
|
|
||||||
if bps == 0 {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
let numerator = bps.saturating_mul(FAIR_EPOCH_MS);
|
|
||||||
let bytes = numerator.saturating_div(8_000);
|
|
||||||
bytes.max(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn current_epoch() -> u64 {
|
|
||||||
let start = limiter_epoch_start();
|
|
||||||
let elapsed_ms = start.elapsed().as_millis() as u64;
|
|
||||||
elapsed_ms / FAIR_EPOCH_MS
|
|
||||||
}
|
|
||||||
|
|
||||||
fn limiter_epoch_start() -> &'static Instant {
|
|
||||||
static START: OnceLock<Instant> = OnceLock::new();
|
|
||||||
START.get_or_init(Instant::now)
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user