Compare commits


2 Commits
flow ... main

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Alexey | d25aa5a1e9 | Merge pull request #709 from groozchique/main ([docs] add hyperlinks to README) | 2026-04-16 16:12:48 +03:00 |
| Nick Parfyonov | f1b7b9aa08 | [docs] add hyperlinks to README | 2026-04-16 09:40:55 +03:00 |
20 changed files with 136 additions and 2128 deletions

View File

@@ -98,3 +98,4 @@ harness = false
 [profile.release]
 lto = "fat"
 codegen-units = 1

View File

@@ -1,6 +1,6 @@
 # Telemt - MTProxy on Rust + Tokio
-![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon) ![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social) ![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social) [![Telegram](https://img.shields.io/badge/Telegram-Chat-24a1de?logo=telegram&logoColor=24a1de)](https://t.me/telemtrs)
+[![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon)](https://github.com/telemt/telemt/releases/latest) [![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social)](https://github.com/telemt/telemt/stargazers) [![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social)](https://github.com/telemt/telemt/network/members) [![Telegram](https://img.shields.io/badge/Telegram-Chat-24a1de?logo=telegram&logoColor=24a1de)](https://t.me/telemtrs)
 [🇷🇺 README на русском](https://github.com/telemt/telemt/blob/main/README.ru.md)

View File

@@ -1,6 +1,6 @@
# Telemt — MTProxy on Rust + Tokio
-![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon) ![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social) ![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social) [![Telegram](https://img.shields.io/badge/Telegram-Chat-24a1de?logo=telegram&logoColor=24a1de)](https://t.me/telemtrs)
+[![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon)](https://github.com/telemt/telemt/releases/latest) [![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social)](https://github.com/telemt/telemt/stargazers) [![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social)](https://github.com/telemt/telemt/network/members) [![Telegram](https://img.shields.io/badge/Telegram-Chat-24a1de?logo=telegram&logoColor=24a1de)](https://t.me/telemtrs)
 ***Solves problems before others even know they exist***

View File

@@ -1,225 +0,0 @@
# TLS Front Profile Fidelity
## Overview
This document describes how Telemt reuses captured TLS behavior in the FakeTLS server flight and how to validate the result on a real deployment.
When TLS front emulation is enabled, Telemt can capture useful server-side TLS behavior from the selected origin and reuse that behavior in the emulated success path. The goal is not to reproduce the origin byte-for-byte, but to reduce stable synthetic traits and make the emitted server flight structurally closer to the captured profile.
## Why this change exists
The project already captures useful server-side TLS behavior in the TLS front fetch path:
- `change_cipher_spec_count`
- `app_data_record_sizes`
- `ticket_record_sizes`
Before this change, the emulator used only part of that information, leaving a gap between the captured origin behavior and the emitted FakeTLS server flight.
## What is implemented
- The emulator now replays the observed `ChangeCipherSpec` count from the fetched behavior profile.
- The emulator now replays observed ticket-like tail ApplicationData record sizes when raw or merged TLS profile data is available.
- The emulator now preserves more of the profiled encrypted-flight structure instead of collapsing it into a smaller synthetic shape.
- The emulator still falls back to the previous synthetic behavior when the cached profile does not contain raw TLS behavior information.
- Operator-configured `tls_new_session_tickets` still works as an additive fallback when the profile does not provide enough tail records.
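The replay-then-fallback order above can be sketched as a small decision function. This is an illustrative sketch, not Telemt's actual code: the `BehaviorProfile` shape, the `tail_record_sizes` helper, and the 1200-byte synthetic record size are all assumptions made for the example.

```rust
/// Hypothetical, simplified view of a fetched TLS behavior profile.
struct BehaviorProfile {
    change_cipher_spec_count: usize,
    /// Ticket-like tail ApplicationData sizes; present only when raw or
    /// merged TLS profile data was captured for the origin.
    ticket_record_sizes: Option<Vec<u16>>,
}

/// Pick the tail record sizes to emit: replay captured sizes first, then
/// let operator-configured tickets top up the tail (additive fallback).
fn tail_record_sizes(profile: &BehaviorProfile, operator_tickets: usize) -> Vec<u16> {
    let mut sizes = profile.ticket_record_sizes.clone().unwrap_or_default();
    while sizes.len() < operator_tickets {
        sizes.push(1200); // assumed synthetic record size
    }
    sizes
}

fn main() {
    let rich = BehaviorProfile {
        change_cipher_spec_count: 1,
        ticket_record_sizes: Some(vec![234, 234]),
    };
    let bare = BehaviorProfile {
        change_cipher_spec_count: 1,
        ticket_record_sizes: None,
    };
    // Captured sizes win outright when the profile is rich enough.
    assert_eq!(tail_record_sizes(&rich, 0), vec![234, 234]);
    // Operator tickets only top up, never replace, captured sizes.
    assert_eq!(tail_record_sizes(&rich, 3), vec![234, 234, 1200]);
    // Without raw profile data, the synthetic fallback applies.
    assert_eq!(tail_record_sizes(&bare, 2), vec![1200, 1200]);
    println!("ccs={} tails={:?}", rich.change_cipher_spec_count, tail_record_sizes(&rich, 0));
}
```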
## Practical benefit
- Reduced distinguishability between profiled origin TLS behavior and emulated TLS behavior.
- Lower chance of stable server-flight fingerprints caused by fixed CCS count or synthetic-only tail record sizes.
- Better reuse of already captured TLS profile data without changing MTProto logic, KDF routing, or transport architecture.
## Limitations
This mechanism does not aim to make Telemt byte-identical to the origin server.
It also does not change:
- MTProto business logic;
- KDF routing behavior;
- the overall transport architecture.
The practical goal is narrower:
- reuse more captured profile data;
- reduce fixed synthetic behavior in the server flight;
- preserve a valid FakeTLS success path while changing the emitted shape on the wire.
## Validation targets
- Correct count of emulated `ChangeCipherSpec` records.
- Correct replay of observed ticket-tail record sizes.
- No regression in existing ALPN and payload-placement behavior.
## How to validate the result
Recommended validation consists of two layers:
- focused unit and security tests for CCS-count replay and ticket-tail replay;
- real packet-capture comparison for a selected origin and a successful FakeTLS session.
When testing on the network, the expected result is:
- a valid FakeTLS and MTProto success path is preserved;
- the early encrypted server flight changes shape when richer profile data is available;
- the change is visible on the wire without changing MTProto logic or transport architecture.
This validation is intended to show better reuse of captured TLS profile data.
It is not intended to prove byte-level equivalence with the real origin server.
## How to test on a real deployment
The strongest practical validation is a side-by-side trace comparison between:
- a real TLS origin server used as `mask_host`;
- a Telemt FakeTLS success-path connection for the same SNI;
- optional captures from different Telemt builds or configurations.
The purpose of the comparison is to inspect the shape of the server flight:
- record order;
- count of `ChangeCipherSpec` records;
- count and grouping of early encrypted `ApplicationData` records;
- lengths of tail or continuation `ApplicationData` records.
## Recommended environment
Use a Linux host or Docker container for the cleanest reproduction.
Recommended setup:
1. One Telemt instance.
2. One real HTTPS origin as `mask_host`.
3. One Telegram client configured with an `ee` proxy link for the Telemt instance.
4. `tcpdump` or Wireshark available for capture analysis.
## Step-by-step test procedure
### 1. Prepare the origin
1. Choose a real HTTPS origin.
2. Set both `censorship.tls_domain` and `censorship.mask_host` to that hostname.
3. Confirm that a direct TLS request works:
```bash
openssl s_client -connect ORIGIN_IP:443 -servername YOUR_DOMAIN </dev/null
```
### 2. Configure Telemt
Use a configuration that enables:
- `censorship.mask = true`
- `censorship.tls_emulation = true`
- `censorship.mask_host`
- `censorship.mask_port`
Recommended for cleaner testing:
- keep `censorship.tls_new_session_tickets = 0`, so the result depends primarily on fetched profile data rather than operator-forced synthetic tail records;
- keep `censorship.tls_fetch.strict_route = true`, if cleaner provenance for captured profile data is important.
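Assembled into one place, the settings above might look like the fragment below. The exact TOML layout (a `[censorship]` table with a nested `tls_fetch` table) is an assumption inferred from the dotted key names in this document; check the real config schema before copying.

```toml
[censorship]
mask = true
tls_emulation = true
tls_domain = "YOUR_DOMAIN"   # same hostname as mask_host (step 1)
mask_host = "YOUR_DOMAIN"
mask_port = 443
# Keep operator-forced synthetic tail records off so the wire result
# reflects fetched profile data.
tls_new_session_tickets = 0

[censorship.tls_fetch]
# Cleaner provenance for captured profile data.
strict_route = true
```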
### 3. Refresh TLS profile data
1. Start Telemt.
2. Let it fetch TLS front profile data for the configured domain.
3. If `tls_front_dir` is persisted, confirm that the TLS front cache is populated.
Persisted cache artifacts are useful, but they are not required if packet captures already demonstrate the runtime result.
### 4. Capture a direct-origin trace
From a separate client host, connect directly to the origin:
```bash
openssl s_client -connect ORIGIN_IP:443 -servername YOUR_DOMAIN </dev/null
```
Capture with:
```bash
sudo tcpdump -i any -w origin-direct.pcap host ORIGIN_IP and port 443
```
### 5. Capture a Telemt FakeTLS success-path trace
Now connect to Telemt with a real Telegram client through an `ee` proxy link that targets the Telemt instance.
`openssl s_client` is useful for direct-origin capture and fallback sanity checks, but it does not exercise the successful FakeTLS and MTProto path.
Capture with:
```bash
sudo tcpdump -i any -w telemt-emulated.pcap host TELEMT_IP and port 443
```
### 6. Decode TLS record structure
Use `tshark` to print record-level structure:
```bash
tshark -r origin-direct.pcap -Y "tls.record" -T fields \
-e frame.number \
-e ip.src \
-e ip.dst \
-e tls.record.content_type \
-e tls.record.length
```
```bash
tshark -r telemt-emulated.pcap -Y "tls.record" -T fields \
-e frame.number \
-e ip.src \
-e ip.dst \
-e tls.record.content_type \
-e tls.record.length
```
Focus on the server flight after ClientHello:
- `22` = Handshake
- `20` = ChangeCipherSpec
- `23` = ApplicationData
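The two field dumps can be reduced to the comparison numbers mechanically. The sketch below is a hypothetical helper, not part of Telemt: it assumes the default tab-separated `-T fields` output with exactly the five fields requested above and one TLS record per line (frames carrying several records may need splitting first).

```rust
/// Summary of one server flight: CCS record count and the lengths of
/// `ApplicationData` records, in wire order.
#[derive(Debug, PartialEq)]
struct FlightSummary {
    ccs_count: usize,
    app_data_lengths: Vec<u32>,
}

/// Parse tab-separated `tshark -T fields` output
/// (frame.number, ip.src, ip.dst, content_type, length)
/// and summarize the records sent by `server_ip`.
fn summarize_flight(tshark_output: &str, server_ip: &str) -> FlightSummary {
    let mut summary = FlightSummary { ccs_count: 0, app_data_lengths: Vec::new() };
    for line in tshark_output.lines() {
        let cols: Vec<&str> = line.split('\t').collect();
        if cols.len() < 5 || cols[1] != server_ip {
            continue; // skip client->server records and malformed lines
        }
        match cols[3] {
            "20" => summary.ccs_count += 1, // ChangeCipherSpec
            "23" => {
                // ApplicationData: record its length for the tail comparison
                if let Ok(len) = cols[4].parse::<u32>() {
                    summary.app_data_lengths.push(len);
                }
            }
            _ => {} // Handshake (22) and others are not counted here
        }
    }
    summary
}

fn main() {
    let capture = "1\t10.0.0.2\t10.0.0.1\t22\t517\n\
                   2\t10.0.0.1\t10.0.0.2\t22\t95\n\
                   3\t10.0.0.1\t10.0.0.2\t20\t1\n\
                   4\t10.0.0.1\t10.0.0.2\t23\t1450\n\
                   5\t10.0.0.1\t10.0.0.2\t23\t234\n";
    let summary = summarize_flight(capture, "10.0.0.1");
    assert_eq!(summary.ccs_count, 1);
    assert_eq!(summary.app_data_lengths, vec![1450, 234]);
    println!("{summary:?}"); // numbers for the comparison table
}
```

Running it once per pcap gives the `CCS count` and `Tail AppData lengths` columns of the table in step 7 directly.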
### 7. Build a comparison table
A compact table like the following is usually enough:
| Path | CCS count | AppData count in first encrypted flight | Tail AppData lengths |
| --- | --- | --- | --- |
| Origin | `N` | `M` | `[a, b, ...]` |
| Telemt build A | `...` | `...` | `...` |
| Telemt build B | `...` | `...` | `...` |
The comparison should make it easy to see that:
- the FakeTLS success path remains valid;
- the early encrypted server flight changes when richer profile data is reused;
- the result is backed by packet evidence.
## Example capture set
One practical example of this workflow uses:
- `origin-direct-nginx.pcap`
- `telemt-ee-before-nginx.pcap`
- `telemt-ee-after-nginx.pcap`
Practical notes:
- `origin` was captured as a direct TLS 1.2 connection to `nginx.org`;
- `before` and `after` were captured on the Telemt FakeTLS success path with a real Telegram client;
- the first server-side FakeTLS response remains valid in both cases;
- the early encrypted server-flight segmentation differs between `before` and `after`, which is consistent with better reuse of captured profile data;
- this kind of result shows a wire-visible effect without breaking the success path, but it does not claim full indistinguishability from the origin.
## Stronger validation
For broader confidence, repeat the same comparison on:
1. one CDN-backed origin;
2. one regular nginx origin;
3. one origin with a multi-record encrypted flight and visible ticket-like tails.
If the same directional improvement appears across all three, confidence in the result will be much higher than for a single-origin example.

View File

@@ -1,225 +0,0 @@
# TLS Front Profile Fidelity
## Overview
This document describes how Telemt reuses captured TLS behavior in the FakeTLS server flight and how to validate the result on a real deployment.
When TLS front emulation is enabled, Telemt can capture useful server-side TLS behavior from the selected origin and reuse it in the emulated success path. The goal is not byte-for-byte reproduction of the origin, but reducing stable synthetic traits and making the emitted server flight structurally closer to the captured profile.
## Why this change exists
The project already captures useful server-side TLS behavior in the TLS front fetch path:
- `change_cipher_spec_count`
- `app_data_record_sizes`
- `ticket_record_sizes`
Before this change, the emulator used only part of that information, leaving a gap between the captured origin behavior and the FakeTLS server flight that actually went out on the wire.
## What is implemented
- The emulator now replays the observed `ChangeCipherSpec` count from the fetched `behavior_profile`.
- The emulator now replays the observed ticket-like tail ApplicationData record sizes when raw or merged TLS profile data is available.
- The emulator now preserves more of the profiled encrypted-flight structure instead of collapsing it into a smaller synthetic shape.
- Profiles without raw TLS behavior still fall back to the previous synthetic behavior.
- Operator-configured `tls_new_session_tickets` still works as an additive fallback when the profile does not provide enough tail records.
## Practical benefit
- Reduced distinguishability between profiled origin TLS behavior and emulated TLS behavior.
- Lower chance of stable server-flight fingerprints caused by a fixed CCS count or fully synthetic tail record sizes.
- Better reuse of already captured TLS profile data without changing MTProto logic, KDF routing, or the transport architecture.
## Limitations
This mechanism does not aim to make Telemt byte-identical to the origin server.
It also does not change:
- MTProto business logic;
- KDF routing behavior;
- the overall transport architecture.
The practical goal is narrower:
- reuse more of the already captured profile data;
- reduce fixed synthetic behavior in the server flight;
- preserve a valid FakeTLS success path while changing the emitted shape on the wire.
## Validation targets
- Correct count of emulated `ChangeCipherSpec` records.
- Correct replay of observed ticket-tail record sizes.
- No regression in existing ALPN and payload-placement behavior.
## How to validate the result
Recommended validation consists of two layers:
- focused unit and security tests for CCS-count replay and ticket-tail replay;
- real packet-capture comparison for a selected origin and a successful FakeTLS session.
When testing on the network, the expected result is:
- a valid FakeTLS and MTProto success path is preserved;
- the shape of the early encrypted server flight changes when richer profile data is available;
- the change is visible on the wire without changing MTProto logic or the transport architecture.
This validation is intended to confirm better reuse of already captured TLS profile data.
It is not intended to prove byte-level equivalence with the real origin server.
## How to test on a real deployment
The strongest practical validation is a side-by-side trace comparison between:
- a real TLS origin server used as `mask_host`;
- a Telemt FakeTLS success-path connection for the same SNI;
- optional captures from different Telemt builds or configurations.
The purpose of the comparison is to inspect the shape of the server flight:
- record order;
- count of `ChangeCipherSpec` records;
- count and grouping of early encrypted `ApplicationData` records;
- lengths of tail or continuation `ApplicationData` records.
## Recommended environment
Use a Linux host or Docker container for the cleanest reproduction.
Recommended setup:
1. One Telemt instance.
2. One real HTTPS origin as `mask_host`.
3. One Telegram client configured with an `ee` proxy link for the Telemt instance.
4. `tcpdump` or Wireshark available for capture analysis.
## Step-by-step test procedure
### 1. Prepare the origin
1. Choose a real HTTPS origin.
2. Set both `censorship.tls_domain` and `censorship.mask_host` to that origin's hostname.
3. Confirm that a direct TLS request works:
```bash
openssl s_client -connect ORIGIN_IP:443 -servername YOUR_DOMAIN </dev/null
```
### 2. Configure Telemt
Use a configuration that enables:
- `censorship.mask = true`
- `censorship.tls_emulation = true`
- `censorship.mask_host`
- `censorship.mask_port`
Recommended for cleaner testing:
- keep `censorship.tls_new_session_tickets = 0`, so the result depends primarily on fetched profile data rather than operator-forced synthetic tail records;
- keep `censorship.tls_fetch.strict_route = true` if cleaner provenance for captured profile data is important.
### 3. Refresh TLS profile data
1. Start Telemt.
2. Let it fetch TLS front profile data for the configured domain.
3. If `tls_front_dir` is persisted, confirm that the TLS front cache is populated.
Persisted cache artifacts are useful, but they are not required if packet captures already demonstrate the runtime result.
### 4. Capture a direct-origin trace
From a separate client host, connect directly to the origin:
```bash
openssl s_client -connect ORIGIN_IP:443 -servername YOUR_DOMAIN </dev/null
```
Capture with:
```bash
sudo tcpdump -i any -w origin-direct.pcap host ORIGIN_IP and port 443
```
### 5. Capture a Telemt FakeTLS success-path trace
Now connect to Telemt with a real Telegram client through an `ee` proxy link that targets the Telemt instance.
`openssl s_client` is useful for direct-origin capture and fallback sanity checks, but it does not exercise the successful FakeTLS and MTProto path.
Capture with:
```bash
sudo tcpdump -i any -w telemt-emulated.pcap host TELEMT_IP and port 443
```
### 6. Decode TLS record structure
Use `tshark` to print the record-level structure:
```bash
tshark -r origin-direct.pcap -Y "tls.record" -T fields \
-e frame.number \
-e ip.src \
-e ip.dst \
-e tls.record.content_type \
-e tls.record.length
```
```bash
tshark -r telemt-emulated.pcap -Y "tls.record" -T fields \
-e frame.number \
-e ip.src \
-e ip.dst \
-e tls.record.content_type \
-e tls.record.length
```
Focus on the server flight after ClientHello:
- `22` = Handshake
- `20` = ChangeCipherSpec
- `23` = ApplicationData
### 7. Build a comparison table
A compact table like the following is usually enough:
| Path | CCS count | AppData count in first encrypted flight | Tail AppData lengths |
| --- | --- | --- | --- |
| Origin | `N` | `M` | `[a, b, ...]` |
| Telemt build A | `...` | `...` | `...` |
| Telemt build B | `...` | `...` | `...` |
The table should make it easy to see that:
- the FakeTLS success path remains valid;
- the early encrypted server flight changes when richer profile data is reused;
- the result is backed by packet evidence.
## Example capture set
One practical example of this workflow uses:
- `origin-direct-nginx.pcap`
- `telemt-ee-before-nginx.pcap`
- `telemt-ee-after-nginx.pcap`
Practical notes:
- `origin` was captured as a direct TLS 1.2 connection to `nginx.org`;
- `before` and `after` were captured on the Telemt FakeTLS success path with a real Telegram client;
- the first server-side FakeTLS response remains valid in both cases;
- the early encrypted server-flight segmentation differs between `before` and `after`, which is consistent with better reuse of captured profile data;
- this kind of result shows a wire-visible effect without breaking the success path, but it does not claim full indistinguishability from the origin.
## Stronger validation
For broader confidence, repeat the same comparison on:
1. one CDN-backed origin;
2. one regular nginx origin;
3. one origin with a multi-record encrypted flight and visible ticket-like tails.
If the same directional improvement appears across all three, confidence in the result will be much higher than for a single-origin example.

View File

@@ -121,8 +121,6 @@ pub struct HotFields {
     pub user_max_tcp_conns_global_each: usize,
     pub user_expirations: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
     pub user_data_quota: std::collections::HashMap<String, u64>,
-    pub user_rate_limits: std::collections::HashMap<String, crate::config::RateLimitBps>,
-    pub cidr_rate_limits: std::collections::HashMap<ipnetwork::IpNetwork, crate::config::RateLimitBps>,
     pub user_max_unique_ips: std::collections::HashMap<String, usize>,
     pub user_max_unique_ips_global_each: usize,
     pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode,
@@ -247,8 +245,6 @@ impl HotFields {
             user_max_tcp_conns_global_each: cfg.access.user_max_tcp_conns_global_each,
             user_expirations: cfg.access.user_expirations.clone(),
             user_data_quota: cfg.access.user_data_quota.clone(),
-            user_rate_limits: cfg.access.user_rate_limits.clone(),
-            cidr_rate_limits: cfg.access.cidr_rate_limits.clone(),
             user_max_unique_ips: cfg.access.user_max_unique_ips.clone(),
             user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each,
             user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode,
@@ -549,8 +545,6 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
     cfg.access.user_max_tcp_conns_global_each = new.access.user_max_tcp_conns_global_each;
     cfg.access.user_expirations = new.access.user_expirations.clone();
     cfg.access.user_data_quota = new.access.user_data_quota.clone();
-    cfg.access.user_rate_limits = new.access.user_rate_limits.clone();
-    cfg.access.cidr_rate_limits = new.access.cidr_rate_limits.clone();
     cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
     cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each;
     cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode;
@@ -1189,18 +1183,6 @@ fn log_changes(
             new_hot.user_data_quota.len()
         );
     }
-    if old_hot.user_rate_limits != new_hot.user_rate_limits {
-        info!(
-            "config reload: user_rate_limits updated ({} entries)",
-            new_hot.user_rate_limits.len()
-        );
-    }
-    if old_hot.cidr_rate_limits != new_hot.cidr_rate_limits {
-        info!(
-            "config reload: cidr_rate_limits updated ({} entries)",
-            new_hot.cidr_rate_limits.len()
-        );
-    }
     if old_hot.user_max_unique_ips != new_hot.user_max_unique_ips {
         info!(
             "config reload: user_max_unique_ips updated ({} entries)",

View File

@@ -861,22 +861,6 @@ impl ProxyConfig {
         ));
     }
-    for (user, limit) in &config.access.user_rate_limits {
-        if limit.up_bps == 0 && limit.down_bps == 0 {
-            return Err(ProxyError::Config(format!(
-                "access.user_rate_limits.{user} must set at least one non-zero direction"
-            )));
-        }
-    }
-    for (cidr, limit) in &config.access.cidr_rate_limits {
-        if limit.up_bps == 0 && limit.down_bps == 0 {
-            return Err(ProxyError::Config(format!(
-                "access.cidr_rate_limits.{cidr} must set at least one non-zero direction"
-            )));
-        }
-    }
     if config.general.me_reinit_every_secs == 0 {
         return Err(ProxyError::Config(
             "general.me_reinit_every_secs must be > 0".to_string(),

View File

@@ -1826,21 +1826,6 @@ pub struct AccessConfig {
     #[serde(default)]
     pub user_data_quota: HashMap<String, u64>,
-    /// Per-user transport rate limits in bits-per-second.
-    ///
-    /// Each entry supports independent upload (`up_bps`) and download
-    /// (`down_bps`) ceilings. A value of `0` in one direction means
-    /// "unlimited" for that direction.
-    #[serde(default)]
-    pub user_rate_limits: HashMap<String, RateLimitBps>,
-    /// Per-CIDR aggregate transport rate limits in bits-per-second.
-    ///
-    /// Matching uses longest-prefix-wins semantics. A value of `0` in one
-    /// direction means "unlimited" for that direction.
-    #[serde(default)]
-    pub cidr_rate_limits: HashMap<IpNetwork, RateLimitBps>,
     #[serde(default)]
     pub user_max_unique_ips: HashMap<String, usize>,
@@ -1874,8 +1859,6 @@ impl Default for AccessConfig {
             user_max_tcp_conns_global_each: default_user_max_tcp_conns_global_each(),
             user_expirations: HashMap::new(),
             user_data_quota: HashMap::new(),
-            user_rate_limits: HashMap::new(),
-            cidr_rate_limits: HashMap::new(),
             user_max_unique_ips: HashMap::new(),
             user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(),
             user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(),
@@ -1887,14 +1870,6 @@ impl Default for AccessConfig {
         }
     }
-#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
-pub struct RateLimitBps {
-    #[serde(default)]
-    pub up_bps: u64,
-    #[serde(default)]
-    pub down_bps: u64,
-}
 // ============= Aux Structures =============
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]

View File

@@ -9,7 +9,6 @@ use std::os::unix::fs::OpenOptionsExt;
 use std::path::{Path, PathBuf};
 use nix::fcntl::{Flock, FlockArg};
-use nix::errno::Errno;
 use nix::unistd::{self, ForkResult, Gid, Pid, Uid, chdir, close, fork, getpid, setsid};
 use tracing::{debug, info, warn};
@@ -338,31 +337,6 @@ fn is_process_running(pid: i32) -> bool {
     nix::sys::signal::kill(Pid::from_raw(pid), None).is_ok()
 }
-// macOS gates nix::unistd::setgroups differently in the current dependency set,
-// so call libc directly there while preserving the original nix path elsewhere.
-fn set_supplementary_groups(gid: Gid) -> Result<(), nix::Error> {
-    #[cfg(target_os = "macos")]
-    {
-        let groups = [gid.as_raw()];
-        let rc = unsafe {
-            libc::setgroups(
-                i32::try_from(groups.len()).expect("single supplementary group must fit in c_int"),
-                groups.as_ptr(),
-            )
-        };
-        if rc == 0 {
-            Ok(())
-        } else {
-            Err(Errno::last())
-        }
-    }
-    #[cfg(not(target_os = "macos"))]
-    {
-        unistd::setgroups(&[gid])
-    }
-}
 /// Drops privileges to the specified user and group.
 ///
 /// This should be called after binding privileged ports but before entering
@@ -394,7 +368,7 @@ pub fn drop_privileges(
     if let Some(gid) = target_gid {
         unistd::setgid(gid).map_err(DaemonError::PrivilegeDrop)?;
-        set_supplementary_groups(gid).map_err(DaemonError::PrivilegeDrop)?;
+        unistd::setgroups(&[gid]).map_err(DaemonError::PrivilegeDrop)?;
         info!(gid = gid.as_raw(), "Dropped group privileges");
     }
} }

View File

@@ -664,11 +664,6 @@ async fn run_telemt_core(
     ));
     let buffer_pool = Arc::new(BufferPool::with_config(64 * 1024, 4096));
-    let shared_state = ProxySharedState::new();
-    shared_state.traffic_limiter.apply_policy(
-        config.access.user_rate_limits.clone(),
-        config.access.cidr_rate_limits.clone(),
-    );
     connectivity::run_startup_connectivity(
         &config,
@@ -700,7 +695,6 @@ async fn run_telemt_core(
         beobachten.clone(),
         api_config_tx.clone(),
         me_pool.clone(),
-        shared_state.clone(),
     )
     .await;
     let config_rx = runtime_watches.config_rx;
@@ -717,6 +711,7 @@ async fn run_telemt_core(
     )
     .await;
     let _admission_tx_hold = admission_tx;
+    let shared_state = ProxySharedState::new();
     conntrack_control::spawn_conntrack_controller(
         config_rx.clone(),
         stats.clone(),

View File

@@ -51,7 +51,6 @@ pub(crate) async fn spawn_runtime_tasks(
     beobachten: Arc<BeobachtenStore>,
     api_config_tx: watch::Sender<Arc<ProxyConfig>>,
     me_pool_for_policy: Option<Arc<MePool>>,
-    shared_state: Arc<ProxySharedState>,
 ) -> RuntimeWatches {
     let um_clone = upstream_manager.clone();
     let dc_overrides_for_health = config.dc_overrides.clone();
@@ -183,33 +182,6 @@ pub(crate) async fn spawn_runtime_tasks(
         }
     });
-    let limiter = shared_state.traffic_limiter.clone();
-    limiter.apply_policy(
-        config.access.user_rate_limits.clone(),
-        config.access.cidr_rate_limits.clone(),
-    );
-    let mut config_rx_rate_limits = config_rx.clone();
-    tokio::spawn(async move {
-        let mut prev_user_limits = config_rx_rate_limits.borrow().access.user_rate_limits.clone();
-        let mut prev_cidr_limits = config_rx_rate_limits.borrow().access.cidr_rate_limits.clone();
-        loop {
-            if config_rx_rate_limits.changed().await.is_err() {
-                break;
-            }
-            let cfg = config_rx_rate_limits.borrow_and_update().clone();
-            if prev_user_limits != cfg.access.user_rate_limits
-                || prev_cidr_limits != cfg.access.cidr_rate_limits
-            {
-                limiter.apply_policy(
-                    cfg.access.user_rate_limits.clone(),
-                    cfg.access.cidr_rate_limits.clone(),
-                );
-                prev_user_limits = cfg.access.user_rate_limits.clone();
-                prev_cidr_limits = cfg.access.cidr_rate_limits.clone();
-            }
-        }
-    });
     let beobachten_writer = beobachten.clone();
     let config_rx_beobachten = config_rx.clone();
     tokio::spawn(async move {

View File

@@ -575,139 +575,6 @@ async fn render_metrics(
} }
); );
let limiter_metrics = shared_state.traffic_limiter.metrics_snapshot();
let _ = writeln!(
out,
"# HELP telemt_rate_limiter_throttle_total Traffic limiter throttle events by scope and direction"
);
let _ = writeln!(out, "# TYPE telemt_rate_limiter_throttle_total counter");
let _ = writeln!(
out,
"telemt_rate_limiter_throttle_total{{scope=\"user\",direction=\"up\"}} {}",
if core_enabled {
limiter_metrics.user_throttle_up_total
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_throttle_total{{scope=\"user\",direction=\"down\"}} {}",
if core_enabled {
limiter_metrics.user_throttle_down_total
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_throttle_total{{scope=\"cidr\",direction=\"up\"}} {}",
if core_enabled {
limiter_metrics.cidr_throttle_up_total
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_throttle_total{{scope=\"cidr\",direction=\"down\"}} {}",
if core_enabled {
limiter_metrics.cidr_throttle_down_total
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_rate_limiter_wait_ms_total Traffic limiter accumulated wait time in milliseconds by scope and direction"
);
let _ = writeln!(out, "# TYPE telemt_rate_limiter_wait_ms_total counter");
let _ = writeln!(
out,
"telemt_rate_limiter_wait_ms_total{{scope=\"user\",direction=\"up\"}} {}",
if core_enabled {
limiter_metrics.user_wait_up_ms_total
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_wait_ms_total{{scope=\"user\",direction=\"down\"}} {}",
if core_enabled {
limiter_metrics.user_wait_down_ms_total
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_wait_ms_total{{scope=\"cidr\",direction=\"up\"}} {}",
if core_enabled {
limiter_metrics.cidr_wait_up_ms_total
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_wait_ms_total{{scope=\"cidr\",direction=\"down\"}} {}",
if core_enabled {
limiter_metrics.cidr_wait_down_ms_total
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_rate_limiter_active_leases Active relay leases under rate limiting by scope"
);
let _ = writeln!(out, "# TYPE telemt_rate_limiter_active_leases gauge");
let _ = writeln!(
out,
"telemt_rate_limiter_active_leases{{scope=\"user\"}} {}",
if core_enabled {
limiter_metrics.user_active_leases
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_active_leases{{scope=\"cidr\"}} {}",
if core_enabled {
limiter_metrics.cidr_active_leases
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_rate_limiter_policy_entries Active rate-limit policy entries by scope"
);
let _ = writeln!(out, "# TYPE telemt_rate_limiter_policy_entries gauge");
let _ = writeln!(
out,
"telemt_rate_limiter_policy_entries{{scope=\"user\"}} {}",
if core_enabled {
limiter_metrics.user_policy_entries
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_policy_entries{{scope=\"cidr\"}} {}",
if core_enabled {
limiter_metrics.cidr_policy_entries
} else {
0
}
);
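Every counter removed above follows the same pattern: export the live value when the limiter core is enabled, otherwise export a literal 0 so the series stays present but inert. A minimal standalone sketch of that gating (the snapshot type and field are illustrative stand-ins, not the crate's API):

```rust
use std::fmt::Write;

// Stand-in for the limiter's metrics snapshot; the field name is hypothetical.
struct Snapshot {
    user_throttle_up_total: u64,
}

// Write one gated counter in Prometheus text exposition format.
fn render(out: &mut String, core_enabled: bool, snap: &Snapshot) {
    let _ = writeln!(out, "# TYPE telemt_rate_limiter_throttle_total counter");
    let _ = writeln!(
        out,
        "telemt_rate_limiter_throttle_total{{scope=\"user\",direction=\"up\"}} {}",
        if core_enabled { snap.user_throttle_up_total } else { 0 }
    );
}
```

Exporting 0 instead of dropping the series keeps dashboards and alert expressions valid whether or not the feature is active.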
let _ = writeln!(
out,
"# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"

View File

@@ -316,7 +316,6 @@ where
stats.increment_user_connects(user);
let _direct_connection_lease = stats.acquire_direct_connection_lease();
let traffic_lease = shared.traffic_limiter.acquire_lease(user, success.peer.ip());
let buffer_pool_trim = Arc::clone(&buffer_pool);
let relay_activity_timeout = if shared.conntrack_pressure_active() {
@@ -330,7 +329,7 @@
} else {
Duration::from_secs(1800)
};
-let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout_and_lease(
+let relay_result = crate::proxy::relay::relay_bidirectional_with_activity_timeout(
client_reader,
client_writer,
tg_reader,
@@ -341,7 +340,6 @@
Arc::clone(&stats),
config.access.user_data_quota.get(user).copied(),
buffer_pool,
traffic_lease,
relay_activity_timeout,
);
tokio::pin!(relay_result);

View File

@@ -28,7 +28,6 @@ use crate::proxy::route_mode::{
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::traffic_limiter::{RateDirection, TrafficLease, next_refill_delay};
use crate::stats::{
MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats,
};
@@ -287,10 +286,6 @@ impl RelayClientIdleState {
self.last_client_frame_at = now;
self.soft_idle_marked = false;
}
fn on_client_tiny_frame(&mut self, now: Instant) {
self.last_client_frame_at = now;
}
}
impl MeD2cFlushPolicy {
@@ -600,41 +595,6 @@ async fn reserve_user_quota_with_yield(
}
}
async fn wait_for_traffic_budget(
lease: Option<&Arc<TrafficLease>>,
direction: RateDirection,
bytes: u64,
) {
if bytes == 0 {
return;
}
let Some(lease) = lease else {
return;
};
let mut remaining = bytes;
while remaining > 0 {
let consume = lease.try_consume(direction, remaining);
if consume.granted > 0 {
remaining = remaining.saturating_sub(consume.granted);
continue;
}
let wait_started_at = Instant::now();
tokio::time::sleep(next_refill_delay()).await;
let wait_ms = wait_started_at
.elapsed()
.as_millis()
.min(u128::from(u64::MAX)) as u64;
lease.observe_wait_ms(
direction,
consume.blocked_user,
consume.blocked_cidr,
wait_ms,
);
}
}
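The deleted `wait_for_traffic_budget` is a consume-or-sleep loop over a token budget: grant what the bucket allows, and when a round grants nothing, wait for the next refill and record the wait. A single-threaded sketch of the same control flow, with the async sleep replaced by an explicit refill so it runs standalone (all names here are illustrative, not the crate's types):

```rust
// Token bucket with a fixed refill step; stands in for the per-lease budget.
struct Bucket {
    available: u64,
    refill_step: u64,
}

impl Bucket {
    // Grant as much of the request as the budget allows (possibly 0).
    fn try_consume(&mut self, want: u64) -> u64 {
        let granted = want.min(self.available);
        self.available -= granted;
        granted
    }
}

// Loop until `bytes` are granted; each empty round "waits" for a refill.
// Returns how many waits occurred — the quantity the deleted code reported
// via `observe_wait_ms`.
fn wait_for_budget(bucket: &mut Bucket, bytes: u64) -> u32 {
    let mut remaining = bytes;
    let mut waits = 0;
    while remaining > 0 {
        let granted = bucket.try_consume(remaining);
        if granted > 0 {
            remaining -= granted;
            continue;
        }
        // In the async original this is `tokio::time::sleep(next_refill_delay())`.
        bucket.available += bucket.refill_step;
        waits += 1;
    }
    waits
}
```

Partial grants are consumed immediately and only a fully empty bucket triggers a wait, so a large write drains the budget in chunks rather than blocking until the whole amount is available at once.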
fn classify_me_d2c_flush_reason(
flush_immediately: bool,
batch_frames: usize,
@@ -1025,7 +985,6 @@ where
let quota_limit = config.access.user_data_quota.get(&user).copied();
let quota_user_stats = quota_limit.map(|_| stats.get_or_create_user_stats_handle(&user));
let peer = success.peer;
let traffic_lease = shared.traffic_limiter.acquire_lease(&user, peer.ip());
let proto_tag = success.proto_tag;
let pool_generation = me_pool.current_generation();
@@ -1161,7 +1120,6 @@ where
let rng_clone = rng.clone();
let user_clone = user.clone();
let quota_user_stats_me_writer = quota_user_stats.clone();
let traffic_lease_me_writer = traffic_lease.clone();
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
let bytes_me2c_clone = bytes_me2c.clone();
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
@@ -1195,7 +1153,7 @@ where
let first_is_downstream_activity =
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
-match process_me_writer_response_with_traffic_lease(
+match process_me_writer_response(
first,
&mut writer,
proto_tag,
@@ -1206,7 +1164,6 @@ where
quota_user_stats_me_writer.as_deref(),
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1256,7 +1213,7 @@ where
let next_is_downstream_activity =
matches!(&next, MeResponse::Data { .. } | MeResponse::Ack(_));
-match process_me_writer_response_with_traffic_lease(
+match process_me_writer_response(
next,
&mut writer,
proto_tag,
@@ -1267,7 +1224,6 @@ where
quota_user_stats_me_writer.as_deref(),
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1320,7 +1276,7 @@ where
Ok(Some(next)) => {
let next_is_downstream_activity =
matches!(&next, MeResponse::Data { .. } | MeResponse::Ack(_));
-match process_me_writer_response_with_traffic_lease(
+match process_me_writer_response(
next,
&mut writer,
proto_tag,
@@ -1331,7 +1287,6 @@ where
quota_user_stats_me_writer.as_deref(),
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1386,7 +1341,7 @@ where
let extra_is_downstream_activity =
matches!(&extra, MeResponse::Data { .. } | MeResponse::Ack(_));
-match process_me_writer_response_with_traffic_lease(
+match process_me_writer_response(
extra,
&mut writer,
proto_tag,
@@ -1397,7 +1352,6 @@ where
quota_user_stats_me_writer.as_deref(),
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1588,12 +1542,6 @@ where
match payload_result {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
wait_for_traffic_budget(
traffic_lease.as_ref(),
RateDirection::Up,
payload.len() as u64,
)
.await;
forensics.bytes_c2me = forensics
.bytes_c2me
.saturating_add(payload.len() as u64);
@@ -1814,6 +1762,40 @@ where
let downstream_ms = last_downstream_activity_ms.load(Ordering::Relaxed);
let hard_deadline =
hard_deadline(idle_policy, idle_state, session_started_at, downstream_ms);
if now >= hard_deadline {
clear_relay_idle_candidate_in(shared, forensics.conn_id);
stats.increment_relay_idle_hard_close_total();
let client_idle_secs = now
.saturating_duration_since(idle_state.last_client_frame_at)
.as_secs();
let downstream_idle_secs = now
.saturating_duration_since(
session_started_at + Duration::from_millis(downstream_ms),
)
.as_secs();
warn!(
trace_id = format_args!("0x{:016x}", forensics.trace_id),
conn_id = forensics.conn_id,
user = %forensics.user,
read_label,
client_idle_secs,
downstream_idle_secs,
soft_idle_secs = idle_policy.soft_idle.as_secs(),
hard_idle_secs = idle_policy.hard_idle.as_secs(),
grace_secs = idle_policy.grace_after_downstream_activity.as_secs(),
"Middle-relay hard idle close"
);
return Err(ProxyError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
"middle-relay hard idle timeout while reading {read_label}: client_idle_secs={client_idle_secs}, downstream_idle_secs={downstream_idle_secs}, soft_idle_secs={}, hard_idle_secs={}, grace_secs={}",
idle_policy.soft_idle.as_secs(),
idle_policy.hard_idle.as_secs(),
idle_policy.grace_after_downstream_activity.as_secs(),
),
)));
}
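One plausible reading of the deadline arithmetic in the block above (the `hard_deadline` helper itself is not shown in this diff): the connection survives until the later of the client-idle limit and the downstream-activity grace window. A sketch under that assumption — the function name and semantics here are a hypothetical reconstruction, not the crate's actual helper:

```rust
use std::time::{Duration, Instant};

// Hypothetical reconstruction: the hard-close deadline is the later of
// (last client frame + hard_idle) and (last downstream activity + grace),
// so recent downstream traffic can extend an otherwise idle client.
fn hard_close_deadline(
    last_client_frame: Instant,
    hard_idle: Duration,
    last_downstream: Instant,
    grace: Duration,
) -> Instant {
    (last_client_frame + hard_idle).max(last_downstream + grace)
}
```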
if !idle_state.soft_idle_marked
&& now.saturating_duration_since(idle_state.last_client_frame_at)
>= idle_policy.soft_idle
@@ -1868,45 +1850,7 @@ where
),
)));
}
-Err(_) => {
+Err(_) => {}
let now = Instant::now();
let downstream_ms = last_downstream_activity_ms.load(Ordering::Relaxed);
let hard_deadline =
hard_deadline(idle_policy, idle_state, session_started_at, downstream_ms);
if now >= hard_deadline {
clear_relay_idle_candidate_in(shared, forensics.conn_id);
stats.increment_relay_idle_hard_close_total();
let client_idle_secs = now
.saturating_duration_since(idle_state.last_client_frame_at)
.as_secs();
let downstream_idle_secs = now
.saturating_duration_since(
session_started_at + Duration::from_millis(downstream_ms),
)
.as_secs();
warn!(
trace_id = format_args!("0x{:016x}", forensics.trace_id),
conn_id = forensics.conn_id,
user = %forensics.user,
read_label,
client_idle_secs,
downstream_idle_secs,
soft_idle_secs = idle_policy.soft_idle.as_secs(),
hard_idle_secs = idle_policy.hard_idle.as_secs(),
grace_secs = idle_policy.grace_after_downstream_activity.as_secs(),
"Middle-relay hard idle close"
);
return Err(ProxyError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
"middle-relay hard idle timeout while reading {read_label}: client_idle_secs={client_idle_secs}, downstream_idle_secs={downstream_idle_secs}, soft_idle_secs={}, hard_idle_secs={}, grace_secs={}",
idle_policy.soft_idle.as_secs(),
idle_policy.hard_idle.as_secs(),
idle_policy.grace_after_downstream_activity.as_secs(),
),
)));
}
}
}
}
@@ -1997,7 +1941,6 @@ where
};
if len == 0 {
idle_state.on_client_tiny_frame(Instant::now());
idle_state.tiny_frame_debt = idle_state
.tiny_frame_debt
.saturating_add(TINY_FRAME_DEBT_PER_TINY);
@@ -2217,46 +2160,6 @@ async fn process_me_writer_response<W>(
ack_flush_immediate: bool,
batched: bool,
) -> Result<MeWriterResponseOutcome>
where
W: AsyncWrite + Unpin + Send + 'static,
{
process_me_writer_response_with_traffic_lease(
response,
client_writer,
proto_tag,
rng,
frame_buf,
stats,
user,
quota_user_stats,
quota_limit,
quota_soft_overshoot_bytes,
None,
bytes_me2c,
conn_id,
ack_flush_immediate,
batched,
)
.await
}
async fn process_me_writer_response_with_traffic_lease<W>(
response: MeResponse,
client_writer: &mut CryptoWriter<W>,
proto_tag: ProtoTag,
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
stats: &Stats,
user: &str,
quota_user_stats: Option<&UserStats>,
quota_limit: Option<u64>,
quota_soft_overshoot_bytes: u64,
traffic_lease: Option<&Arc<TrafficLease>>,
bytes_me2c: &AtomicU64,
conn_id: u64,
ack_flush_immediate: bool,
batched: bool,
) -> Result<MeWriterResponseOutcome>
where
W: AsyncWrite + Unpin + Send + 'static,
{
@@ -2280,7 +2183,6 @@ where
});
}
}
wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await;
let write_mode =
match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
@@ -2318,7 +2220,6 @@ where
} else {
trace!(conn_id, confirm, "ME->C quickack");
}
wait_for_traffic_budget(traffic_lease, RateDirection::Down, 4).await;
write_client_ack(client_writer, proto_tag, confirm).await?;
stats.increment_me_d2c_ack_frames_total();

View File

@@ -68,7 +68,6 @@ pub mod relay;
pub mod route_mode;
pub mod session_eviction;
pub mod shared_state;
pub mod traffic_limiter;
pub use client::ClientHandler;
#[allow(unused_imports)]

View File

@@ -52,7 +52,6 @@
//! - `SharedCounters` (atomics) let the watchdog read stats without locking
use crate::error::{ProxyError, Result};
use crate::proxy::traffic_limiter::{RateDirection, TrafficLease, next_refill_delay};
use crate::stats::{Stats, UserStats};
use crate::stream::BufferPool;
use std::io;
@@ -62,7 +61,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes};
-use tokio::time::{Instant, Sleep};
+use tokio::time::Instant;
use tracing::{debug, trace, warn};
// ============= Constants =============
@@ -211,24 +210,12 @@ struct StatsIo<S> {
stats: Arc<Stats>,
user: String,
user_stats: Arc<UserStats>,
traffic_lease: Option<Arc<TrafficLease>>,
c2s_rate_debt_bytes: u64,
c2s_wait: RateWaitState,
s2c_wait: RateWaitState,
quota_limit: Option<u64>,
quota_exceeded: Arc<AtomicBool>,
quota_bytes_since_check: u64,
epoch: Instant,
}
#[derive(Default)]
struct RateWaitState {
sleep: Option<Pin<Box<Sleep>>>,
started_at: Option<Instant>,
blocked_user: bool,
blocked_cidr: bool,
}
impl<S> StatsIo<S> {
fn new(
inner: S,
@@ -238,28 +225,6 @@ impl<S> StatsIo<S> {
quota_limit: Option<u64>,
quota_exceeded: Arc<AtomicBool>,
epoch: Instant,
) -> Self {
Self::new_with_traffic_lease(
inner,
counters,
stats,
user,
None,
quota_limit,
quota_exceeded,
epoch,
)
}
fn new_with_traffic_lease(
inner: S,
counters: Arc<SharedCounters>,
stats: Arc<Stats>,
user: String,
traffic_lease: Option<Arc<TrafficLease>>,
quota_limit: Option<u64>,
quota_exceeded: Arc<AtomicBool>,
epoch: Instant,
) -> Self {
// Mark initial activity so the watchdog doesn't fire before data flows
counters.touch(Instant::now(), epoch);
@@ -270,97 +235,12 @@ impl<S> StatsIo<S> {
stats,
user,
user_stats,
traffic_lease,
c2s_rate_debt_bytes: 0,
c2s_wait: RateWaitState::default(),
s2c_wait: RateWaitState::default(),
quota_limit,
quota_exceeded,
quota_bytes_since_check: 0,
epoch,
}
}
fn record_wait(
wait: &mut RateWaitState,
lease: Option<&Arc<TrafficLease>>,
direction: RateDirection,
) {
let Some(started_at) = wait.started_at.take() else {
return;
};
let wait_ms = started_at
.elapsed()
.as_millis()
.min(u128::from(u64::MAX)) as u64;
if let Some(lease) = lease {
lease.observe_wait_ms(
direction,
wait.blocked_user,
wait.blocked_cidr,
wait_ms,
);
}
wait.blocked_user = false;
wait.blocked_cidr = false;
}
fn arm_wait(wait: &mut RateWaitState, blocked_user: bool, blocked_cidr: bool) {
if wait.sleep.is_none() {
wait.sleep = Some(Box::pin(tokio::time::sleep(next_refill_delay())));
wait.started_at = Some(Instant::now());
}
wait.blocked_user |= blocked_user;
wait.blocked_cidr |= blocked_cidr;
}
fn poll_wait(
wait: &mut RateWaitState,
cx: &mut Context<'_>,
lease: Option<&Arc<TrafficLease>>,
direction: RateDirection,
) -> Poll<()> {
let Some(sleep) = wait.sleep.as_mut() else {
return Poll::Ready(());
};
if sleep.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
wait.sleep = None;
Self::record_wait(wait, lease, direction);
Poll::Ready(())
}
fn settle_c2s_rate_debt(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let Some(lease) = self.traffic_lease.as_ref() else {
self.c2s_rate_debt_bytes = 0;
return Poll::Ready(());
};
while self.c2s_rate_debt_bytes > 0 {
let consume = lease.try_consume(RateDirection::Up, self.c2s_rate_debt_bytes);
if consume.granted > 0 {
self.c2s_rate_debt_bytes =
self.c2s_rate_debt_bytes.saturating_sub(consume.granted);
continue;
}
Self::arm_wait(
&mut self.c2s_wait,
consume.blocked_user,
consume.blocked_cidr,
);
if Self::poll_wait(&mut self.c2s_wait, cx, Some(lease), RateDirection::Up).is_pending()
{
return Poll::Pending;
}
}
if Self::poll_wait(&mut self.c2s_wait, cx, Some(lease), RateDirection::Up).is_pending() {
return Poll::Pending;
}
Poll::Ready(())
}
}
#[derive(Debug)]
@@ -406,25 +286,6 @@ fn should_immediate_quota_check(remaining_before: u64, charge_bytes: u64) -> boo
remaining_before <= QUOTA_NEAR_LIMIT_BYTES || charge_bytes >= QUOTA_LARGE_CHARGE_BYTES
}
fn refund_reserved_quota_bytes(user_stats: &UserStats, reserved_bytes: u64) {
if reserved_bytes == 0 {
return;
}
let mut current = user_stats.quota_used.load(Ordering::Relaxed);
loop {
let next = current.saturating_sub(reserved_bytes);
match user_stats.quota_used.compare_exchange_weak(
current,
next,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return,
Err(observed) => current = observed,
}
}
}
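The deleted `refund_reserved_quota_bytes` is a lock-free saturating subtract: a `compare_exchange_weak` loop that can never drive the counter below zero, even when other tasks are concurrently charging or refunding it. The same pattern in isolation (function name is ours; the loop shape mirrors the helper above):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Subtract `bytes` from `counter`, saturating at zero, retrying on
// contention. `compare_exchange_weak` may fail spuriously, which is fine
// inside a retry loop and is often cheaper than the strong variant.
fn refund_saturating(counter: &AtomicU64, bytes: u64) {
    if bytes == 0 {
        return;
    }
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        let next = current.saturating_sub(bytes);
        match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return,
            Err(observed) => current = observed,
        }
    }
}
```

A plain `fetch_sub` would be wrong here: if a concurrent update shrank the counter first, the subtraction could wrap below zero, which is exactly what the saturating CAS loop prevents.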
impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
fn poll_read(
self: Pin<&mut Self>,
@@ -435,9 +296,6 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
if this.quota_exceeded.load(Ordering::Acquire) {
return Poll::Ready(Err(quota_io_error()));
}
if this.settle_c2s_rate_debt(cx).is_pending() {
return Poll::Pending;
}
let mut remaining_before = None;
if let Some(limit) = this.quota_limit {
@@ -519,11 +377,6 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
.add_user_octets_from_handle(this.user_stats.as_ref(), n_to_charge);
this.stats
.increment_user_msgs_from_handle(this.user_stats.as_ref());
if this.traffic_lease.is_some() {
this.c2s_rate_debt_bytes =
this.c2s_rate_debt_bytes.saturating_add(n_to_charge);
let _ = this.settle_c2s_rate_debt(cx);
}
trace!(user = %this.user, bytes = n, "C->S");
}
@@ -545,66 +398,28 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
return Poll::Ready(Err(quota_io_error()));
}
let mut shaper_reserved_bytes = 0u64;
let mut write_buf = buf;
if let Some(lease) = this.traffic_lease.as_ref() {
if !buf.is_empty() {
loop {
let consume = lease.try_consume(RateDirection::Down, buf.len() as u64);
if consume.granted > 0 {
shaper_reserved_bytes = consume.granted;
if consume.granted < buf.len() as u64 {
write_buf = &buf[..consume.granted as usize];
}
let _ = Self::poll_wait(
&mut this.s2c_wait,
cx,
Some(lease),
RateDirection::Down,
);
break;
}
Self::arm_wait(
&mut this.s2c_wait,
consume.blocked_user,
consume.blocked_cidr,
);
if Self::poll_wait(&mut this.s2c_wait, cx, Some(lease), RateDirection::Down)
.is_pending()
{
return Poll::Pending;
}
}
} else {
let _ = Self::poll_wait(&mut this.s2c_wait, cx, Some(lease), RateDirection::Down);
}
}
let mut remaining_before = None;
let mut reserved_bytes = 0u64;
let mut write_buf = buf;
if let Some(limit) = this.quota_limit {
-if !write_buf.is_empty() {
+if !buf.is_empty() {
let mut reserve_rounds = 0usize;
while reserved_bytes == 0 {
let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before);
if remaining == 0 {
if let Some(lease) = this.traffic_lease.as_ref() {
lease.refund(RateDirection::Down, shaper_reserved_bytes);
}
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
remaining_before = Some(remaining);
-let desired = remaining.min(write_buf.len() as u64);
+let desired = remaining.min(buf.len() as u64);
let mut saw_contention = false;
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(desired, limit) {
Ok(_) => {
reserved_bytes = desired;
-write_buf = &write_buf[..desired as usize];
+write_buf = &buf[..desired as usize];
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
@@ -619,9 +434,6 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
if reserved_bytes == 0 {
reserve_rounds = reserve_rounds.saturating_add(1);
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
if let Some(lease) = this.traffic_lease.as_ref() {
lease.refund(RateDirection::Down, shaper_reserved_bytes);
}
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
@@ -634,9 +446,6 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before);
if remaining == 0 {
if let Some(lease) = this.traffic_lease.as_ref() {
lease.refund(RateDirection::Down, shaper_reserved_bytes);
}
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
@@ -647,17 +456,23 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
Poll::Ready(Ok(n)) => {
if reserved_bytes > n as u64 {
-refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes - n as u64);
+let refund = reserved_bytes - n as u64;
+let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
+loop {
+let next = current.saturating_sub(refund);
+match this.user_stats.quota_used.compare_exchange_weak(
+current,
+next,
+Ordering::Relaxed,
+Ordering::Relaxed,
+) {
+Ok(_) => break,
+Err(observed) => current = observed,
+}
+}
}
-if shaper_reserved_bytes > n as u64
-&& let Some(lease) = this.traffic_lease.as_ref()
-{
-lease.refund(RateDirection::Down, shaper_reserved_bytes - n as u64);
-}
if n > 0 {
-if let Some(lease) = this.traffic_lease.as_ref() {
-Self::record_wait(&mut this.s2c_wait, Some(lease), RateDirection::Down);
-}
let n_to_charge = n as u64;
// S→C: data written to client
@@ -697,23 +512,37 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
}
Poll::Ready(Err(err)) => {
if reserved_bytes > 0 {
-refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
+let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
+loop {
+let next = current.saturating_sub(reserved_bytes);
+match this.user_stats.quota_used.compare_exchange_weak(
+current,
+next,
+Ordering::Relaxed,
+Ordering::Relaxed,
+) {
+Ok(_) => break,
+Err(observed) => current = observed,
+}
+}
}
-if shaper_reserved_bytes > 0
-&& let Some(lease) = this.traffic_lease.as_ref()
-{
-lease.refund(RateDirection::Down, shaper_reserved_bytes);
-}
Poll::Ready(Err(err))
}
Poll::Pending => {
if reserved_bytes > 0 {
-refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
+let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
+loop {
+let next = current.saturating_sub(reserved_bytes);
+match this.user_stats.quota_used.compare_exchange_weak(
+current,
+next,
+Ordering::Relaxed,
+Ordering::Relaxed,
+) {
+Ok(_) => break,
+Err(observed) => current = observed,
+}
+}
}
-if shaper_reserved_bytes > 0
-&& let Some(lease) = this.traffic_lease.as_ref()
-{
-lease.refund(RateDirection::Down, shaper_reserved_bytes);
-}
Poll::Pending
}
@@ -798,43 +627,6 @@ pub async fn relay_bidirectional_with_activity_timeout<CR, CW, SR, SW>(
_buffer_pool: Arc<BufferPool>,
activity_timeout: Duration,
) -> Result<()>
where
CR: AsyncRead + Unpin + Send + 'static,
CW: AsyncWrite + Unpin + Send + 'static,
SR: AsyncRead + Unpin + Send + 'static,
SW: AsyncWrite + Unpin + Send + 'static,
{
relay_bidirectional_with_activity_timeout_and_lease(
client_reader,
client_writer,
server_reader,
server_writer,
c2s_buf_size,
s2c_buf_size,
user,
stats,
quota_limit,
_buffer_pool,
None,
activity_timeout,
)
.await
}
pub async fn relay_bidirectional_with_activity_timeout_and_lease<CR, CW, SR, SW>(
client_reader: CR,
client_writer: CW,
server_reader: SR,
server_writer: SW,
c2s_buf_size: usize,
s2c_buf_size: usize,
user: &str,
stats: Arc<Stats>,
quota_limit: Option<u64>,
_buffer_pool: Arc<BufferPool>,
traffic_lease: Option<Arc<TrafficLease>>,
activity_timeout: Duration,
) -> Result<()>
where
CR: AsyncRead + Unpin + Send + 'static,
CW: AsyncWrite + Unpin + Send + 'static,
@@ -852,12 +644,11 @@ where
let mut server = CombinedStream::new(server_reader, server_writer);
// Wrap client with stats/activity tracking
-let mut client = StatsIo::new_with_traffic_lease(
+let mut client = StatsIo::new(
client_combined,
Arc::clone(&counters),
Arc::clone(&stats),
user_owned.clone(),
traffic_lease,
quota_limit,
Arc::clone(&quota_exceeded),
epoch,

View File

@@ -10,7 +10,6 @@ use tokio::sync::mpsc;
use crate::proxy::handshake::{AuthProbeSaturationState, AuthProbeState};
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
use crate::proxy::traffic_limiter::TrafficLimiter;
const HANDSHAKE_RECENT_USER_RING_LEN: usize = 64;
@@ -66,7 +65,6 @@ pub(crate) struct MiddleRelaySharedState {
pub(crate) struct ProxySharedState {
pub(crate) handshake: HandshakeSharedState,
pub(crate) middle_relay: MiddleRelaySharedState,
pub(crate) traffic_limiter: Arc<TrafficLimiter>,
pub(crate) conntrack_pressure_active: AtomicBool,
pub(crate) conntrack_close_tx: Mutex<Option<mpsc::Sender<ConntrackCloseEvent>>>,
}
@@ -100,7 +98,6 @@ impl ProxySharedState {
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
relay_idle_mark_seq: AtomicU64::new(0),
},
traffic_limiter: TrafficLimiter::new(),
conntrack_pressure_active: AtomicBool::new(false),
conntrack_close_tx: Mutex::new(None),
})

View File

@@ -1,847 +0,0 @@
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use ipnetwork::IpNetwork;
use crate::config::RateLimitBps;
const REGISTRY_SHARDS: usize = 64;
const FAIR_EPOCH_MS: u64 = 20;
const MAX_BORROW_CHUNK_BYTES: u64 = 32 * 1024;
const CLEANUP_INTERVAL_SECS: u64 = 60;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateDirection {
Up,
Down,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TrafficConsumeResult {
pub granted: u64,
pub blocked_user: bool,
pub blocked_cidr: bool,
}
#[derive(Debug, Clone, Copy)]
pub struct TrafficLimiterMetricsSnapshot {
pub user_throttle_up_total: u64,
pub user_throttle_down_total: u64,
pub cidr_throttle_up_total: u64,
pub cidr_throttle_down_total: u64,
pub user_wait_up_ms_total: u64,
pub user_wait_down_ms_total: u64,
pub cidr_wait_up_ms_total: u64,
pub cidr_wait_down_ms_total: u64,
pub user_active_leases: u64,
pub cidr_active_leases: u64,
pub user_policy_entries: u64,
pub cidr_policy_entries: u64,
}
#[derive(Default)]
struct ScopeMetrics {
throttle_up_total: AtomicU64,
throttle_down_total: AtomicU64,
wait_up_ms_total: AtomicU64,
wait_down_ms_total: AtomicU64,
active_leases: AtomicU64,
policy_entries: AtomicU64,
}
impl ScopeMetrics {
fn throttle(&self, direction: RateDirection) {
match direction {
RateDirection::Up => {
self.throttle_up_total.fetch_add(1, Ordering::Relaxed);
}
RateDirection::Down => {
self.throttle_down_total.fetch_add(1, Ordering::Relaxed);
}
}
}
fn wait_ms(&self, direction: RateDirection, wait_ms: u64) {
match direction {
RateDirection::Up => {
self.wait_up_ms_total.fetch_add(wait_ms, Ordering::Relaxed);
}
RateDirection::Down => {
self.wait_down_ms_total.fetch_add(wait_ms, Ordering::Relaxed);
}
}
}
}
#[derive(Default)]
struct AtomicRatePair {
up_bps: AtomicU64,
down_bps: AtomicU64,
}
impl AtomicRatePair {
fn set(&self, limits: RateLimitBps) {
self.up_bps.store(limits.up_bps, Ordering::Relaxed);
self.down_bps.store(limits.down_bps, Ordering::Relaxed);
}
fn get(&self, direction: RateDirection) -> u64 {
match direction {
RateDirection::Up => self.up_bps.load(Ordering::Relaxed),
RateDirection::Down => self.down_bps.load(Ordering::Relaxed),
}
}
}
#[derive(Default)]
struct DirectionBucket {
epoch: AtomicU64,
used: AtomicU64,
}
impl DirectionBucket {
fn sync_epoch(&self, epoch: u64) {
let current = self.epoch.load(Ordering::Relaxed);
if current == epoch {
return;
}
if current < epoch
&& self
.epoch
.compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.used.store(0, Ordering::Relaxed);
}
}
fn try_consume(&self, cap_bps: u64, requested: u64) -> u64 {
if requested == 0 {
return 0;
}
if cap_bps == 0 {
return requested;
}
let epoch = current_epoch();
self.sync_epoch(epoch);
let cap_epoch = bytes_per_epoch(cap_bps);
loop {
let used = self.used.load(Ordering::Relaxed);
if used >= cap_epoch {
return 0;
}
let remaining = cap_epoch.saturating_sub(used);
let grant = requested.min(remaining);
if grant == 0 {
return 0;
}
let next = used.saturating_add(grant);
if self
.used
.compare_exchange_weak(used, next, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
return grant;
}
}
}
fn refund(&self, bytes: u64) {
if bytes == 0 {
return;
}
decrement_atomic_saturating(&self.used, bytes);
}
}
struct UserBucket {
rates: AtomicRatePair,
up: DirectionBucket,
down: DirectionBucket,
active_leases: AtomicU64,
}
impl UserBucket {
fn new(limits: RateLimitBps) -> Self {
let rates = AtomicRatePair::default();
rates.set(limits);
Self {
rates,
up: DirectionBucket::default(),
down: DirectionBucket::default(),
active_leases: AtomicU64::new(0),
}
}
fn set_rates(&self, limits: RateLimitBps) {
self.rates.set(limits);
}
fn try_consume(&self, direction: RateDirection, requested: u64) -> u64 {
let cap_bps = self.rates.get(direction);
match direction {
RateDirection::Up => self.up.try_consume(cap_bps, requested),
RateDirection::Down => self.down.try_consume(cap_bps, requested),
}
}
fn refund(&self, direction: RateDirection, bytes: u64) {
match direction {
RateDirection::Up => self.up.refund(bytes),
RateDirection::Down => self.down.refund(bytes),
}
}
}
#[derive(Default)]
struct CidrDirectionBucket {
epoch: AtomicU64,
used: AtomicU64,
active_users: AtomicU64,
}
impl CidrDirectionBucket {
fn sync_epoch(&self, epoch: u64) {
let current = self.epoch.load(Ordering::Relaxed);
if current == epoch {
return;
}
if current < epoch
&& self
.epoch
.compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.used.store(0, Ordering::Relaxed);
self.active_users.store(0, Ordering::Relaxed);
}
}
fn try_consume(
&self,
user_state: &CidrUserDirectionState,
cap_epoch: u64,
requested: u64,
) -> u64 {
if requested == 0 || cap_epoch == 0 {
return 0;
}
let epoch = current_epoch();
self.sync_epoch(epoch);
user_state.sync_epoch_and_mark_active(epoch, &self.active_users);
let active_users = self.active_users.load(Ordering::Relaxed).max(1);
let fair_share = cap_epoch.saturating_div(active_users).max(1);
loop {
let total_used = self.used.load(Ordering::Relaxed);
if total_used >= cap_epoch {
return 0;
}
let total_remaining = cap_epoch.saturating_sub(total_used);
let user_used = user_state.used.load(Ordering::Relaxed);
let guaranteed_remaining = fair_share.saturating_sub(user_used);
let grant = if guaranteed_remaining > 0 {
requested.min(guaranteed_remaining).min(total_remaining)
} else {
requested
.min(total_remaining)
.min(MAX_BORROW_CHUNK_BYTES)
};
if grant == 0 {
return 0;
}
let next_total = total_used.saturating_add(grant);
if self
.used
.compare_exchange_weak(
total_used,
next_total,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
user_state.used.fetch_add(grant, Ordering::Relaxed);
return grant;
}
}
}
fn refund(&self, bytes: u64) {
if bytes == 0 {
return;
}
decrement_atomic_saturating(&self.used, bytes);
}
}
#[derive(Default)]
struct CidrUserDirectionState {
epoch: AtomicU64,
used: AtomicU64,
}
impl CidrUserDirectionState {
fn sync_epoch_and_mark_active(&self, epoch: u64, active_users: &AtomicU64) {
let current = self.epoch.load(Ordering::Relaxed);
if current == epoch {
return;
}
if current < epoch
&& self
.epoch
.compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.used.store(0, Ordering::Relaxed);
active_users.fetch_add(1, Ordering::Relaxed);
}
}
fn refund(&self, bytes: u64) {
if bytes == 0 {
return;
}
decrement_atomic_saturating(&self.used, bytes);
}
}
struct CidrUserShare {
active_conns: AtomicU64,
up: CidrUserDirectionState,
down: CidrUserDirectionState,
}
impl CidrUserShare {
fn new() -> Self {
Self {
active_conns: AtomicU64::new(0),
up: CidrUserDirectionState::default(),
down: CidrUserDirectionState::default(),
}
}
}
struct CidrBucket {
rates: AtomicRatePair,
up: CidrDirectionBucket,
down: CidrDirectionBucket,
users: ShardedRegistry<CidrUserShare>,
active_leases: AtomicU64,
}
impl CidrBucket {
fn new(limits: RateLimitBps) -> Self {
let rates = AtomicRatePair::default();
rates.set(limits);
Self {
rates,
up: CidrDirectionBucket::default(),
down: CidrDirectionBucket::default(),
users: ShardedRegistry::new(REGISTRY_SHARDS),
active_leases: AtomicU64::new(0),
}
}
fn set_rates(&self, limits: RateLimitBps) {
self.rates.set(limits);
}
fn acquire_user_share(&self, user: &str) -> Arc<CidrUserShare> {
let share = self.users.get_or_insert_with(user, CidrUserShare::new);
share.active_conns.fetch_add(1, Ordering::Relaxed);
share
}
fn release_user_share(&self, user: &str, share: &Arc<CidrUserShare>) {
decrement_atomic_saturating(&share.active_conns, 1);
let share_for_remove = Arc::clone(share);
let _ = self.users.remove_if(user, |candidate| {
Arc::ptr_eq(candidate, &share_for_remove)
&& candidate.active_conns.load(Ordering::Relaxed) == 0
});
}
fn try_consume_for_user(
&self,
direction: RateDirection,
share: &CidrUserShare,
requested: u64,
) -> u64 {
let cap_bps = self.rates.get(direction);
if cap_bps == 0 {
return requested;
}
let cap_epoch = bytes_per_epoch(cap_bps);
match direction {
RateDirection::Up => self.up.try_consume(&share.up, cap_epoch, requested),
RateDirection::Down => self.down.try_consume(&share.down, cap_epoch, requested),
}
}
fn refund_for_user(&self, direction: RateDirection, share: &CidrUserShare, bytes: u64) {
match direction {
RateDirection::Up => {
self.up.refund(bytes);
share.up.refund(bytes);
}
RateDirection::Down => {
self.down.refund(bytes);
share.down.refund(bytes);
}
}
}
fn cleanup_idle_users(&self) {
self.users
.retain(|_, share| share.active_conns.load(Ordering::Relaxed) > 0);
}
}
#[derive(Clone)]
struct CidrRule {
key: String,
cidr: IpNetwork,
limits: RateLimitBps,
prefix_len: u8,
}
#[derive(Default)]
struct PolicySnapshot {
user_limits: HashMap<String, RateLimitBps>,
cidr_rules_v4: Vec<CidrRule>,
cidr_rules_v6: Vec<CidrRule>,
cidr_rule_keys: HashSet<String>,
}
impl PolicySnapshot {
fn match_cidr(&self, ip: IpAddr) -> Option<&CidrRule> {
match ip {
IpAddr::V4(_) => self.cidr_rules_v4.iter().find(|rule| rule.cidr.contains(ip)),
IpAddr::V6(_) => self.cidr_rules_v6.iter().find(|rule| rule.cidr.contains(ip)),
}
}
}
struct ShardedRegistry<T> {
shards: Box<[DashMap<String, Arc<T>>]>,
mask: usize,
}
impl<T> ShardedRegistry<T> {
fn new(shards: usize) -> Self {
let shard_count = shards.max(1).next_power_of_two();
let mut items = Vec::with_capacity(shard_count);
for _ in 0..shard_count {
items.push(DashMap::<String, Arc<T>>::new());
}
Self {
shards: items.into_boxed_slice(),
mask: shard_count.saturating_sub(1),
}
}
fn shard_index(&self, key: &str) -> usize {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() as usize) & self.mask
}
fn get_or_insert_with<F>(&self, key: &str, make: F) -> Arc<T>
where
F: FnOnce() -> T,
{
let shard = &self.shards[self.shard_index(key)];
match shard.entry(key.to_string()) {
dashmap::mapref::entry::Entry::Occupied(entry) => Arc::clone(entry.get()),
dashmap::mapref::entry::Entry::Vacant(slot) => {
let value = Arc::new(make());
slot.insert(Arc::clone(&value));
value
}
}
}
fn retain<F>(&self, predicate: F)
where
F: Fn(&String, &Arc<T>) -> bool + Copy,
{
for shard in &*self.shards {
shard.retain(|key, value| predicate(key, value));
}
}
fn remove_if<F>(&self, key: &str, predicate: F) -> bool
where
F: Fn(&Arc<T>) -> bool,
{
let shard = &self.shards[self.shard_index(key)];
let should_remove = match shard.get(key) {
Some(entry) => predicate(entry.value()),
None => false,
};
if !should_remove {
return false;
}
shard.remove(key).is_some()
}
}
pub struct TrafficLease {
limiter: Arc<TrafficLimiter>,
user_bucket: Option<Arc<UserBucket>>,
cidr_bucket: Option<Arc<CidrBucket>>,
cidr_user_key: Option<String>,
cidr_user_share: Option<Arc<CidrUserShare>>,
}
impl TrafficLease {
pub fn try_consume(&self, direction: RateDirection, requested: u64) -> TrafficConsumeResult {
if requested == 0 {
return TrafficConsumeResult {
granted: 0,
blocked_user: false,
blocked_cidr: false,
};
}
let mut granted = requested;
if let Some(user_bucket) = self.user_bucket.as_ref() {
let user_granted = user_bucket.try_consume(direction, granted);
if user_granted == 0 {
self.limiter.observe_throttle(direction, true, false);
return TrafficConsumeResult {
granted: 0,
blocked_user: true,
blocked_cidr: false,
};
}
granted = user_granted;
}
if let (Some(cidr_bucket), Some(cidr_user_share)) =
(self.cidr_bucket.as_ref(), self.cidr_user_share.as_ref())
{
let cidr_granted = cidr_bucket.try_consume_for_user(direction, cidr_user_share, granted);
if cidr_granted < granted
&& let Some(user_bucket) = self.user_bucket.as_ref()
{
user_bucket.refund(direction, granted.saturating_sub(cidr_granted));
}
if cidr_granted == 0 {
self.limiter.observe_throttle(direction, false, true);
return TrafficConsumeResult {
granted: 0,
blocked_user: false,
blocked_cidr: true,
};
}
granted = cidr_granted;
}
TrafficConsumeResult {
granted,
blocked_user: false,
blocked_cidr: false,
}
}
pub fn refund(&self, direction: RateDirection, bytes: u64) {
if bytes == 0 {
return;
}
if let Some(user_bucket) = self.user_bucket.as_ref() {
user_bucket.refund(direction, bytes);
}
if let (Some(cidr_bucket), Some(cidr_user_share)) =
(self.cidr_bucket.as_ref(), self.cidr_user_share.as_ref())
{
cidr_bucket.refund_for_user(direction, cidr_user_share, bytes);
}
}
pub fn observe_wait_ms(
&self,
direction: RateDirection,
blocked_user: bool,
blocked_cidr: bool,
wait_ms: u64,
) {
if wait_ms == 0 {
return;
}
self.limiter
.observe_wait(direction, blocked_user, blocked_cidr, wait_ms);
}
}
impl Drop for TrafficLease {
fn drop(&mut self) {
if let Some(bucket) = self.user_bucket.as_ref() {
decrement_atomic_saturating(&bucket.active_leases, 1);
decrement_atomic_saturating(&self.limiter.user_scope.active_leases, 1);
}
if let Some(bucket) = self.cidr_bucket.as_ref() {
if let (Some(user_key), Some(share)) =
(self.cidr_user_key.as_ref(), self.cidr_user_share.as_ref())
{
bucket.release_user_share(user_key, share);
}
decrement_atomic_saturating(&bucket.active_leases, 1);
decrement_atomic_saturating(&self.limiter.cidr_scope.active_leases, 1);
}
}
}
pub struct TrafficLimiter {
policy: ArcSwap<PolicySnapshot>,
user_buckets: ShardedRegistry<UserBucket>,
cidr_buckets: ShardedRegistry<CidrBucket>,
user_scope: ScopeMetrics,
cidr_scope: ScopeMetrics,
last_cleanup_epoch_secs: AtomicU64,
}
impl TrafficLimiter {
pub fn new() -> Arc<Self> {
Arc::new(Self {
policy: ArcSwap::from_pointee(PolicySnapshot::default()),
user_buckets: ShardedRegistry::new(REGISTRY_SHARDS),
cidr_buckets: ShardedRegistry::new(REGISTRY_SHARDS),
user_scope: ScopeMetrics::default(),
cidr_scope: ScopeMetrics::default(),
last_cleanup_epoch_secs: AtomicU64::new(0),
})
}
pub fn apply_policy(
&self,
user_limits: HashMap<String, RateLimitBps>,
cidr_limits: HashMap<IpNetwork, RateLimitBps>,
) {
let filtered_users = user_limits
.into_iter()
.filter(|(_, limit)| limit.up_bps > 0 || limit.down_bps > 0)
.collect::<HashMap<_, _>>();
let mut cidr_rules_v4 = Vec::new();
let mut cidr_rules_v6 = Vec::new();
let mut cidr_rule_keys = HashSet::new();
for (cidr, limits) in cidr_limits {
if limits.up_bps == 0 && limits.down_bps == 0 {
continue;
}
let key = cidr.to_string();
let rule = CidrRule {
key: key.clone(),
cidr,
limits,
prefix_len: cidr.prefix(),
};
cidr_rule_keys.insert(key);
match rule.cidr {
IpNetwork::V4(_) => cidr_rules_v4.push(rule),
IpNetwork::V6(_) => cidr_rules_v6.push(rule),
}
}
cidr_rules_v4.sort_by(|a, b| b.prefix_len.cmp(&a.prefix_len));
cidr_rules_v6.sort_by(|a, b| b.prefix_len.cmp(&a.prefix_len));
self.user_scope
.policy_entries
.store(filtered_users.len() as u64, Ordering::Relaxed);
self.cidr_scope
.policy_entries
.store(cidr_rule_keys.len() as u64, Ordering::Relaxed);
self.policy.store(Arc::new(PolicySnapshot {
user_limits: filtered_users,
cidr_rules_v4,
cidr_rules_v6,
cidr_rule_keys,
}));
self.maybe_cleanup();
}
pub fn acquire_lease(
self: &Arc<Self>,
user: &str,
client_ip: IpAddr,
) -> Option<Arc<TrafficLease>> {
let policy = self.policy.load_full();
let mut user_bucket = None;
if let Some(limit) = policy.user_limits.get(user).copied() {
let bucket = self
.user_buckets
.get_or_insert_with(user, || UserBucket::new(limit));
bucket.set_rates(limit);
bucket.active_leases.fetch_add(1, Ordering::Relaxed);
self.user_scope.active_leases.fetch_add(1, Ordering::Relaxed);
user_bucket = Some(bucket);
}
let mut cidr_bucket = None;
let mut cidr_user_key = None;
let mut cidr_user_share = None;
if let Some(rule) = policy.match_cidr(client_ip) {
let bucket = self
.cidr_buckets
.get_or_insert_with(rule.key.as_str(), || CidrBucket::new(rule.limits));
bucket.set_rates(rule.limits);
bucket.active_leases.fetch_add(1, Ordering::Relaxed);
self.cidr_scope.active_leases.fetch_add(1, Ordering::Relaxed);
let share = bucket.acquire_user_share(user);
cidr_user_key = Some(user.to_string());
cidr_user_share = Some(share);
cidr_bucket = Some(bucket);
}
if user_bucket.is_none() && cidr_bucket.is_none() {
return None;
}
self.maybe_cleanup();
Some(Arc::new(TrafficLease {
limiter: Arc::clone(self),
user_bucket,
cidr_bucket,
cidr_user_key,
cidr_user_share,
}))
}
pub fn metrics_snapshot(&self) -> TrafficLimiterMetricsSnapshot {
TrafficLimiterMetricsSnapshot {
user_throttle_up_total: self.user_scope.throttle_up_total.load(Ordering::Relaxed),
user_throttle_down_total: self.user_scope.throttle_down_total.load(Ordering::Relaxed),
cidr_throttle_up_total: self.cidr_scope.throttle_up_total.load(Ordering::Relaxed),
cidr_throttle_down_total: self.cidr_scope.throttle_down_total.load(Ordering::Relaxed),
user_wait_up_ms_total: self.user_scope.wait_up_ms_total.load(Ordering::Relaxed),
user_wait_down_ms_total: self.user_scope.wait_down_ms_total.load(Ordering::Relaxed),
cidr_wait_up_ms_total: self.cidr_scope.wait_up_ms_total.load(Ordering::Relaxed),
cidr_wait_down_ms_total: self.cidr_scope.wait_down_ms_total.load(Ordering::Relaxed),
user_active_leases: self.user_scope.active_leases.load(Ordering::Relaxed),
cidr_active_leases: self.cidr_scope.active_leases.load(Ordering::Relaxed),
user_policy_entries: self.user_scope.policy_entries.load(Ordering::Relaxed),
cidr_policy_entries: self.cidr_scope.policy_entries.load(Ordering::Relaxed),
}
}
fn observe_throttle(&self, direction: RateDirection, blocked_user: bool, blocked_cidr: bool) {
if blocked_user {
self.user_scope.throttle(direction);
}
if blocked_cidr {
self.cidr_scope.throttle(direction);
}
}
fn observe_wait(
&self,
direction: RateDirection,
blocked_user: bool,
blocked_cidr: bool,
wait_ms: u64,
) {
if blocked_user {
self.user_scope.wait_ms(direction, wait_ms);
}
if blocked_cidr {
self.cidr_scope.wait_ms(direction, wait_ms);
}
}
fn maybe_cleanup(&self) {
let now_epoch_secs = now_epoch_secs();
let last = self.last_cleanup_epoch_secs.load(Ordering::Relaxed);
if now_epoch_secs.saturating_sub(last) < CLEANUP_INTERVAL_SECS {
return;
}
if self
.last_cleanup_epoch_secs
.compare_exchange(last, now_epoch_secs, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return;
}
let policy = self.policy.load_full();
self.user_buckets.retain(|user, bucket| {
bucket.active_leases.load(Ordering::Relaxed) > 0 || policy.user_limits.contains_key(user)
});
self.cidr_buckets.retain(|cidr_key, bucket| {
bucket.cleanup_idle_users();
bucket.active_leases.load(Ordering::Relaxed) > 0
|| policy.cidr_rule_keys.contains(cidr_key)
});
}
}
pub fn next_refill_delay() -> Duration {
let start = limiter_epoch_start();
let elapsed_ms = start.elapsed().as_millis() as u64;
let epoch_pos = elapsed_ms % FAIR_EPOCH_MS;
let wait_ms = FAIR_EPOCH_MS.saturating_sub(epoch_pos).max(1);
Duration::from_millis(wait_ms)
}
fn decrement_atomic_saturating(counter: &AtomicU64, by: u64) {
if by == 0 {
return;
}
let mut current = counter.load(Ordering::Relaxed);
loop {
if current == 0 {
return;
}
let next = current.saturating_sub(by);
match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => return,
Err(actual) => current = actual,
}
}
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn bytes_per_epoch(bps: u64) -> u64 {
if bps == 0 {
return 0;
}
let numerator = bps.saturating_mul(FAIR_EPOCH_MS);
let bytes = numerator.saturating_div(8_000);
bytes.max(1)
}
fn current_epoch() -> u64 {
let start = limiter_epoch_start();
let elapsed_ms = start.elapsed().as_millis() as u64;
elapsed_ms / FAIR_EPOCH_MS
}
fn limiter_epoch_start() -> &'static Instant {
static START: OnceLock<Instant> = OnceLock::new();
START.get_or_init(Instant::now)
}
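
The `DirectionBucket` in the deleted file above amounts to an epoch-reset token bucket: `used` is zeroed whenever a newer epoch is first observed (via CAS on `epoch`), and grants are CAS'd against the per-epoch byte cap. A standalone sketch of that scheme, for reference (the `EpochBucket` name is hypothetical, and the epoch is passed in explicitly rather than derived from a monotonic clock as the deleted code did):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Epoch-reset bucket: the budget refills implicitly when a new epoch begins.
pub struct EpochBucket {
    epoch: AtomicU64,
    used: AtomicU64,
}

impl EpochBucket {
    pub fn new() -> Self {
        Self { epoch: AtomicU64::new(0), used: AtomicU64::new(0) }
    }

    // Roll forward; a successful CAS to a newer epoch zeroes `used`.
    fn sync_epoch(&self, epoch: u64) {
        let current = self.epoch.load(Ordering::Relaxed);
        if current < epoch
            && self
                .epoch
                .compare_exchange(current, epoch, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            self.used.store(0, Ordering::Relaxed);
        }
    }

    // Grant up to `requested` bytes from this epoch's budget; 0 means throttled.
    pub fn try_consume(&self, epoch: u64, cap_epoch: u64, requested: u64) -> u64 {
        if requested == 0 {
            return 0;
        }
        self.sync_epoch(epoch);
        loop {
            let used = self.used.load(Ordering::Relaxed);
            let grant = requested.min(cap_epoch.saturating_sub(used));
            if grant == 0 {
                return 0;
            }
            if self
                .used
                .compare_exchange_weak(used, used + grant, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                return grant;
            }
        }
    }
}
```

With `FAIR_EPOCH_MS = 20` the per-epoch cap is `bps * 20 / 8000` bytes, which is why a cap of `0` in the original code meant "unlimited" and short-circuited before any bucket math.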

View File

@@ -11,7 +11,6 @@ use crc32fast::Hasher;
 const MIN_APP_DATA: usize = 64;
 const MAX_APP_DATA: usize = MAX_TLS_CIPHERTEXT_SIZE;
-const MAX_TICKET_RECORDS: usize = 4;
 fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec<usize> {
     sizes
@@ -63,51 +62,6 @@ fn ensure_payload_capacity(mut sizes: Vec<usize>, payload_len: usize) -> Vec<usi
     sizes
 }
-fn emulated_app_data_sizes(cached: &CachedTlsData) -> Vec<usize> {
-    match cached.behavior_profile.source {
-        TlsProfileSource::Raw | TlsProfileSource::Merged => {
-            if !cached.behavior_profile.app_data_record_sizes.is_empty() {
-                return cached.behavior_profile.app_data_record_sizes.clone();
-            }
-        }
-        TlsProfileSource::Default | TlsProfileSource::Rustls => {}
-    }
-    let mut sizes = cached.app_data_records_sizes.clone();
-    if sizes.is_empty() {
-        sizes.push(cached.total_app_data_len.max(1024));
-    }
-    sizes
-}
-fn emulated_change_cipher_spec_count(cached: &CachedTlsData) -> usize {
-    usize::from(cached.behavior_profile.change_cipher_spec_count.max(1))
-}
-fn emulated_ticket_record_sizes(
-    cached: &CachedTlsData,
-    new_session_tickets: u8,
-    rng: &SecureRandom,
-) -> Vec<usize> {
-    let mut sizes = match cached.behavior_profile.source {
-        TlsProfileSource::Raw | TlsProfileSource::Merged => {
-            cached.behavior_profile.ticket_record_sizes.clone()
-        }
-        TlsProfileSource::Default | TlsProfileSource::Rustls => Vec::new(),
-    };
-    let target_count = sizes
-        .len()
-        .max(usize::from(new_session_tickets.min(MAX_TICKET_RECORDS as u8)))
-        .min(MAX_TICKET_RECORDS);
-    while sizes.len() < target_count {
-        sizes.push(rng.range(48) + 48);
-    }
-    sizes
-}
 fn build_compact_cert_info_payload(cert_info: &ParsedCertificateInfo) -> Option<Vec<u8>> {
     let mut fields = Vec::new();
@@ -226,21 +180,39 @@ pub fn build_emulated_server_hello(
     server_hello.extend_from_slice(&message);
     // --- ChangeCipherSpec ---
-    let change_cipher_spec_count = emulated_change_cipher_spec_count(cached);
-    let mut change_cipher_spec = Vec::with_capacity(change_cipher_spec_count * 6);
-    for _ in 0..change_cipher_spec_count {
-        change_cipher_spec.extend_from_slice(&[
-            TLS_RECORD_CHANGE_CIPHER,
-            TLS_VERSION[0],
-            TLS_VERSION[1],
-            0x00,
-            0x01,
-            0x01,
-        ]);
-    }
+    let change_cipher_spec = [
+        TLS_RECORD_CHANGE_CIPHER,
+        TLS_VERSION[0],
+        TLS_VERSION[1],
+        0x00,
+        0x01,
+        0x01,
+    ];
     // --- ApplicationData (fake encrypted records) ---
-    let mut sizes = jitter_and_clamp_sizes(&emulated_app_data_sizes(cached), rng);
+    let sizes = match cached.behavior_profile.source {
+        TlsProfileSource::Raw | TlsProfileSource::Merged => cached
+            .app_data_records_sizes
+            .first()
+            .copied()
+            .or_else(|| {
+                cached
+                    .behavior_profile
+                    .app_data_record_sizes
+                    .first()
+                    .copied()
+            })
+            .map(|size| vec![size])
+            .unwrap_or_else(|| vec![cached.total_app_data_len.max(1024)]),
+        _ => {
+            let mut sizes = cached.app_data_records_sizes.clone();
+            if sizes.is_empty() {
+                sizes.push(cached.total_app_data_len.max(1024));
+            }
+            sizes
+        }
+    };
+    let mut sizes = jitter_and_clamp_sizes(&sizes, rng);
     let compact_payload = cached
         .cert_info
         .as_ref()
@@ -327,7 +299,10 @@ pub fn build_emulated_server_hello(
     // --- Combine ---
     // Optional NewSessionTicket mimic records (opaque ApplicationData for fingerprint).
     let mut tickets = Vec::new();
-    for ticket_len in emulated_ticket_record_sizes(cached, new_session_tickets, rng) {
+    let ticket_count = new_session_tickets.min(4);
+    if ticket_count > 0 {
+        for _ in 0..ticket_count {
+            let ticket_len: usize = rng.range(48) + 48;
         let mut rec = Vec::with_capacity(5 + ticket_len);
         rec.push(TLS_RECORD_APPLICATION);
         rec.extend_from_slice(&TLS_VERSION);
@@ -335,6 +310,7 @@ pub fn build_emulated_server_hello(
         rec.extend_from_slice(&rng.bytes(ticket_len));
         tickets.extend_from_slice(&rec);
     }
+    }
     let mut response = Vec::with_capacity(
         server_hello.len() + change_cipher_spec.len() + app_data.len() + tickets.len(),
@@ -358,10 +334,6 @@ pub fn build_emulated_server_hello(
 #[path = "tests/emulator_security_tests.rs"]
 mod security_tests;
-#[cfg(test)]
-#[path = "tests/emulator_profile_fidelity_security_tests.rs"]
-mod emulator_profile_fidelity_security_tests;
 #[cfg(test)]
 mod tests {
     use std::time::SystemTime;
@@ -506,7 +478,7 @@ mod tests {
     }
     #[test]
-    fn test_build_emulated_server_hello_replays_tail_records_for_profiled_tls() {
+    fn test_build_emulated_server_hello_ignores_tail_records_for_raw_profile() {
         let mut cached = make_cached(None);
         cached.app_data_records_sizes = vec![27, 3905, 537, 69];
         cached.total_app_data_len = 4538;
@@ -528,19 +500,11 @@ mod tests {
         let hello_len = u16::from_be_bytes([response[3], response[4]]) as usize;
         let ccs_start = 5 + hello_len;
-        let mut pos = ccs_start + 6;
-        let mut app_lengths = Vec::new();
-        while pos + 5 <= response.len() {
-            assert_eq!(response[pos], TLS_RECORD_APPLICATION);
-            let record_len = u16::from_be_bytes([response[pos + 3], response[pos + 4]]) as usize;
-            app_lengths.push(record_len);
-            pos += 5 + record_len;
-        }
-        assert_eq!(app_lengths.len(), 4);
-        assert_eq!(app_lengths[0], 64);
-        assert_eq!(app_lengths[3], 69);
-        assert!(app_lengths[1] >= 64);
-        assert!(app_lengths[2] >= 64);
+        let app_start = ccs_start + 6;
+        let app_len =
+            u16::from_be_bytes([response[app_start + 3], response[app_start + 4]]) as usize;
+        assert_eq!(response[app_start], TLS_RECORD_APPLICATION);
+        assert_eq!(app_start + 5 + app_len, response.len());
     }
 }

View File

@@ -1,95 +0,0 @@
use std::time::SystemTime;
use crate::crypto::SecureRandom;
use crate::protocol::constants::{
TLS_RECORD_APPLICATION, TLS_RECORD_CHANGE_CIPHER, TLS_RECORD_HANDSHAKE,
};
use crate::tls_front::emulator::build_emulated_server_hello;
use crate::tls_front::types::{
CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsProfileSource,
};
fn make_cached() -> CachedTlsData {
CachedTlsData {
server_hello_template: ParsedServerHello {
version: [0x03, 0x03],
random: [0u8; 32],
session_id: Vec::new(),
cipher_suite: [0x13, 0x01],
compression: 0,
extensions: Vec::new(),
},
cert_info: None,
cert_payload: None,
app_data_records_sizes: vec![1200, 900, 220, 180],
total_app_data_len: 2500,
behavior_profile: TlsBehaviorProfile {
change_cipher_spec_count: 2,
app_data_record_sizes: vec![1200, 900],
ticket_record_sizes: vec![220, 180],
source: TlsProfileSource::Merged,
},
fetched_at: SystemTime::now(),
domain: "example.com".to_string(),
}
}
fn record_lengths_by_type(response: &[u8], wanted_type: u8) -> Vec<usize> {
let mut out = Vec::new();
let mut pos = 0usize;
while pos + 5 <= response.len() {
let record_type = response[pos];
let record_len = u16::from_be_bytes([response[pos + 3], response[pos + 4]]) as usize;
if pos + 5 + record_len > response.len() {
break;
}
if record_type == wanted_type {
out.push(record_len);
}
pos += 5 + record_len;
}
out
}
#[test]
fn emulated_server_hello_replays_profile_change_cipher_spec_count() {
let cached = make_cached();
let rng = SecureRandom::new();
let response = build_emulated_server_hello(
b"secret",
&[0x71; 32],
&[0x72; 16],
&cached,
false,
&rng,
None,
0,
);
assert_eq!(response[0], TLS_RECORD_HANDSHAKE);
let ccs_records = record_lengths_by_type(&response, TLS_RECORD_CHANGE_CIPHER);
assert_eq!(ccs_records.len(), 2);
assert!(ccs_records.iter().all(|len| *len == 1));
}
#[test]
fn emulated_server_hello_replays_profile_ticket_tail_lengths() {
let cached = make_cached();
let rng = SecureRandom::new();
let response = build_emulated_server_hello(
b"secret",
&[0x81; 32],
&[0x82; 16],
&cached,
false,
&rng,
None,
0,
);
let app_records = record_lengths_by_type(&response, TLS_RECORD_APPLICATION);
assert!(app_records.len() >= 4);
assert_eq!(&app_records[app_records.len() - 2..], &[220, 180]);
}