Compare commits

..

1 Commits

Author SHA1 Message Date
Stanislau Pilipeika e2a4f9abc5 Merge 7acc76b422 into 6d5a1a29df 2026-04-11 11:52:37 +03:00
67 changed files with 4973 additions and 9663 deletions
Generated
+1 -1
View File
@@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "telemt"
version = "3.4.0"
version = "3.3.39"
dependencies = [
"aes",
"anyhow",
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.4.0"
version = "3.3.39"
edition = "2024"
[features]
-3
View File
@@ -2,8 +2,6 @@
![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon) ![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social) ![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social) [![Telegram](https://img.shields.io/badge/Telegram-Chat-24a1de?logo=telegram&logoColor=24a1de)](https://t.me/telemtrs)
[🇷🇺 README на русском](https://github.com/telemt/telemt/blob/main/README.ru.md)
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
> [!NOTE]
@@ -27,7 +25,6 @@ curl -fsSL https://raw.githubusercontent.com/telemt/telemt/main/install.sh | sh
- [Quick Start Guide](docs/Quick_start/QUICK_START_GUIDE.en.md)
- [Инструкция по быстрому запуску](docs/Quick_start/QUICK_START_GUIDE.ru.md)
## Features
Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](docs/FAQ.en.md#recognizability-for-dpi-and-crawler)
Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy: non dramatically, but usual
+2 -21
View File
@@ -54,6 +54,7 @@ curl -fsSL https://raw.githubusercontent.com/telemt/telemt/main/install.sh | sh
- [FAQ EN](docs/FAQ.en.md)
## Сборка
```bash
# Клонируйте репозиторий
git clone https://github.com/telemt/telemt
@@ -62,6 +63,7 @@ cd telemt
# Начните процесс сборки
cargo build --release
# Устройства с небольшим объёмом оперативной памяти (1 ГБ, например NanoPi Neo3 / Raspberry Pi Zero 2):
# В текущем release-профиле используется lto = "fat" для максимальной оптимизации (см. Cargo.toml).
# На системах с малым объёмом RAM (~1 ГБ) можно переопределить это значение на "thin".
@@ -85,25 +87,4 @@ telemt config.toml
- Безопасность памяти;
- Асинхронная архитектура Tokio.
## Поддержать Telemt
Telemt — это бесплатное программное обеспечение с открытым исходным кодом, разработанное в свободное время.
Если оно оказалось вам полезным, вы можете поддержать дальнейшую разработку.
Принимаемые криптовалюты (BTC, ETH, USDT, 350+ и другие):
<p align="center">
<a href="https://nowpayments.io/donation?api_key=2bf1afd2-abc2-49f9-a012-f1e715b37223" target="_blank" rel="noreferrer noopener">
<img src="https://nowpayments.io/images/embeds/donation-button-white.svg" alt="Cryptocurrency & Bitcoin donation button by NOWPayments" height="80">
</a>
</p>
Monero (XMR) напрямую:
```
8Bk4tZEYPQWSypeD2hrUXG2rKbAKF16GqEN942ZdAP5cFdSqW6h4DwkP5cJMAdszzuPeHeHZPTyjWWFwzeFdjuci3ktfMoB
```
Все пожертвования пойдут на инфраструктуру, разработку и исследования.
![telemt_scheme](docs/assets/telemt.png)
+5 -8
View File
@@ -32,13 +32,13 @@ show = "*"
port = 443
# proxy_protocol = false # Enable if behind HAProxy/nginx with PROXY protocol
# metrics_port = 9090
# metrics_listen = "127.0.0.1:9090" # Listen address for metrics (overrides metrics_port)
# metrics_whitelist = ["127.0.0.1/32", "::1/128"]
# metrics_listen = "0.0.0.0:9090" # Listen address for metrics (overrides metrics_port)
# metrics_whitelist = ["127.0.0.1", "::1", "0.0.0.0/0"]
[server.api]
enabled = true
listen = "127.0.0.1:9091"
whitelist = ["127.0.0.1/32", "::1/128"]
listen = "0.0.0.0:9091"
whitelist = ["127.0.0.0/8"]
minimal_runtime_enabled = false
minimal_runtime_cache_ttl_ms = 1000
@@ -48,12 +48,9 @@ ip = "0.0.0.0"
# === Anti-Censorship & Masking ===
[censorship]
# Fake-TLS / SNI masking domain used in generated ee-links.
# Changing tls_domain invalidates previously generated TLS links.
tls_domain = "petrovich.ru"
mask = true
tls_emulation = true # Fetch real cert lengths and emulate TLS records
tls_emulation = true # Fetch real cert lengths and emulate TLS records
tls_front_dir = "tlsfront" # Cache directory for TLS emulation
[access.users]
+4 -4
View File
@@ -9,12 +9,12 @@ API runtime is configured in `[server.api]`.
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | `bool` | `true` | Enables REST API listener. |
| `listen` | `string` (`IP:PORT`) | `0.0.0.0:9091` | API bind address. |
| `whitelist` | `CIDR[]` | `127.0.0.0/8` | Source IP allowlist. Empty list means allow all. |
| `enabled` | `bool` | `false` | Enables REST API listener. |
| `listen` | `string` (`IP:PORT`) | `127.0.0.1:9091` | API bind address. |
| `whitelist` | `CIDR[]` | `127.0.0.1/32, ::1/128` | Source IP allowlist. Empty list means allow all. |
| `auth_header` | `string` | `""` | Exact value for `Authorization` header. Empty disables header auth. |
| `request_body_limit_bytes` | `usize` | `65536` | Maximum request body size. Must be `> 0`. |
| `minimal_runtime_enabled` | `bool` | `true` | Enables runtime snapshot endpoints requiring ME pool read-lock aggregation. |
| `minimal_runtime_enabled` | `bool` | `false` | Enables runtime snapshot endpoints requiring ME pool read-lock aggregation. |
| `minimal_runtime_cache_ttl_ms` | `u64` | `1000` | Cache TTL for minimal snapshots. `0` disables cache; valid range is `[0, 60000]`. |
| `runtime_edge_enabled` | `bool` | `false` | Enables runtime edge endpoints with cached aggregation payloads. |
| `runtime_edge_cache_ttl_ms` | `u64` | `1000` | Cache TTL for runtime edge summary payloads. `0` disables cache. |
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+3 -25
View File
@@ -36,11 +36,8 @@ hello2 = "ad_tag2"
On April 1, 2026, we became aware of a method for detecting MTProxy Fake-TLS,
based on the ECH extension and the ordering of cipher suites,
as well as an overall unique JA3/JA4 fingerprint
that does not occur in modern browsers.
> [!IMPORTANT]
> TLS fingerprint has been fixed in latest version of clients for Desktop / Android / iOS.
> Please update your client for MTProxy Fake-TLS to work correctly.
that does not occur in modern browsers:
we have already submitted initial changes to the Telegram Desktop developers and are working on updates for other clients.
- We consider this a breakthrough aspect, which has no stable analogues today
- Based on this: if `telemt` configured correctly, **TLS mode is completely identical to real-life handshake + communication** with a specified host
@@ -157,24 +154,6 @@ Keep-Alive: timeout=60
### Why do you need a middle proxy (ME)
https://github.com/telemt/telemt/discussions/167
## How clients interact with Telegram DCs
When you register a Telegram account, it gets permanently bound to one of Telegram's data centers (DCs).
It is deciced beforehand by Telegram based on the phone number's region.
This DC becomes your **home DC**: all content you upload (photos, videos, files, messages) is stored there.
Your client authenticates on it with every connection.
For example, if your account is registered on **DC2**, your client will always connect to DC2 first.
When you open a chat with another user whose home DC is **DC5**, your client opens an additional connection to DC5 to download their media.
Those cross-DC requests are normal and happen constantly.
> [!WARNING]
> Because every session is anchored to your home DC, an outage there causes other DCs to be unavaliable.
> If your home DC is DC2 and DC2 goes down, you **cannot** reach DC5 even though DC5 itself is perfectly healthy.
> The client has no valid session to route the request through.
This is also why an MTProxy only needs to reach Telegram's DC infrastructure as a whole.
The proxy itself doesn't care which DC your account lives on. The client negotiates the correct DC through the proxy after connecting.
### How many people can use one link
By default, an unlimited number of people can use a single link.
However, you can limit the number of unique IP addresses for each user:
@@ -182,8 +161,7 @@ However, you can limit the number of unique IP addresses for each user:
[access.user_max_unique_ips]
hello = 1
```
This parameter sets the maximum number of unique IP addresses from which a single link can be used simultaneously. If the first user disconnects, a second one can connect.
At the same time, multiple users can connect from a single IP address simultaneously (for example, devices on the same Wi-Fi network).
This parameter sets the maximum number of unique IP addresses from which a single link can be used simultaneously. If the first user disconnects, a second one can connect. At the same time, multiple users can connect from a single IP address simultaneously (for example, devices on the same Wi-Fi network).
### How to create multiple different links
1. Generate the required number of secrets using the command: `openssl rand -hex 16`.
+2 -22
View File
@@ -33,12 +33,9 @@ hello = "ad_tag"
hello2 = "ad_tag2"
```
## Распознаваемость для DPI и сканеров
1 апреля 2026 года нам стало известно о методе обнаружения MTProxy Fake-TLS, основанном на расширении ECH и порядке набора шифров,
а также об общем уникальном отпечатке JA3/JA4, который не встречается в современных браузерах.
> [!IMPORTANT]
> Проблема с TLS отпечатком исправлена в последних версиях клиентов Telegram для Desktop / Android / iOS.
> Обновите свой клиент для корректной работы с MTProxy Fake-TLS!
1 апреля 2026 года нам стало известно о методе обнаружения MTProxy Fake-TLS, основанном на расширении ECH и порядке набора шифров,
а также об общем уникальном отпечатке JA3/JA4, который не встречается в современных браузерах: мы уже отправили первоначальные изменения разработчикам Telegram Desktop и работаем над обновлениями для других клиентов.
- Мы считаем это прорывом, которому на сегодняшний день нет стабильных аналогов;
- Исходя из этого: если `telemt` настроен правильно, **режим TLS полностью идентичен реальному «рукопожатию» + обмену данными** с указанным хостом;
@@ -155,23 +152,6 @@ Keep-Alive: timeout=60
## Зачем нужен middle proxy (ME)
https://github.com/telemt/telemt/discussions/167
## Как клиенты взаимодействуют с дата-центрами Telegram
При регистрации аккаунта Telegram он навсегда привязывается к одному из дата-центров (DC).
Telegram заранее определяет к какому DC привязать аккаунт исходя из региона, к которому относиться номер телефона.
Этот DC становится вашим **домашним**: именно там хранится весь контент, который вы загружаете (фото, видео, файлы, сообщения).
И именно на нем клиент авторизуется при каждом подключении.
Например, если ваш аккаунт зарегистрирован на **DC2**, клиент всегда будет подключаться в первую очередь к DC2.
Когда вы открываете переписку с пользователем, чей домашний DC — **DC5**, клиент устанавливает доп. соединение с DC5, чтобы загрузить его контент.
Такие кросс-запросы к DC — это нормальная часть работы Telegram.
> [!WARNING]
> Поскольку аккаунт всегда привязан к домашнему DC, при его падении контент с других DC будет недоступен.
> Если ваш домашний DC — DC2, и DC2 лежит, вы **не сможете** достучаться и до DC5, даже если сам DC5 полностью исправен.
> У клиента просто нет валидной сессии, через которую можно было бы направить запрос.
По той же причине MTProxy достаточно иметь доступ к инфраструктуре Telegram в целом.
Cамому MTProxy всё равно, на каком DC живёт ваш аккаунт. Клиент cам договаривается о нужном DC через прокси уже после подключения.
## Что такое dd и ee в контексте MTProxy?
@@ -27,8 +27,7 @@ cargo build --release
./target/release/telemt --version
```
For low-RAM systems, note that this repository currently uses `lto = "fat"` in release profile.
On constrained builders, a local override to `lto = "thin"` may be more practical.
For low-RAM systems, this repository already uses `lto = "thin"` in release profile.
## 3. Install binary and config
+10 -37
View File
@@ -1,36 +1,9 @@
# Installation Options
There are three options for installing Telemt:
- [Automated installation using a script](#very-quick-start).
- [Manual installation of Telemt as a service](#telemt-via-systemd).
- [Installation using Docker Compose](#telemt-via-docker-compose).
# Very quick start
### One-command installation / update on re-run
```bash
curl -fsSL https://raw.githubusercontent.com/telemt/telemt/main/install.sh | sh
```
After starting, the script will prompt for:
- Your language (1 - English, 2 - Russian);
- Your TLS domain (press Enter for petrovich.ru).
The script checks if the port (default **443**) is free. If the port is already in use, installation will fail. You need to free up the port or use the **-p** flag with a different port to retry the installation.
To modify the scripts startup parameters, you can use the following flags:
- **-d, --domain** - TLS domain;
- **-p, --port** - server port (165535);
- **-s, --secret** - 32 hex secret;
- **-a, --ad-tag** - ad_tag;
- **-l, --lan**g - language (1/en or 2/ru);
Providing all options skips interactive prompts.
After completion, the script will provide a link for client connections:
```bash
tg://proxy?server=IP&port=PORT&secret=SECRET
```
### Installing a specific version
```bash
curl -fsSL https://raw.githubusercontent.com/telemt/telemt/main/install.sh | sh -s -- 3.3.39
@@ -137,15 +110,15 @@ show = "*"
# === Server Binding ===
[server]
port = 443
# proxy_protocol = false # Enable if behind HAProxy/nginx with PROXY protocol
# proxy_protocol = false # Enable if behind HAProxy/nginx with PROXY protocol
# metrics_port = 9090
# metrics_listen = "127.0.0.1:9090" # Listen address for metrics (overrides metrics_port)
# metrics_whitelist = ["127.0.0.1/32", "::1/128"]
# metrics_listen = "0.0.0.0:9090" # Listen address for metrics (overrides metrics_port)
# metrics_whitelist = ["127.0.0.1", "::1", "0.0.0.0/0"]
[server.api]
enabled = true
listen = "127.0.0.1:9091"
whitelist = ["127.0.0.1/32", "::1/128"]
listen = "0.0.0.0:9091"
whitelist = ["127.0.0.0/8"]
minimal_runtime_enabled = false
minimal_runtime_cache_ttl_ms = 1000
@@ -155,9 +128,9 @@ ip = "0.0.0.0"
# === Anti-Censorship & Masking ===
[censorship]
tls_domain = "petrovich.ru" # Fake-TLS / SNI masking domain used in generated ee-links
tls_domain = "petrovich.ru"
mask = true
tls_emulation = true # Fetch real cert lengths and emulate TLS records
tls_emulation = true # Fetch real cert lengths and emulate TLS records
tls_front_dir = "tlsfront" # Cache directory for TLS emulation
[access.users]
@@ -168,9 +141,9 @@ hello = "00000000000000000000000000000000"
then Ctrl+S -> Ctrl+X to save
> [!WARNING]
> Replace the value of the `hello` parameter with the value you obtained in step 0.
> Additionally, change the value of the `tls_domain` parameter to a different website.
> Changing the `tls_domain` parameter will break all links that use the old domain!
> Replace the value of the hello parameter with the value you obtained in step 0.
> Additionally, change the value of the tls_domain parameter to a different website.
> Changing the tls_domain parameter will break all links that use the old domain!
---
+13 -39
View File
@@ -1,35 +1,9 @@
# Варианты установки
Имеется три варианта установки Telemt:
- [Автоматизированная установка с помощью скрипта](#очень-быстрый-старт).
- [Ручная установка Telemt в качестве службы](#telemt-через-systemd-вручную).
- [Установка через Docker Compose](#telemt-через-docker-compose).
# Очень быстрый старт
### Установка одной командой / обновление при повторном запуске
```bash
curl -fsSL https://raw.githubusercontent.com/telemt/telemt/main/install.sh | sh
```
После запуска скрипт запросит:
- ваш язык (1 - English, 2 - Русский);
- ваш TLS-домен (нажмите Enter для petrovich.ru).
Во время установки скрипт проверяет, свободен ли порт (по умолчанию **443**). Если порт занят другим процессом - установка завершится с ошибкой. Для повторной установки необходимо освободить порт или указать другой через флаг **-p**.
Для изменения параметров запуска скрипта можно использовать следующие флаги:
- **-d, --domain** - TLS-домен;
- **-p, --port** - порт (165535);
- **-s, --secret** - секрет (32 hex символа);
- **-a, --ad-tag** - ad_tag;
- **-l, --lang** - язык (1/en или 2/ru).
Если заданы флаги для языка и домена, интерактивных вопросов не будет.
После завершения установки скрипт выдаст ссылку для подключения клиентов:
```bash
tg://proxy?server=IP&port=PORT&secret=SECRET
```
### Установка нужной версии
```bash
curl -fsSL https://raw.githubusercontent.com/telemt/telemt/main/install.sh | sh -s -- 3.3.39
@@ -129,22 +103,22 @@ tls = true
[general.links]
show = "*"
# show = ["alice", "bob"] # Показывать ссылки только для alice и bob
# show = "*" # Показывать ссылки для всех пользователей
# public_host = "proxy.example.com" # Хост (IP-адрес или домен) для ссылок tg://
# public_port = 443 # Порт для ссылок tg:// (по умолчанию: server.port)
# show = "*"              # Показывать ссылки для всех пользователей
# public_host = "proxy.example.com"  # Хост (IP-адрес или домен) для ссылок tg://
# public_port = 443                  # Порт для ссылок tg:// (по умолчанию: server.port)
# === Привязка сервера ===
[server]
port = 443
# proxy_protocol = false # Включите, если сервер находится за HAProxy/nginx с протоколом PROXY
# proxy_protocol = false           # Включите, если сервер находится за HAProxy/nginx с протоколом PROXY
# metrics_port = 9090
# metrics_listen = "127.0.0.1:9090" # Адрес прослушивания для метрик (переопределяет metrics_port)
# metrics_whitelist = ["127.0.0.1/32", "::1/128"]
# metrics_listen = "0.0.0.0:9090"  # Адрес прослушивания для метрик (переопределяет metrics_port)
# metrics_whitelist = ["127.0.0.1", "::1", "0.0.0.0/0"]
[server.api]
enabled = true
listen = "127.0.0.1:9091"
whitelist = ["127.0.0.1/32", "::1/128"]
listen = "0.0.0.0:9091"
whitelist = ["127.0.0.0/8"]
minimal_runtime_enabled = false
minimal_runtime_cache_ttl_ms = 1000
@@ -154,9 +128,9 @@ ip = "0.0.0.0"
# === Обход блокировок и маскировка ===
[censorship]
tls_domain = "petrovich.ru" # Домен Fake-TLS / SNI, который будет использоваться в сгенерированных ee-ссылках
tls_domain = "petrovich.ru"
mask = true
tls_emulation = true # Получить реальную длину сертификата и эмулировать запись TLS
tls_emulation = true # Получить реальную длину сертификата и эмулировать запись TLS
tls_front_dir = "tlsfront" # Директория кэша для эмуляции TLS
[access.users]
@@ -167,9 +141,9 @@ hello = "00000000000000000000000000000000"
Затем нажмите Ctrl+S -> Ctrl+X, чтобы сохранить
> [!WARNING]
> Замените значение параметра `hello` на значение, которое вы получили в пункте 0.
> Так же замените значение параметра `tls_domain` на другой сайт.
> Изменение параметра `tls_domain` сделает нерабочими все ссылки, использующие старый домен!
> Замените значение параметра hello на значение, которое вы получили в пункте 0.
> Так же замените значение параметра tls_domain на другой сайт.
> Изменение параметра tls_domain сделает нерабочими все ссылки, использующие старый домен!
---
+14 -38
View File
@@ -1,6 +1,6 @@
#![allow(clippy::too_many_arguments)]
use std::io::{Error as IoError, ErrorKind};
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
@@ -16,7 +16,7 @@ use tokio::net::TcpListener;
use tokio::sync::{Mutex, RwLock, watch};
use tracing::{debug, info, warn};
use crate::config::{ApiGrayAction, ProxyConfig};
use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker;
use crate::proxy::route_mode::RouteRuntimeController;
use crate::startup::StartupTracker;
@@ -184,9 +184,7 @@ pub async fn serve(
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
.await
{
if !error.is_user() {
debug!(error = %error, "API connection error");
}
debug!(error = %error, "API connection error");
}
});
}
@@ -197,7 +195,7 @@ async fn handle(
peer: SocketAddr,
shared: Arc<ApiShared>,
config_rx: watch::Receiver<Arc<ProxyConfig>>,
) -> Result<Response<Full<Bytes>>, IoError> {
) -> Result<Response<Full<Bytes>>, Infallible> {
let request_id = shared.next_request_id();
let cfg = config_rx.borrow().clone();
let api_cfg = &cfg.server.api;
@@ -215,25 +213,14 @@ async fn handle(
if !api_cfg.whitelist.is_empty() && !api_cfg.whitelist.iter().any(|net| net.contains(peer.ip()))
{
return match api_cfg.gray_action {
ApiGrayAction::Api => Ok(error_response(
request_id,
ApiFailure::new(
StatusCode::FORBIDDEN,
"forbidden",
"Source IP is not allowed",
),
)),
ApiGrayAction::Ok200 => Ok(Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/html; charset=utf-8")
.body(Full::new(Bytes::new()))
.unwrap()),
ApiGrayAction::Drop => Err(IoError::new(
ErrorKind::ConnectionAborted,
"api request dropped by gray_action=drop",
)),
};
return Ok(error_response(
request_id,
ApiFailure::new(
StatusCode::FORBIDDEN,
"forbidden",
"Source IP is not allowed",
),
));
}
if !api_cfg.auth_header.is_empty() {
@@ -257,16 +244,11 @@ async fn handle(
let method = req.method().clone();
let path = req.uri().path().to_string();
let normalized_path = if path.len() > 1 {
path.trim_end_matches('/')
} else {
path.as_str()
};
let query = req.uri().query().map(str::to_string);
let body_limit = api_cfg.request_body_limit_bytes;
let result: Result<Response<Full<Bytes>>, ApiFailure> = async {
match (method.as_str(), normalized_path) {
match (method.as_str(), path.as_str()) {
("GET", "/v1/health") => {
let revision = current_revision(&shared.config_path).await?;
let data = HealthData {
@@ -449,7 +431,7 @@ async fn handle(
Ok(success_response(status, data, revision))
}
_ => {
if let Some(user) = normalized_path.strip_prefix("/v1/users/")
if let Some(user) = path.strip_prefix("/v1/users/")
&& !user.is_empty()
&& !user.contains('/')
{
@@ -618,12 +600,6 @@ async fn handle(
),
));
}
debug!(
method = method.as_str(),
path = %path,
normalized_path = %normalized_path,
"API route not found"
);
Ok(error_response(
request_id,
ApiFailure::new(StatusCode::NOT_FOUND, "not_found", "Route not found"),
+1 -13
View File
@@ -452,11 +452,7 @@ fn build_user_links(
startup_detected_ip_v6: Option<IpAddr>,
) -> UserLinks {
let hosts = resolve_link_hosts(cfg, startup_detected_ip_v4, startup_detected_ip_v6);
let port = cfg
.general
.links
.public_port
.unwrap_or(resolve_default_link_port(cfg));
let port = cfg.general.links.public_port.unwrap_or(cfg.server.port);
let tls_domains = resolve_tls_domains(cfg);
let mut classic = Vec::new();
@@ -494,14 +490,6 @@ fn build_user_links(
}
}
fn resolve_default_link_port(cfg: &ProxyConfig) -> u16 {
cfg.server
.listeners
.first()
.and_then(|listener| listener.port)
.unwrap_or(cfg.server.port)
}
fn resolve_link_hosts(
cfg: &ProxyConfig,
startup_detected_ip_v4: Option<IpAddr>,
+1 -2
View File
@@ -598,17 +598,16 @@ secure = false
tls = true
[server]
port = {port}
listen_addr_ipv4 = "0.0.0.0"
listen_addr_ipv6 = "::"
[[server.listeners]]
ip = "0.0.0.0"
port = {port}
# reuse_allow = false # Set true only when intentionally running multiple telemt instances on same port
[[server.listeners]]
ip = "::"
port = {port}
[timeouts]
client_first_byte_idle_secs = 300
-20
View File
@@ -615,26 +615,6 @@ pub(crate) fn default_mask_relay_max_bytes() -> usize {
32 * 1024
}
#[cfg(not(test))]
pub(crate) fn default_mask_relay_timeout_ms() -> u64 {
60_000
}
#[cfg(test)]
pub(crate) fn default_mask_relay_timeout_ms() -> u64 {
200
}
#[cfg(not(test))]
pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 {
5_000
}
#[cfg(test)]
pub(crate) fn default_mask_relay_idle_timeout_ms() -> u64 {
100
}
pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 {
5
}
+3 -16
View File
@@ -17,9 +17,8 @@
//! | `network` | `dns_overrides` | Applied immediately |
//! | `access` | All user/quota fields | Effective immediately |
//!
//! Fields that require re-binding sockets (`server.listeners`, legacy
//! `server.port`, `censorship.*`, `network.*`, `use_middle_proxy`) are **not**
//! applied; a warning is emitted.
//! Fields that require re-binding sockets (`server.port`, `censorship.*`,
//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted.
//! Non-hot changes are never mixed into the runtime config snapshot.
use std::collections::BTreeSet;
@@ -300,7 +299,6 @@ fn listeners_equal(
}
lhs.iter().zip(rhs.iter()).all(|(a, b)| {
a.ip == b.ip
&& a.port == b.port
&& a.announce == b.announce
&& a.announce_ip == b.announce_ip
&& a.proxy_protocol == b.proxy_protocol
@@ -308,14 +306,6 @@ fn listeners_equal(
})
}
fn resolve_default_link_port(cfg: &ProxyConfig) -> u16 {
cfg.server
.listeners
.first()
.and_then(|listener| listener.port)
.unwrap_or(cfg.server.port)
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct WatchManifest {
files: BTreeSet<PathBuf>,
@@ -570,7 +560,6 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
if old.server.api.enabled != new.server.api.enabled
|| old.server.api.listen != new.server.api.listen
|| old.server.api.whitelist != new.server.api.whitelist
|| old.server.api.gray_action != new.server.api.gray_action
|| old.server.api.auth_header != new.server.api.auth_header
|| old.server.api.request_body_limit_bytes != new.server.api.request_body_limit_bytes
|| old.server.api.minimal_runtime_enabled != new.server.api.minimal_runtime_enabled
@@ -622,8 +611,6 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|| old.censorship.mask_shape_above_cap_blur_max_bytes
!= new.censorship.mask_shape_above_cap_blur_max_bytes
|| old.censorship.mask_relay_max_bytes != new.censorship.mask_relay_max_bytes
|| old.censorship.mask_relay_timeout_ms != new.censorship.mask_relay_timeout_ms
|| old.censorship.mask_relay_idle_timeout_ms != new.censorship.mask_relay_idle_timeout_ms
|| old.censorship.mask_classifier_prefetch_timeout_ms
!= new.censorship.mask_classifier_prefetch_timeout_ms
|| old.censorship.mask_timing_normalization_enabled
@@ -1130,7 +1117,7 @@ fn log_changes(
.general
.links
.public_port
.unwrap_or(resolve_default_link_port(new_cfg));
.unwrap_or(new_cfg.server.port);
for user in &added {
if let Some(secret) = new_hot.users.get(*user) {
print_user_links(user, secret, &host, port, new_cfg);
-208
View File
@@ -253,12 +253,6 @@ fn validate_upstreams(config: &ProxyConfig) -> Result<()> {
}
for upstream in &config.upstreams {
if matches!(upstream.ipv4, Some(false)) && matches!(upstream.ipv6, Some(false)) {
return Err(ProxyError::Config(
"upstream.ipv4 and upstream.ipv6 cannot both be false".to_string(),
));
}
if let UpstreamType::Shadowsocks { url, .. } = &upstream.upstream_type {
let parsed = ShadowsocksServerConfig::from_url(url)
.map_err(|error| ProxyError::Config(format!("invalid shadowsocks url: {error}")))?;
@@ -346,29 +340,12 @@ impl ProxyConfig {
let update_every_is_explicit = general_table
.map(|table| table.contains_key("update_every"))
.unwrap_or(false);
let beobachten_is_explicit = general_table
.map(|table| table.contains_key("beobachten"))
.unwrap_or(false);
let beobachten_minutes_is_explicit = general_table
.map(|table| table.contains_key("beobachten_minutes"))
.unwrap_or(false);
let beobachten_flush_secs_is_explicit = general_table
.map(|table| table.contains_key("beobachten_flush_secs"))
.unwrap_or(false);
let beobachten_file_is_explicit = general_table
.map(|table| table.contains_key("beobachten_file"))
.unwrap_or(false);
let legacy_secret_is_explicit = general_table
.map(|table| table.contains_key("proxy_secret_auto_reload_secs"))
.unwrap_or(false);
let legacy_config_is_explicit = general_table
.map(|table| table.contains_key("proxy_config_auto_reload_secs"))
.unwrap_or(false);
let legacy_top_level_beobachten = parsed_toml.get("beobachten").cloned();
let legacy_top_level_beobachten_minutes = parsed_toml.get("beobachten_minutes").cloned();
let legacy_top_level_beobachten_flush_secs =
parsed_toml.get("beobachten_flush_secs").cloned();
let legacy_top_level_beobachten_file = parsed_toml.get("beobachten_file").cloned();
let stun_servers_is_explicit = network_table
.map(|table| table.contains_key("stun_servers"))
.unwrap_or(false);
@@ -381,59 +358,6 @@ impl ProxyConfig {
config.general.update_every = None;
}
// Backward compatibility: legacy top-level beobachten* keys.
// Prefer `[general].*` when both are present.
let mut legacy_beobachten_applied = false;
if !beobachten_is_explicit && let Some(value) = legacy_top_level_beobachten.as_ref() {
let parsed = value.as_bool().ok_or_else(|| {
ProxyError::Config("beobachten (top-level) must be a boolean".to_string())
})?;
config.general.beobachten = parsed;
legacy_beobachten_applied = true;
}
if !beobachten_minutes_is_explicit
&& let Some(value) = legacy_top_level_beobachten_minutes.as_ref()
{
let raw = value.as_integer().ok_or_else(|| {
ProxyError::Config("beobachten_minutes (top-level) must be an integer".to_string())
})?;
let parsed = u64::try_from(raw).map_err(|_| {
ProxyError::Config(
"beobachten_minutes (top-level) must be within u64 range".to_string(),
)
})?;
config.general.beobachten_minutes = parsed;
legacy_beobachten_applied = true;
}
if !beobachten_flush_secs_is_explicit
&& let Some(value) = legacy_top_level_beobachten_flush_secs.as_ref()
{
let raw = value.as_integer().ok_or_else(|| {
ProxyError::Config(
"beobachten_flush_secs (top-level) must be an integer".to_string(),
)
})?;
let parsed = u64::try_from(raw).map_err(|_| {
ProxyError::Config(
"beobachten_flush_secs (top-level) must be within u64 range".to_string(),
)
})?;
config.general.beobachten_flush_secs = parsed;
legacy_beobachten_applied = true;
}
if !beobachten_file_is_explicit
&& let Some(value) = legacy_top_level_beobachten_file.as_ref()
{
let parsed = value.as_str().ok_or_else(|| {
ProxyError::Config("beobachten_file (top-level) must be a string".to_string())
})?;
config.general.beobachten_file = parsed.to_string();
legacy_beobachten_applied = true;
}
if legacy_beobachten_applied {
warn!("top-level beobachten* keys are deprecated; use general.beobachten* instead");
}
let legacy_nat_stun = config.general.middle_proxy_nat_stun.take();
let legacy_nat_stun_servers =
std::mem::take(&mut config.general.middle_proxy_nat_stun_servers);
@@ -1326,7 +1250,6 @@ impl ProxyConfig {
if let Ok(ipv4) = ipv4_str.parse::<IpAddr>() {
config.server.listeners.push(ListenerConfig {
ip: ipv4,
port: Some(config.server.port),
announce: None,
announce_ip: None,
proxy_protocol: None,
@@ -1338,7 +1261,6 @@ impl ProxyConfig {
{
config.server.listeners.push(ListenerConfig {
ip: ipv6,
port: Some(config.server.port),
announce: None,
announce_ip: None,
proxy_protocol: None,
@@ -1347,13 +1269,6 @@ impl ProxyConfig {
}
}
// Migration: listeners[].port fallback to legacy server.port.
for listener in &mut config.server.listeners {
if listener.port.is_none() {
listener.port = Some(config.server.port);
}
}
// Migration: announce_ip → announce for each listener.
for listener in &mut config.server.listeners {
if listener.announce.is_none()
@@ -1374,14 +1289,11 @@ impl ProxyConfig {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
});
}
@@ -1473,21 +1385,6 @@ mod tests {
const TEST_SHADOWSOCKS_URL: &str =
"ss://2022-blake3-aes-256-gcm:MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDE=@127.0.0.1:8388";
fn load_config_from_temp_toml(toml: &str) -> ProxyConfig {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let dir = std::env::temp_dir().join(format!("telemt_load_cfg_{nonce}"));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("config.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
let _ = std::fs::remove_file(path);
let _ = std::fs::remove_dir(dir);
cfg
}
#[test]
fn serde_defaults_remain_unchanged_for_present_sections() {
let toml = r#"
@@ -1584,7 +1481,6 @@ mod tests {
cfg.general.rpc_proxy_req_every,
default_rpc_proxy_req_every()
);
assert_eq!(cfg.general.beobachten_file, default_beobachten_file());
assert_eq!(cfg.general.update_every, default_update_every());
assert_eq!(cfg.server.listen_addr_ipv4, default_listen_addr_ipv4());
assert_eq!(cfg.server.listen_addr_ipv6, default_listen_addr_ipv6_opt());
@@ -1595,7 +1491,6 @@ mod tests {
assert_eq!(cfg.censorship.unknown_sni_action, UnknownSniAction::Drop);
assert_eq!(cfg.server.api.listen, default_api_listen());
assert_eq!(cfg.server.api.whitelist, default_api_whitelist());
assert_eq!(cfg.server.api.gray_action, ApiGrayAction::Drop);
assert_eq!(
cfg.server.api.request_body_limit_bytes,
default_api_request_body_limit_bytes()
@@ -1752,7 +1647,6 @@ mod tests {
default_upstream_connect_failfast_hard_errors()
);
assert_eq!(general.rpc_proxy_req_every, default_rpc_proxy_req_every());
assert_eq!(general.beobachten_file, default_beobachten_file());
assert_eq!(general.update_every, default_update_every());
let server = ServerConfig::default();
@@ -1767,7 +1661,6 @@ mod tests {
);
assert_eq!(server.api.listen, default_api_listen());
assert_eq!(server.api.whitelist, default_api_whitelist());
assert_eq!(server.api.gray_action, ApiGrayAction::Drop);
assert_eq!(
server.api.request_body_limit_bytes,
default_api_request_body_limit_bytes()
@@ -1915,107 +1808,6 @@ mod tests {
);
}
#[test]
fn api_gray_action_parses_and_defaults_to_drop() {
let cfg_default: ProxyConfig = toml::from_str(
r#"
[server]
[general]
[network]
[access]
"#,
)
.unwrap();
assert_eq!(cfg_default.server.api.gray_action, ApiGrayAction::Drop);
let cfg_api: ProxyConfig = toml::from_str(
r#"
[server]
[general]
[network]
[access]
[server.api]
gray_action = "api"
"#,
)
.unwrap();
assert_eq!(cfg_api.server.api.gray_action, ApiGrayAction::Api);
let cfg_200: ProxyConfig = toml::from_str(
r#"
[server]
[general]
[network]
[access]
[server.api]
gray_action = "200"
"#,
)
.unwrap();
assert_eq!(cfg_200.server.api.gray_action, ApiGrayAction::Ok200);
let cfg_drop: ProxyConfig = toml::from_str(
r#"
[server]
[general]
[network]
[access]
[server.api]
gray_action = "drop"
"#,
)
.unwrap();
assert_eq!(cfg_drop.server.api.gray_action, ApiGrayAction::Drop);
}
#[test]
fn top_level_beobachten_keys_migrate_to_general_when_general_not_explicit() {
let cfg = load_config_from_temp_toml(
r#"
beobachten = false
beobachten_minutes = 7
beobachten_flush_secs = 3
beobachten_file = "tmp/legacy-beob.txt"
[server]
[general]
[network]
[access]
"#,
);
assert!(!cfg.general.beobachten);
assert_eq!(cfg.general.beobachten_minutes, 7);
assert_eq!(cfg.general.beobachten_flush_secs, 3);
assert_eq!(cfg.general.beobachten_file, "tmp/legacy-beob.txt");
}
#[test]
fn general_beobachten_keys_have_priority_over_legacy_top_level() {
let cfg = load_config_from_temp_toml(
r#"
beobachten = true
beobachten_minutes = 30
beobachten_flush_secs = 30
beobachten_file = "tmp/legacy-beob.txt"
[server]
[general]
beobachten = false
beobachten_minutes = 5
beobachten_flush_secs = 2
beobachten_file = "tmp/general-beob.txt"
[network]
[access]
"#,
);
assert!(!cfg.general.beobachten);
assert_eq!(cfg.general.beobachten_minutes, 5);
assert_eq!(cfg.general.beobachten_flush_secs, 2);
assert_eq!(cfg.general.beobachten_file, "tmp/general-beob.txt");
}
#[test]
fn dc_overrides_allow_string_and_array() {
let toml = r#"
+1 -55
View File
@@ -1153,8 +1153,7 @@ pub struct LinksConfig {
#[serde(default)]
pub public_host: Option<String>,
/// Public port for tg:// link generation.
/// Overrides listener ports and legacy `server.port`.
/// Public port for tg:// link generation (overrides server.port).
#[serde(default)]
pub public_port: Option<u16>,
}
@@ -1184,13 +1183,6 @@ pub struct ApiConfig {
#[serde(default = "default_api_whitelist")]
pub whitelist: Vec<IpNetwork>,
/// Behavior for requests from source IPs outside `whitelist`.
/// - `api`: return structured API forbidden response.
/// - `200`: return `200 OK` with an empty body.
/// - `drop`: close the connection without HTTP response.
#[serde(default)]
pub gray_action: ApiGrayAction,
/// Optional static value for `Authorization` header validation.
/// Empty string disables header auth.
#[serde(default)]
@@ -1235,7 +1227,6 @@ impl Default for ApiConfig {
enabled: default_true(),
listen: default_api_listen(),
whitelist: default_api_whitelist(),
gray_action: ApiGrayAction::default(),
auth_header: String::new(),
request_body_limit_bytes: default_api_request_body_limit_bytes(),
minimal_runtime_enabled: default_api_minimal_runtime_enabled(),
@@ -1249,19 +1240,6 @@ impl Default for ApiConfig {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ApiGrayAction {
/// Preserve current API behavior for denied source IPs.
Api,
/// Mimic a plain web endpoint by returning `200 OK` with an empty body.
#[serde(rename = "200")]
Ok200,
/// Drop connection without HTTP response for denied source IPs.
#[default]
Drop,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConntrackMode {
@@ -1376,8 +1354,6 @@ impl Default for ConntrackControlConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
/// Legacy listener port used for backward compatibility.
/// For new configs prefer `[[server.listeners]].port`.
#[serde(default = "default_port")]
pub port: u16,
@@ -1734,19 +1710,6 @@ pub struct AntiCensorshipConfig {
#[serde(default = "default_mask_relay_max_bytes")]
pub mask_relay_max_bytes: usize,
/// Wall-clock cap for the full masking relay on non-MTProto fallback paths.
/// Raise when the mask target is a long-lived service (e.g. WebSocket).
/// Default: 60 000 ms (60 s).
#[serde(default = "default_mask_relay_timeout_ms")]
pub mask_relay_timeout_ms: u64,
/// Per-read idle timeout on masking relay and drain paths.
/// Limits resource consumption by slow-loris attacks and port scanners.
/// A read call stalling beyond this is treated as an abandoned connection.
/// Default: 5 000 ms (5 s).
#[serde(default = "default_mask_relay_idle_timeout_ms")]
pub mask_relay_idle_timeout_ms: u64,
/// Prefetch timeout (ms) for extending fragmented masking classifier window.
#[serde(default = "default_mask_classifier_prefetch_timeout_ms")]
pub mask_classifier_prefetch_timeout_ms: u64,
@@ -1792,8 +1755,6 @@ impl Default for AntiCensorshipConfig {
mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
mask_relay_max_bytes: default_mask_relay_max_bytes(),
mask_relay_timeout_ms: default_mask_relay_timeout_ms(),
mask_relay_idle_timeout_ms: default_mask_relay_idle_timeout_ms(),
mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(),
mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
@@ -1880,10 +1841,6 @@ pub enum UpstreamType {
interface: Option<String>,
#[serde(default)]
bind_addresses: Option<Vec<String>>,
/// Linux-only hard interface pinning via `SO_BINDTODEVICE`.
/// Optional alias: `force_bind`.
#[serde(default, alias = "force_bind")]
bindtodevice: Option<String>,
},
Socks4 {
address: String,
@@ -1920,22 +1877,11 @@ pub struct UpstreamConfig {
pub scopes: String,
#[serde(skip)]
pub selected_scope: String,
/// Allow IPv4 DC targets for this upstream.
/// `None` means auto-detect from runtime connectivity state.
#[serde(default)]
pub ipv4: Option<bool>,
/// Allow IPv6 DC targets for this upstream.
/// `None` means auto-detect from runtime connectivity state.
#[serde(default)]
pub ipv6: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenerConfig {
pub ip: IpAddr,
/// Per-listener TCP port. If omitted, falls back to legacy `server.port`.
#[serde(default)]
pub port: Option<u16>,
/// IP address or hostname to announce in proxy links.
/// Takes precedence over `announce_ip` if both are set.
#[serde(default)]
+21 -40
View File
@@ -343,28 +343,15 @@ fn command_exists(binary: &str) -> bool {
})
}
fn listener_port_set(cfg: &ProxyConfig) -> Vec<u16> {
let mut ports: BTreeSet<u16> = BTreeSet::new();
if cfg.server.listeners.is_empty() {
ports.insert(cfg.server.port);
} else {
for listener in &cfg.server.listeners {
ports.insert(listener.port.unwrap_or(cfg.server.port));
}
}
ports.into_iter().collect()
}
fn notrack_targets(cfg: &ProxyConfig) -> (Vec<(Option<IpAddr>, u16)>, Vec<(Option<IpAddr>, u16)>) {
fn notrack_targets(cfg: &ProxyConfig) -> (Vec<Option<IpAddr>>, Vec<Option<IpAddr>>) {
let mode = cfg.server.conntrack_control.mode;
let mut v4_targets: BTreeSet<(Option<IpAddr>, u16)> = BTreeSet::new();
let mut v6_targets: BTreeSet<(Option<IpAddr>, u16)> = BTreeSet::new();
let mut v4_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
let mut v6_targets: BTreeSet<Option<IpAddr>> = BTreeSet::new();
match mode {
ConntrackMode::Tracked => {}
ConntrackMode::Notrack => {
if cfg.server.listeners.is_empty() {
let port = cfg.server.port;
if let Some(ipv4) = cfg
.server
.listen_addr_ipv4
@@ -372,9 +359,9 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec<(Option<IpAddr>, u16)>, Vec<(Optio
.and_then(|s| s.parse::<IpAddr>().ok())
{
if ipv4.is_unspecified() {
v4_targets.insert((None, port));
v4_targets.insert(None);
} else {
v4_targets.insert((Some(ipv4), port));
v4_targets.insert(Some(ipv4));
}
}
if let Some(ipv6) = cfg
@@ -384,39 +371,33 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec<(Option<IpAddr>, u16)>, Vec<(Optio
.and_then(|s| s.parse::<IpAddr>().ok())
{
if ipv6.is_unspecified() {
v6_targets.insert((None, port));
v6_targets.insert(None);
} else {
v6_targets.insert((Some(ipv6), port));
v6_targets.insert(Some(ipv6));
}
}
} else {
for listener in &cfg.server.listeners {
let port = listener.port.unwrap_or(cfg.server.port);
if listener.ip.is_ipv4() {
if listener.ip.is_unspecified() {
v4_targets.insert((None, port));
v4_targets.insert(None);
} else {
v4_targets.insert((Some(listener.ip), port));
v4_targets.insert(Some(listener.ip));
}
} else if listener.ip.is_unspecified() {
v6_targets.insert((None, port));
v6_targets.insert(None);
} else {
v6_targets.insert((Some(listener.ip), port));
v6_targets.insert(Some(listener.ip));
}
}
}
}
ConntrackMode::Hybrid => {
let ports = listener_port_set(cfg);
for ip in &cfg.server.conntrack_control.hybrid_listener_ips {
if ip.is_ipv4() {
for port in &ports {
v4_targets.insert((Some(*ip), *port));
}
v4_targets.insert(Some(*ip));
} else {
for port in &ports {
v6_targets.insert((Some(*ip), *port));
}
v6_targets.insert(Some(*ip));
}
}
}
@@ -441,19 +422,19 @@ async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> {
let (v4_targets, v6_targets) = notrack_targets(cfg);
let mut rules = Vec::new();
for (ip, port) in v4_targets {
for ip in v4_targets {
let rule = if let Some(ip) = ip {
format!("tcp dport {} ip daddr {} notrack", port, ip)
format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip)
} else {
format!("tcp dport {} notrack", port)
format!("tcp dport {} notrack", cfg.server.port)
};
rules.push(rule);
}
for (ip, port) in v6_targets {
for ip in v6_targets {
let rule = if let Some(ip) = ip {
format!("tcp dport {} ip6 daddr {} notrack", port, ip)
format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip)
} else {
format!("tcp dport {} notrack", port)
format!("tcp dport {} notrack", cfg.server.port)
};
rules.push(rule);
}
@@ -517,7 +498,7 @@ async fn apply_iptables_rules_for_binary(
let (v4_targets, v6_targets) = notrack_targets(cfg);
let selected = if ipv4 { v4_targets } else { v6_targets };
for (ip, port) in selected {
for ip in selected {
let mut args = vec![
"-t".to_string(),
"raw".to_string(),
@@ -526,7 +507,7 @@ async fn apply_iptables_rules_for_binary(
"-p".to_string(),
"tcp".to_string(),
"--dport".to_string(),
port.to_string(),
cfg.server.port.to_string(),
];
if let Some(ip) = ip {
args.push("-d".to_string());
+8 -18
View File
@@ -31,19 +31,6 @@ pub(crate) struct BoundListeners {
pub(crate) has_unix_listener: bool,
}
fn listener_port_or_legacy(listener: &crate::config::ListenerConfig, config: &ProxyConfig) -> u16 {
listener.port.unwrap_or(config.server.port)
}
fn default_link_port(config: &ProxyConfig) -> u16 {
config
.server
.listeners
.first()
.and_then(|listener| listener.port)
.unwrap_or(config.server.port)
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn bind_listeners(
config: &Arc<ProxyConfig>,
@@ -76,8 +63,7 @@ pub(crate) async fn bind_listeners(
let mut listeners = Vec::new();
for listener_conf in &config.server.listeners {
let listener_port = listener_port_or_legacy(listener_conf, config);
let addr = SocketAddr::new(listener_conf.ip, listener_port);
let addr = SocketAddr::new(listener_conf.ip, config.server.port);
if addr.is_ipv4() && !decision_ipv4_dc {
warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]");
continue;
@@ -120,7 +106,11 @@ pub(crate) async fn bind_listeners(
if config.general.links.public_host.is_none()
&& !config.general.links.show.is_empty()
{
let link_port = config.general.links.public_port.unwrap_or(listener_port);
let link_port = config
.general
.links
.public_port
.unwrap_or(config.server.port);
print_proxy_links(&public_host, link_port, config);
}
@@ -168,7 +158,7 @@ pub(crate) async fn bind_listeners(
.general
.links
.public_port
.unwrap_or(default_link_port(config)),
.unwrap_or(config.server.port),
)
} else {
let ip = detected_ip_v4.or(detected_ip_v6).map(|ip| ip.to_string());
@@ -183,7 +173,7 @@ pub(crate) async fn bind_listeners(
.general
.links
.public_port
.unwrap_or(default_link_port(config)),
.unwrap_or(config.server.port),
)
};
+27 -42
View File
@@ -81,11 +81,23 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
}
// Shared maestro startup and main loop. `drop_after_bind` runs on Unix after listeners are bound
// (for privilege drop); it is a no-op on other platforms.
async fn run_telemt_core(
drop_after_bind: impl FnOnce(),
#[cfg(unix)]
async fn run_inner(
daemon_opts: DaemonOptions,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
// Acquire PID file if daemonizing or if explicitly requested
// Keep it alive until shutdown (underscore prefix = intentionally kept for RAII cleanup)
let _pid_file = if daemon_opts.daemonize || daemon_opts.pid_file.is_some() {
let mut pf = PidFile::new(daemon_opts.pid_file_path());
if let Err(e) = pf.acquire() {
eprintln!("[telemt] {}", e);
std::process::exit(1);
}
Some(pf)
} else {
None
};
let process_started_at = Instant::now();
let process_started_at_epoch_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -749,8 +761,17 @@ async fn run_telemt_core(
std::process::exit(1);
}
// On Unix, caller supplies privilege drop after bind (may require root for port < 1024).
drop_after_bind();
// Drop privileges after binding sockets (which may require root for port < 1024)
if daemon_opts.user.is_some() || daemon_opts.group.is_some() {
if let Err(e) = drop_privileges(
daemon_opts.user.as_deref(),
daemon_opts.group.as_deref(),
_pid_file.as_ref(),
) {
error!(error = %e, "Failed to drop privileges");
std::process::exit(1);
}
}
runtime_tasks::apply_runtime_log_filter(
has_rust_log,
@@ -798,39 +819,3 @@ async fn run_telemt_core(
Ok(())
}
#[cfg(unix)]
async fn run_inner(
daemon_opts: DaemonOptions,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
// Acquire PID file if daemonizing or if explicitly requested
// Keep it alive until shutdown (underscore prefix = intentionally kept for RAII cleanup)
let _pid_file = if daemon_opts.daemonize || daemon_opts.pid_file.is_some() {
let mut pf = PidFile::new(daemon_opts.pid_file_path());
if let Err(e) = pf.acquire() {
eprintln!("[telemt] {}", e);
std::process::exit(1);
}
Some(pf)
} else {
None
};
let user = daemon_opts.user.clone();
let group = daemon_opts.group.clone();
run_telemt_core(|| {
if user.is_some() || group.is_some() {
if let Err(e) = drop_privileges(user.as_deref(), group.as_deref(), _pid_file.as_ref()) {
error!(error = %e, "Failed to drop privileges");
std::process::exit(1);
}
}
})
.await
}
#[cfg(not(unix))]
async fn run_inner() -> std::result::Result<(), Box<dyn std::error::Error>> {
run_telemt_core(|| {}).await
}
-1
View File
@@ -97,7 +97,6 @@ pub async fn run_probe(
let UpstreamType::Direct {
interface,
bind_addresses,
..
} = &upstream.upstream_type
else {
continue;
+23 -63
View File
@@ -28,10 +28,14 @@ use tracing::debug;
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
/// Maximum duration for the entire masking relay under test (replaced by config at runtime).
/// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners.
#[cfg(not(test))]
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
#[cfg(test)]
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
/// Per-read idle timeout for masking relay and drain paths under test (replaced by config at runtime).
#[cfg(not(test))]
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
const MASK_BUFFER_SIZE: usize = 8192;
@@ -51,7 +55,6 @@ async fn copy_with_idle_timeout<R, W>(
writer: &mut W,
byte_cap: usize,
shutdown_on_eof: bool,
idle_timeout: Duration,
) -> CopyOutcome
where
R: AsyncRead + Unpin,
@@ -75,7 +78,7 @@ where
}
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await;
let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await;
let n = match read_res {
Ok(Ok(n)) => n,
Ok(Err(_)) | Err(_) => break,
@@ -83,13 +86,13 @@ where
if n == 0 {
ended_by_eof = true;
if shutdown_on_eof {
let _ = timeout(idle_timeout, writer.shutdown()).await;
let _ = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.shutdown()).await;
}
break;
}
total = total.saturating_add(n);
let write_res = timeout(idle_timeout, writer.write_all(&buf[..n])).await;
let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await;
match write_res {
Ok(Ok(())) => {}
Ok(Err(_)) | Err(_) => break,
@@ -227,20 +230,13 @@ where
}
}
async fn consume_client_data_with_timeout_and_cap<R>(
reader: R,
byte_cap: usize,
relay_timeout: Duration,
idle_timeout: Duration,
) where
async fn consume_client_data_with_timeout_and_cap<R>(reader: R, byte_cap: usize)
where
R: AsyncRead + Unpin,
{
if timeout(
relay_timeout,
consume_client_data(reader, byte_cap, idle_timeout),
)
.await
.is_err()
if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, byte_cap))
.await
.is_err()
{
debug!("Timed out while consuming client data on masking fallback path");
}
@@ -643,18 +639,10 @@ pub async fn handle_bad_client<R, W>(
beobachten.record(client_type, peer.ip(), ttl);
}
let relay_timeout = Duration::from_millis(config.censorship.mask_relay_timeout_ms);
let idle_timeout = Duration::from_millis(config.censorship.mask_relay_idle_timeout_ms);
if !config.censorship.mask {
// Masking disabled, just consume data
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
relay_timeout,
idle_timeout,
)
.await;
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
.await;
return;
}
@@ -686,7 +674,7 @@ pub async fn handle_bad_client<R, W>(
return;
}
if timeout(
relay_timeout,
MASK_RELAY_TIMEOUT,
relay_to_mask(
reader,
writer,
@@ -700,7 +688,6 @@ pub async fn handle_bad_client<R, W>(
config.censorship.mask_shape_above_cap_blur_max_bytes,
config.censorship.mask_shape_hardening_aggressive_mode,
config.censorship.mask_relay_max_bytes,
idle_timeout,
),
)
.await
@@ -716,8 +703,6 @@ pub async fn handle_bad_client<R, W>(
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
relay_timeout,
idle_timeout,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
@@ -727,8 +712,6 @@ pub async fn handle_bad_client<R, W>(
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
relay_timeout,
idle_timeout,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
@@ -759,13 +742,8 @@ pub async fn handle_bad_client<R, W>(
local = %local_addr,
"Mask target resolves to local listener; refusing self-referential masking fallback"
);
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
relay_timeout,
idle_timeout,
)
.await;
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
return;
}
@@ -799,7 +777,7 @@ pub async fn handle_bad_client<R, W>(
return;
}
if timeout(
relay_timeout,
MASK_RELAY_TIMEOUT,
relay_to_mask(
reader,
writer,
@@ -813,7 +791,6 @@ pub async fn handle_bad_client<R, W>(
config.censorship.mask_shape_above_cap_blur_max_bytes,
config.censorship.mask_shape_hardening_aggressive_mode,
config.censorship.mask_relay_max_bytes,
idle_timeout,
),
)
.await
@@ -829,8 +806,6 @@ pub async fn handle_bad_client<R, W>(
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
relay_timeout,
idle_timeout,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
@@ -840,8 +815,6 @@ pub async fn handle_bad_client<R, W>(
consume_client_data_with_timeout_and_cap(
reader,
config.censorship.mask_relay_max_bytes,
relay_timeout,
idle_timeout,
)
.await;
wait_mask_outcome_budget(outcome_started, config).await;
@@ -863,7 +836,6 @@ async fn relay_to_mask<R, W, MR, MW>(
shape_above_cap_blur_max_bytes: usize,
shape_hardening_aggressive_mode: bool,
mask_relay_max_bytes: usize,
idle_timeout: Duration,
) where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
@@ -885,19 +857,11 @@ async fn relay_to_mask<R, W, MR, MW>(
&mut mask_write,
mask_relay_max_bytes,
!shape_hardening_enabled,
idle_timeout,
)
.await
},
async {
copy_with_idle_timeout(
&mut mask_read,
&mut writer,
mask_relay_max_bytes,
true,
idle_timeout,
)
.await
copy_with_idle_timeout(&mut mask_read, &mut writer, mask_relay_max_bytes, true).await
}
);
@@ -925,11 +889,7 @@ async fn relay_to_mask<R, W, MR, MW>(
}
/// Just consume all data from client without responding.
async fn consume_client_data<R: AsyncRead + Unpin>(
mut reader: R,
byte_cap: usize,
idle_timeout: Duration,
) {
async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usize) {
if byte_cap == 0 {
return;
}
@@ -945,7 +905,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(
}
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await {
let n = match timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await {
Ok(Ok(n)) => n,
Ok(Err(_)) | Err(_) => break,
};
@@ -31,14 +31,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -27,14 +27,11 @@ fn build_harness(config: ProxyConfig) -> PipelineHarness {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -25,14 +25,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -38,14 +38,11 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -16,14 +16,11 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -39,14 +39,11 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> RedTeamHarness {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -232,14 +229,11 @@ async fn redteam_03_masking_duration_must_be_less_than_1ms_when_backend_down() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -476,14 +470,11 @@ async fn measure_invalid_probe_duration_ms(delay_ms: u64, tls_len: u16, body_sen
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -553,14 +544,11 @@ async fn capture_forwarded_probe_len(tls_len: u16, body_sent: usize) -> usize {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -13,14 +13,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -11,14 +11,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -25,14 +25,11 @@ fn new_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
-81
View File
@@ -332,14 +332,11 @@ async fn relay_task_abort_releases_user_gate_and_ip_reservation() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -449,14 +446,11 @@ async fn relay_cutover_releases_user_gate_and_ip_reservation() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -576,14 +570,11 @@ async fn integration_route_cutover_and_quota_overlap_fails_closed_and_releases_s
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -749,14 +740,11 @@ async fn proxy_protocol_header_is_rejected_when_trust_list_is_empty() {
upstream_type: crate::config::UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -829,14 +817,11 @@ async fn proxy_protocol_header_from_untrusted_peer_range_is_rejected_under_load(
upstream_type: crate::config::UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -992,14 +977,11 @@ async fn short_tls_probe_is_masked_through_client_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1083,14 +1065,11 @@ async fn tls12_record_probe_is_masked_through_client_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1172,14 +1151,11 @@ async fn handle_client_stream_increments_connects_all_exactly_once() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1268,14 +1244,11 @@ async fn running_client_handler_increments_connects_all_exactly_once() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1361,14 +1334,11 @@ async fn idle_pooled_connection_closes_cleanly_in_generic_stream_path() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1435,14 +1405,11 @@ async fn idle_pooled_connection_closes_cleanly_in_client_handler_path() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1524,14 +1491,11 @@ async fn partial_tls_header_stall_triggers_handshake_timeout() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1852,14 +1816,11 @@ async fn valid_tls_path_does_not_fall_back_to_mask_backend() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1964,14 +1925,11 @@ async fn valid_tls_with_invalid_mtproto_falls_back_to_mask_backend() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -2074,14 +2032,11 @@ async fn client_handler_tls_bad_mtproto_is_forwarded_to_mask_backend() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -2199,14 +2154,11 @@ async fn alpn_mismatch_tls_probe_is_masked_through_client_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -2295,14 +2247,11 @@ async fn invalid_hmac_tls_probe_is_masked_through_client_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -2397,14 +2346,11 @@ async fn burst_invalid_tls_probes_are_masked_verbatim() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -3305,14 +3251,11 @@ async fn relay_connect_error_releases_user_and_ip_before_return() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -3869,14 +3812,11 @@ async fn untrusted_proxy_header_source_is_rejected() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -3942,14 +3882,11 @@ async fn empty_proxy_trusted_cidrs_rejects_proxy_header_by_default() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -4042,14 +3979,11 @@ async fn oversized_tls_record_is_masked_in_generic_stream_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -4148,14 +4082,11 @@ async fn oversized_tls_record_is_masked_in_client_handler_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -4268,14 +4199,11 @@ async fn tls_record_len_min_minus_1_is_rejected_in_generic_stream_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -4374,14 +4302,11 @@ async fn tls_record_len_min_minus_1_is_rejected_in_client_handler_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -4483,14 +4408,11 @@ async fn tls_record_len_16384_is_accepted_in_generic_stream_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -4587,14 +4509,11 @@ async fn tls_record_len_16384_is_accepted_in_client_handler_pipeline() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -24,14 +24,11 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -26,14 +26,11 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -27,14 +27,11 @@ fn make_test_upstream_manager(stats: Arc<Stats>) -> Arc<UpstreamManager> {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -41,14 +41,11 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1293,14 +1293,11 @@ async fn direct_relay_abort_midflight_releases_route_gauge() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1403,14 +1400,11 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1528,14 +1522,11 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
@@ -1767,11 +1758,8 @@ async fn negative_direct_relay_dc_connection_refused_fails_fast() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
100,
@@ -1861,11 +1849,8 @@ async fn adversarial_direct_relay_cutover_integrity() {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
100,
@@ -47,7 +47,7 @@ async fn consume_client_data_stops_after_byte_cap_without_eof() {
};
let cap = 10_000usize;
consume_client_data(reader, cap, MASK_RELAY_IDLE_TIMEOUT).await;
consume_client_data(reader, cap).await;
let total = produced.load(Ordering::Relaxed);
assert!(
@@ -31,7 +31,7 @@ async fn stalling_client_terminates_at_idle_not_relay_timeout() {
let result = tokio::time::timeout(
MASK_RELAY_TIMEOUT,
consume_client_data(reader, MASK_BUFFER_SIZE * 4, MASK_RELAY_IDLE_TIMEOUT),
consume_client_data(reader, MASK_BUFFER_SIZE * 4),
)
.await;
@@ -57,12 +57,9 @@ async fn fast_reader_drains_to_eof() {
let data = vec![0xAAu8; 32 * 1024];
let reader = std::io::Cursor::new(data);
tokio::time::timeout(
MASK_RELAY_TIMEOUT,
consume_client_data(reader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
)
.await
.expect("consume_client_data did not complete for fast EOF reader");
tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX))
.await
.expect("consume_client_data did not complete for fast EOF reader");
}
#[tokio::test]
@@ -84,7 +81,7 @@ async fn io_error_terminates_cleanly() {
tokio::time::timeout(
MASK_RELAY_TIMEOUT,
consume_client_data(ErrReader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
consume_client_data(ErrReader, usize::MAX),
)
.await
.expect("consume_client_data did not return on I/O error");
@@ -34,11 +34,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
set.spawn(async {
tokio::time::timeout(
MASK_RELAY_TIMEOUT,
consume_client_data(
OneByteThenStall { sent: false },
usize::MAX,
MASK_RELAY_IDLE_TIMEOUT,
),
consume_client_data(OneByteThenStall { sent: false }, usize::MAX),
)
.await
.expect("consume_client_data exceeded relay timeout under stall load");
@@ -60,7 +56,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
#[tokio::test]
async fn consume_zero_cap_returns_immediately() {
let started = Instant::now();
consume_client_data(tokio::io::empty(), 0, MASK_RELAY_IDLE_TIMEOUT).await;
consume_client_data(tokio::io::empty(), 0).await;
assert!(
started.elapsed() < MASK_RELAY_IDLE_TIMEOUT,
"zero byte cap must return immediately"
@@ -127,14 +127,7 @@ async fn positive_copy_with_production_cap_stops_exactly_at_budget() {
let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls);
let mut writer = CountingWriter::default();
let outcome = copy_with_idle_timeout(
&mut reader,
&mut writer,
PROD_CAP_BYTES,
true,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
assert_eq!(
outcome.total, PROD_CAP_BYTES,
@@ -152,13 +145,7 @@ async fn negative_consume_with_zero_cap_performs_no_reads() {
let read_calls = Arc::new(AtomicUsize::new(0));
let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls));
consume_client_data_with_timeout_and_cap(
reader,
0,
MASK_RELAY_TIMEOUT,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
consume_client_data_with_timeout_and_cap(reader, 0).await;
assert_eq!(
read_calls.load(Ordering::Relaxed),
@@ -174,14 +161,7 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
let mut reader = FinitePatternReader::new(payload, 3072, read_calls);
let mut writer = CountingWriter::default();
let outcome = copy_with_idle_timeout(
&mut reader,
&mut writer,
PROD_CAP_BYTES,
true,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
assert_eq!(outcome.total, payload);
assert_eq!(writer.written, payload);
@@ -195,13 +175,7 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() {
let started = Instant::now();
consume_client_data_with_timeout_and_cap(
NeverReadyReader,
PROD_CAP_BYTES,
MASK_RELAY_TIMEOUT,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES).await;
assert!(
started.elapsed() < Duration::from_millis(350),
@@ -216,12 +190,7 @@ async fn integration_consume_path_honors_production_cap_for_large_payload() {
let bounded = timeout(
Duration::from_millis(350),
consume_client_data_with_timeout_and_cap(
reader,
PROD_CAP_BYTES,
MASK_RELAY_TIMEOUT,
MASK_RELAY_IDLE_TIMEOUT,
),
consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES),
)
.await;
@@ -237,13 +206,7 @@ async fn adversarial_consume_path_never_reads_beyond_declared_byte_cap() {
let total_read = Arc::new(AtomicUsize::new(0));
let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read));
consume_client_data_with_timeout_and_cap(
reader,
byte_cap,
MASK_RELAY_TIMEOUT,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
consume_client_data_with_timeout_and_cap(reader, byte_cap).await;
assert!(
total_read.load(Ordering::Relaxed) <= byte_cap,
@@ -268,9 +231,7 @@ async fn light_fuzz_cap_and_payload_matrix_preserves_min_budget_invariant() {
let mut reader = FinitePatternReader::new(payload, chunk, read_calls);
let mut writer = CountingWriter::default();
let outcome =
copy_with_idle_timeout(&mut reader, &mut writer, cap, true, MASK_RELAY_IDLE_TIMEOUT)
.await;
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true).await;
let expected = payload.min(cap);
assert_eq!(
@@ -300,14 +261,7 @@ async fn stress_parallel_copy_tasks_with_production_cap_complete_without_leaks()
read_calls,
);
let mut writer = CountingWriter::default();
copy_with_idle_timeout(
&mut reader,
&mut writer,
PROD_CAP_BYTES,
true,
MASK_RELAY_IDLE_TIMEOUT,
)
.await
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await
}));
}
@@ -26,7 +26,6 @@ async fn relay_to_mask_enforces_masking_session_byte_cap() {
0,
false,
32 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
});
@@ -82,7 +81,6 @@ async fn relay_to_mask_propagates_client_half_close_without_waiting_for_other_di
0,
false,
32 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
});
@@ -1377,7 +1377,6 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall
0,
false,
5 * 1024 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
});
@@ -1509,7 +1508,6 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() {
0,
false,
5 * 1024 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
),
)
.await;
@@ -228,7 +228,6 @@ async fn relay_path_idle_timeout_eviction_remains_effective() {
0,
false,
5 * 1024 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
@@ -44,7 +44,6 @@ async fn run_relay_case(
above_cap_blur_max_bytes,
false,
5 * 1024 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
});
@@ -89,7 +89,6 @@ async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() {
0,
false,
5 * 1024 * 1024,
MASK_RELAY_IDLE_TIMEOUT,
)
.await;
});
@@ -53,14 +53,11 @@ fn new_client_harness() -> ClientHarness {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
1,
+95 -57
View File
@@ -67,8 +67,10 @@ struct FamilyReconnectOutcome {
key: (i32, IpFamily),
dc: i32,
family: IpFamily,
alive: usize,
required: usize,
endpoint_count: usize,
restored: usize,
}
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
@@ -80,6 +82,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut degraded_interval = true;
@@ -105,6 +109,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
@@ -120,6 +126,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
@@ -352,6 +360,8 @@ async fn check_family(
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool {
let enabled = match family {
@@ -383,7 +393,10 @@ async fn check_family(
let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len());
let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget));
if pool.floor_mode() == MeFloorMode::Static {}
if pool.floor_mode() == MeFloorMode::Static {
adaptive_idle_since.clear();
adaptive_recover_until.clear();
}
let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new();
let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec<u64>>::new();
@@ -422,6 +435,8 @@ async fn check_family(
&live_addr_counts,
&live_writer_ids_by_addr,
&bound_clients_by_writer,
adaptive_idle_since,
adaptive_recover_until,
)
.await;
pool.set_adaptive_floor_runtime_caps(
@@ -488,6 +503,8 @@ async fn check_family(
outage_next_attempt.remove(&key);
shadow_rotate_deadline.remove(&key);
idle_refresh_next_attempt.remove(&key);
adaptive_idle_since.remove(&key);
adaptive_recover_until.remove(&key);
info!(
dc = %dc,
?family,
@@ -615,28 +632,22 @@ async fn check_family(
restored += 1;
continue;
}
let base_req = pool_for_reconnect
.required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false);
if alive + restored >= base_req {
pool_for_reconnect
.stats
.increment_me_floor_cap_block_total();
pool_for_reconnect
.stats
.increment_me_floor_swap_idle_failed_total();
debug!(
dc = %dc,
?family,
alive,
required,
active_cap_effective_total,
"Adaptive floor cap reached, reconnect attempt blocked"
);
break;
}
pool_for_reconnect
.stats
.increment_me_floor_cap_block_total();
pool_for_reconnect
.stats
.increment_me_floor_swap_idle_failed_total();
debug!(
dc = %dc,
?family,
alive,
required,
active_cap_effective_total,
"Adaptive floor cap reached, reconnect attempt blocked"
);
break;
}
pool_for_reconnect.stats.increment_me_reconnect_attempt();
let res = tokio::time::timeout(
pool_for_reconnect.reconnect_runtime.me_one_timeout,
pool_for_reconnect.connect_endpoints_round_robin(
@@ -652,9 +663,11 @@ async fn check_family(
pool_for_reconnect.stats.increment_me_reconnect_success();
}
Ok(false) => {
pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
}
Err(_) => {
pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME reconnect timed out");
}
}
@@ -665,8 +678,10 @@ async fn check_family(
key,
dc,
family,
alive,
required,
endpoint_count: endpoints_for_dc.len(),
restored,
}
});
}
@@ -680,7 +695,7 @@ async fn check_family(
}
};
let now = Instant::now();
let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await;
let now_alive = outcome.alive + outcome.restored;
if now_alive >= outcome.required {
info!(
dc = %outcome.dc,
@@ -836,33 +851,6 @@ fn should_emit_rate_limited_warn(
false
}
async fn live_active_writers_for_dc_family(pool: &Arc<MePool>, dc: i32, family: IpFamily) -> usize {
let writers = pool.writers.read().await;
writers
.iter()
.filter(|writer| {
if writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
return false;
}
if writer.writer_dc != dc {
return false;
}
if !matches!(
super::pool::WriterContour::from_u8(
writer.contour.load(std::sync::atomic::Ordering::Relaxed),
),
super::pool::WriterContour::Active
) {
return false;
}
match family {
IpFamily::V4 => writer.addr.is_ipv4(),
IpFamily::V6 => writer.addr.is_ipv6(),
}
})
.count()
}
fn adaptive_floor_class_min(
pool: &Arc<MePool>,
endpoint_count: usize,
@@ -916,6 +904,8 @@ async fn build_family_floor_plan(
live_addr_counts: &HashMap<(i32, SocketAddr), usize>,
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
bound_clients_by_writer: &HashMap<u64, usize>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) -> FamilyFloorPlan {
let mut entries = Vec::<DcFloorPlanEntry>::new();
let mut by_dc = HashMap::<i32, DcFloorPlanEntry>::new();
@@ -931,7 +921,18 @@ async fn build_family_floor_plan(
if endpoints.is_empty() {
continue;
}
let _key = (*dc, family);
let key = (*dc, family);
let reduce_for_idle = should_reduce_floor_for_idle(
pool,
key,
*dc,
endpoints,
live_writer_ids_by_addr,
bound_clients_by_writer,
adaptive_idle_since,
adaptive_recover_until,
)
.await;
let base_required = pool.required_writers_for_dc(endpoints.len()).max(1);
let min_required = if is_adaptive {
adaptive_floor_class_min(pool, endpoints.len(), base_required)
@@ -946,11 +947,11 @@ async fn build_family_floor_plan(
if max_required < min_required {
max_required = min_required;
}
// We initialize target_required at base_required to prevent 0-writer blackouts
// caused by proactively dropping an idle DC to a single fragile connection.
// The Adaptive Floor constraint loop below will gracefully compress idle DCs
// (prioritized via has_bound_clients = false) to min_required only when global capacity is reached.
let desired_raw = base_required;
let desired_raw = if is_adaptive && reduce_for_idle {
min_required
} else {
base_required
};
let target_required = desired_raw.clamp(min_required, max_required);
let alive = endpoints
.iter()
@@ -1277,6 +1278,43 @@ async fn maybe_refresh_idle_writer_for_dc(
);
}
async fn should_reduce_floor_for_idle(
pool: &Arc<MePool>,
key: (i32, IpFamily),
dc: i32,
endpoints: &[SocketAddr],
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
bound_clients_by_writer: &HashMap<u64, usize>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool {
if pool.floor_mode() != MeFloorMode::Adaptive {
adaptive_idle_since.remove(&key);
adaptive_recover_until.remove(&key);
return false;
}
let now = Instant::now();
let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr);
let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer);
if has_bound_clients {
adaptive_idle_since.remove(&key);
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
return false;
}
if let Some(recover_until) = adaptive_recover_until.get(&key)
&& now < *recover_until
{
adaptive_idle_since.remove(&key);
return false;
}
adaptive_recover_until.remove(&key);
let idle_since = adaptive_idle_since.entry(key).or_insert(now);
now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
}
fn has_bound_clients_on_endpoint(
writer_ids: &[u64],
bound_clients_by_writer: &HashMap<u64, usize>,
@@ -1326,7 +1364,6 @@ async fn recover_single_endpoint_outage(
);
return;
};
pool.stats.increment_me_reconnect_attempt();
pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total();
@@ -1402,6 +1439,7 @@ async fn recover_single_endpoint_outage(
return;
}
pool.stats.increment_me_reconnect_attempt();
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
outage_backoff.insert(key, next_ms);
+1 -8
View File
@@ -67,7 +67,6 @@ pub fn format_sample_line(sample: &MePingSample) -> String {
fn format_direct_with_config(
interface: &Option<String>,
bind_addresses: &Option<Vec<String>>,
bindtodevice: &Option<String>,
) -> Option<String> {
let mut direct_parts: Vec<String> = Vec::new();
if let Some(dev) = interface.as_deref().filter(|v| !v.is_empty()) {
@@ -76,9 +75,6 @@ fn format_direct_with_config(
if let Some(src) = bind_addresses.as_ref().filter(|v| !v.is_empty()) {
direct_parts.push(format!("src={}", src.join(",")));
}
if let Some(device) = bindtodevice.as_deref().filter(|v| !v.is_empty()) {
direct_parts.push(format!("bindtodevice={device}"));
}
if direct_parts.is_empty() {
None
} else {
@@ -235,11 +231,8 @@ pub async fn format_me_route(
UpstreamType::Direct {
interface,
bind_addresses,
bindtodevice,
} => {
if let Some(route) =
format_direct_with_config(interface, bind_addresses, bindtodevice)
{
if let Some(route) = format_direct_with_config(interface, bind_addresses) {
route
} else {
detect_direct_route_details(reports, prefer_ipv6, v4_ok, v6_ok)
+16 -38
View File
@@ -1422,6 +1422,22 @@ impl MePool {
MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed))
}
pub(super) fn adaptive_floor_idle_duration(&self) -> Duration {
Duration::from_secs(
self.floor_runtime
.me_adaptive_floor_idle_secs
.load(Ordering::Relaxed),
)
}
pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration {
Duration::from_secs(
self.floor_runtime
.me_adaptive_floor_recover_grace_secs
.load(Ordering::Relaxed),
)
}
pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
(self
.floor_runtime
@@ -1643,7 +1659,6 @@ impl MePool {
&self,
contour: WriterContour,
allow_coverage_override: bool,
writer_dc: i32,
) -> bool {
let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await;
match contour {
@@ -1655,43 +1670,6 @@ impl MePool {
if !allow_coverage_override {
return false;
}
let mut endpoints_len = 0;
let now_epoch = Self::now_epoch_secs();
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) {
if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) {
endpoints_len += addrs.len();
}
}
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) {
if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) {
endpoints_len += addrs.len();
}
}
if endpoints_len > 0 {
let base_req =
self.required_writers_for_dc_with_floor_mode(endpoints_len, false);
let active_for_dc = {
let ws = self.writers.read().await;
ws.iter()
.filter(|w| {
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
&& w.writer_dc == writer_dc
&& matches!(
WriterContour::from_u8(
w.contour.load(std::sync::atomic::Ordering::Relaxed),
),
WriterContour::Active
)
})
.count()
};
if active_for_dc < base_req {
return true;
}
}
let coverage_required = self.active_coverage_required_total().await;
active_writers < coverage_required
}
+2 -17
View File
@@ -77,12 +77,6 @@ impl MePool {
return Vec::new();
}
if endpoints.len() == 1 && self.single_endpoint_outage_disable_quarantine() {
let mut guard = self.endpoint_quarantine.lock().await;
guard.retain(|_, expiry| *expiry > Instant::now());
return endpoints.to_vec();
}
let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now();
guard.retain(|_, expiry| *expiry > now);
@@ -242,18 +236,8 @@ impl MePool {
let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1);
let mut total_attempts = 0u32;
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr;
let bypass_quarantine_for_single_endpoint =
single_endpoint_dc && self.single_endpoint_outage_disable_quarantine();
if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint {
if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint {
debug!(
%addr,
"Bypassing quarantine for immediate reconnect on single-endpoint DC"
);
}
if !same_endpoint_quarantined {
for attempt in 0..fast_retries {
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
break;
@@ -292,6 +276,7 @@ impl MePool {
);
}
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total();
return false;
+1 -1
View File
@@ -342,7 +342,7 @@ impl MePool {
allow_coverage_override: bool,
) -> Result<()> {
if !self
.can_open_writer_for_contour(contour, allow_coverage_override, writer_dc)
.can_open_writer_for_contour(contour, allow_coverage_override)
.await
{
return Err(ProxyError::Proxy(format!(
@@ -109,16 +109,18 @@ async fn connectable_endpoints_waits_until_quarantine_expires() {
{
let mut guard = pool.endpoint_quarantine.lock().await;
guard.insert(addr, Instant::now() + Duration::from_millis(500));
guard.insert(addr, Instant::now() + Duration::from_millis(80));
}
let endpoints = tokio::time::timeout(
Duration::from_millis(120),
pool.connectable_endpoints_for_test(&[addr]),
)
.await
.expect("single-endpoint outage mode should bypass quarantine delay");
let started = Instant::now();
let endpoints = pool.connectable_endpoints_for_test(&[addr]).await;
let elapsed = started.elapsed();
assert_eq!(endpoints, vec![addr]);
assert!(
elapsed >= Duration::from_millis(50),
"single-endpoint DC should honor quarantine before retry"
);
}
#[tokio::test]
-50
View File
@@ -158,56 +158,6 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>)
Ok(socket)
}
/// Pin an outgoing socket to a specific Linux network interface via SO_BINDTODEVICE.
#[cfg(target_os = "linux")]
pub fn bind_outgoing_socket_to_device(socket: &Socket, device: &str) -> Result<()> {
use std::io::{Error, ErrorKind};
use std::os::fd::AsRawFd;
let name = device.trim();
if name.is_empty() {
return Err(Error::new(
ErrorKind::InvalidInput,
"bindtodevice must not be empty",
));
}
// The kernel expects an interface name buffer with a trailing NUL.
if name.len() >= libc::IFNAMSIZ {
return Err(Error::new(
ErrorKind::InvalidInput,
"bindtodevice exceeds IFNAMSIZ",
));
}
let mut ifname = [0u8; libc::IFNAMSIZ];
ifname[..name.len()].copy_from_slice(name.as_bytes());
let rc = unsafe {
libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_BINDTODEVICE,
ifname.as_ptr().cast::<libc::c_void>(),
(name.len() + 1) as libc::socklen_t,
)
};
if rc != 0 {
return Err(Error::last_os_error());
}
debug!("Pinned outgoing socket to interface {}", name);
Ok(())
}
/// Stub for non-Linux targets where SO_BINDTODEVICE is unavailable.
#[cfg(not(target_os = "linux"))]
pub fn bind_outgoing_socket_to_device(_socket: &Socket, _device: &str) -> Result<()> {
use std::io::{Error, ErrorKind};
Err(Error::new(
ErrorKind::Unsupported,
"bindtodevice is supported only on Linux",
))
}
/// Get local address of a socket
#[allow(dead_code)]
pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> {
+25 -233
View File
@@ -26,9 +26,7 @@ use crate::stats::Stats;
use crate::transport::shadowsocks::{
ShadowsocksStream, connect_shadowsocks, sanitize_shadowsocks_url,
};
use crate::transport::socket::{
bind_outgoing_socket_to_device, create_outgoing_socket_bound, resolve_interface_ip,
};
use crate::transport::socket::{create_outgoing_socket_bound, resolve_interface_ip};
use crate::transport::socks::{connect_socks4, connect_socks5};
/// Number of Telegram datacenters
@@ -329,17 +327,6 @@ pub struct UpstreamManager {
}
impl UpstreamManager {
fn is_unscoped_upstream(upstream: &UpstreamConfig) -> bool {
upstream.scopes.is_empty()
}
fn should_check_in_default_dc_connectivity(
has_unscoped: bool,
upstream: &UpstreamConfig,
) -> bool {
!has_unscoped || Self::is_unscoped_upstream(upstream)
}
pub fn new(
configs: Vec<UpstreamConfig>,
connect_retry_attempts: u32,
@@ -466,87 +453,6 @@ impl UpstreamManager {
}
}
fn resolve_probe_dc_families(
upstream: &UpstreamConfig,
ipv4_available: bool,
ipv6_available: bool,
) -> (bool, bool) {
(
upstream.ipv4.unwrap_or(ipv4_available),
upstream.ipv6.unwrap_or(ipv6_available),
)
}
fn resolve_runtime_dc_families(
upstream: &UpstreamConfig,
dc_preference: IpPreference,
) -> (bool, bool) {
let (auto_ipv4, auto_ipv6) = match dc_preference {
IpPreference::PreferV4 => (true, false),
IpPreference::PreferV6 => (false, true),
IpPreference::BothWork | IpPreference::Unknown | IpPreference::Unavailable => {
(true, true)
}
};
(
upstream.ipv4.unwrap_or(auto_ipv4),
upstream.ipv6.unwrap_or(auto_ipv6),
)
}
fn dc_table_addr(dc_idx: i16, ipv6: bool, port: u16) -> Option<SocketAddr> {
let arr_idx = UpstreamState::dc_array_idx(dc_idx)?;
let ip = if ipv6 {
TG_DATACENTERS_V6[arr_idx]
} else {
TG_DATACENTERS_V4[arr_idx]
};
Some(SocketAddr::new(ip, port))
}
fn resolve_runtime_dc_target(
target: SocketAddr,
dc_idx: Option<i16>,
upstream: &UpstreamConfig,
dc_preference: IpPreference,
) -> Result<SocketAddr> {
let (allow_ipv4, allow_ipv6) = Self::resolve_runtime_dc_families(upstream, dc_preference);
if (target.is_ipv4() && allow_ipv4) || (target.is_ipv6() && allow_ipv6) {
return Ok(target);
}
if !allow_ipv4 && !allow_ipv6 {
return Err(ProxyError::Config(format!(
"Upstream DC family policy blocks all families for target {target}"
)));
}
let Some(dc_idx) = dc_idx else {
return Err(ProxyError::Config(format!(
"Upstream DC family policy cannot remap target {target} without dc_idx"
)));
};
let remapped = if target.is_ipv4() {
if allow_ipv6 {
Self::dc_table_addr(dc_idx, true, target.port())
} else {
None
}
} else if allow_ipv4 {
Self::dc_table_addr(dc_idx, false, target.port())
} else {
None
};
remapped.ok_or_else(|| {
ProxyError::Config(format!(
"Upstream DC family policy rejected target {target} (dc_idx={dc_idx})"
))
})
}
#[cfg(unix)]
fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec<IpAddr> {
use nix::ifaddrs::getifaddrs;
@@ -820,28 +726,18 @@ impl UpstreamManager {
.await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let (mut upstream, bind_rr, dc_preference) = {
let mut upstream = {
let guard = self.upstreams.read().await;
let state = &guard[idx];
let dc_preference = dc_idx
.and_then(UpstreamState::dc_array_idx)
.map(|dc_array_idx| state.dc_ip_pref[dc_array_idx])
.unwrap_or(IpPreference::Unknown);
(
state.config.clone(),
Some(state.bind_rr.clone()),
dc_preference,
)
guard[idx].config.clone()
};
if let Some(s) = scope {
upstream.selected_scope = s.to_string();
}
let target = if dc_idx.is_some() {
Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)?
} else {
target
let bind_rr = {
let guard = self.upstreams.read().await;
guard.get(idx).map(|u| u.bind_rr.clone())
};
let (stream, _) = self
@@ -862,18 +758,9 @@ impl UpstreamManager {
.await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let (mut upstream, bind_rr, dc_preference) = {
let mut upstream = {
let guard = self.upstreams.read().await;
let state = &guard[idx];
let dc_preference = dc_idx
.and_then(UpstreamState::dc_array_idx)
.map(|dc_array_idx| state.dc_ip_pref[dc_array_idx])
.unwrap_or(IpPreference::Unknown);
(
state.config.clone(),
Some(state.bind_rr.clone()),
dc_preference,
)
guard[idx].config.clone()
};
// Set scope for configuration copy
@@ -881,10 +768,9 @@ impl UpstreamManager {
upstream.selected_scope = s.to_string();
}
let target = if dc_idx.is_some() {
Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)?
} else {
target
let bind_rr = {
let guard = self.upstreams.read().await;
guard.get(idx).map(|u| u.bind_rr.clone())
};
let (stream, egress) = self
@@ -1042,7 +928,6 @@ impl UpstreamManager {
UpstreamType::Direct {
interface,
bind_addresses,
bindtodevice,
} => {
let bind_ip = Self::resolve_bind_address(
interface,
@@ -1058,10 +943,6 @@ impl UpstreamManager {
}
let socket = create_outgoing_socket_bound(target, bind_ip)?;
if let Some(device) = bindtodevice.as_deref().filter(|value| !value.is_empty()) {
bind_outgoing_socket_to_device(&socket, device).map_err(ProxyError::Io)?;
debug!(bindtodevice = %device, target = %target, "Pinned socket to interface");
}
if let Some(ip) = bind_ip {
debug!(bind = %ip, target = %target, "Bound outgoing socket");
} else if interface.is_some() || bind_addresses.is_some() {
@@ -1320,26 +1201,14 @@ impl UpstreamManager {
.map(|(i, u)| (i, u.config.clone(), u.bind_rr.clone()))
.collect()
};
let has_unscoped = upstreams
.iter()
.any(|(_, cfg, _)| Self::is_unscoped_upstream(cfg));
let mut all_results = Vec::new();
for (upstream_idx, upstream_config, bind_rr) in &upstreams {
// DC connectivity checks should follow the default routing path.
// Scoped upstreams are included only when no unscoped upstream exists.
if !Self::should_check_in_default_dc_connectivity(has_unscoped, upstream_config) {
continue;
}
let (upstream_ipv4_enabled, upstream_ipv6_enabled) =
Self::resolve_probe_dc_families(upstream_config, ipv4_enabled, ipv6_enabled);
let upstream_name = match &upstream_config.upstream_type {
UpstreamType::Direct {
interface,
bind_addresses,
bindtodevice,
} => {
let mut direct_parts = Vec::new();
if let Some(dev) = interface.as_deref().filter(|v| !v.is_empty()) {
@@ -1348,9 +1217,6 @@ impl UpstreamManager {
if let Some(src) = bind_addresses.as_ref().filter(|v| !v.is_empty()) {
direct_parts.push(format!("src={}", src.join(",")));
}
if let Some(device) = bindtodevice.as_deref().filter(|v| !v.is_empty()) {
direct_parts.push(format!("bindtodevice={device}"));
}
if direct_parts.is_empty() {
"direct".to_string()
} else {
@@ -1367,7 +1233,7 @@ impl UpstreamManager {
};
let mut v6_results = Vec::with_capacity(NUM_DCS);
if upstream_ipv6_enabled {
if ipv6_enabled {
for dc_zero_idx in 0..NUM_DCS {
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx];
let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT);
@@ -1418,17 +1284,13 @@ impl UpstreamManager {
dc_idx: dc_zero_idx + 1,
dc_addr: SocketAddr::new(dc_v6, TG_DATACENTER_PORT),
rtt_ms: None,
error: Some(if ipv6_enabled {
"ipv6 disabled by upstream policy".to_string()
} else {
"ipv6 disabled".to_string()
}),
error: Some("ipv6 disabled".to_string()),
});
}
}
let mut v4_results = Vec::with_capacity(NUM_DCS);
if upstream_ipv4_enabled {
if ipv4_enabled {
for dc_zero_idx in 0..NUM_DCS {
let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx];
let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT);
@@ -1479,11 +1341,7 @@ impl UpstreamManager {
dc_idx: dc_zero_idx + 1,
dc_addr: SocketAddr::new(dc_v4, TG_DATACENTER_PORT),
rtt_ms: None,
error: Some(if ipv4_enabled {
"ipv4 disabled by upstream policy".to_string()
} else {
"ipv4 disabled".to_string()
}),
error: Some("ipv4 disabled".to_string()),
});
}
}
@@ -1503,9 +1361,7 @@ impl UpstreamManager {
match addr_str.parse::<SocketAddr>() {
Ok(addr) => {
let is_v6 = addr.is_ipv6();
if (is_v6 && !upstream_ipv6_enabled)
|| (!is_v6 && !upstream_ipv4_enabled)
{
if (is_v6 && !ipv6_enabled) || (!is_v6 && !ipv4_enabled) {
continue;
}
let result = tokio::time::timeout(
@@ -1740,32 +1596,13 @@ impl UpstreamManager {
continue;
}
let target_upstreams: Vec<usize> = {
let guard = self.upstreams.read().await;
let has_unscoped = guard
.iter()
.any(|upstream| Self::is_unscoped_upstream(&upstream.config));
guard
.iter()
.enumerate()
.filter(|(_, upstream)| {
Self::should_check_in_default_dc_connectivity(
has_unscoped,
&upstream.config,
)
})
.map(|(idx, _)| idx)
.collect()
};
for i in target_upstreams {
let count = self.upstreams.read().await.len();
for i in 0..count {
let (config, bind_rr) = {
let guard = self.upstreams.read().await;
let u = &guard[i];
(u.config.clone(), u.bind_rr.clone())
};
let (upstream_ipv4_enabled, upstream_ipv6_enabled) =
Self::resolve_probe_dc_families(&config, ipv4_enabled, ipv6_enabled);
let mut healthy_groups = 0usize;
let mut latency_updates: Vec<(usize, f64)> = Vec::new();
@@ -1781,30 +1618,14 @@ impl UpstreamManager {
continue;
}
let filtered_endpoints: Vec<SocketAddr> = endpoints
.iter()
.copied()
.filter(|endpoint| {
if endpoint.is_ipv4() {
upstream_ipv4_enabled
} else {
upstream_ipv6_enabled
}
})
.collect();
if filtered_endpoints.is_empty() {
continue;
}
let rotation_key = (i, group.dc_idx, is_primary);
let start_idx = *endpoint_rotation.entry(rotation_key).or_insert(0)
% filtered_endpoints.len();
let mut next_idx = (start_idx + 1) % filtered_endpoints.len();
let start_idx =
*endpoint_rotation.entry(rotation_key).or_insert(0) % endpoints.len();
let mut next_idx = (start_idx + 1) % endpoints.len();
for step in 0..filtered_endpoints.len() {
let endpoint_idx = (start_idx + step) % filtered_endpoints.len();
let endpoint = filtered_endpoints[endpoint_idx];
for step in 0..endpoints.len() {
let endpoint_idx = (start_idx + step) % endpoints.len();
let endpoint = endpoints[endpoint_idx];
let start = Instant::now();
let result = tokio::time::timeout(
@@ -1823,7 +1644,7 @@ impl UpstreamManager {
Ok(Ok(_stream)) => {
group_ok = true;
group_rtt_ms = Some(start.elapsed().as_secs_f64() * 1000.0);
next_idx = (endpoint_idx + 1) % filtered_endpoints.len();
next_idx = (endpoint_idx + 1) % endpoints.len();
break;
}
Ok(Err(e)) => {
@@ -2038,33 +1859,6 @@ mod tests {
assert!(!UpstreamManager::is_hard_connect_error(&error));
}
#[test]
fn unscoped_selection_detects_default_route_upstream() {
let mut upstream = UpstreamConfig {
upstream_type: UpstreamType::Direct {
interface: None,
bind_addresses: None,
bindtodevice: None,
},
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
};
assert!(UpstreamManager::is_unscoped_upstream(&upstream));
upstream.scopes = "local".to_string();
assert!(!UpstreamManager::is_unscoped_upstream(&upstream));
assert!(!UpstreamManager::should_check_in_default_dc_connectivity(
true, &upstream
));
assert!(UpstreamManager::should_check_in_default_dc_connectivity(
false, &upstream
));
}
#[test]
fn resolve_bind_address_prefers_explicit_bind_ip() {
let target = "203.0.113.10:443".parse::<SocketAddr>().unwrap();
@@ -2105,8 +1899,6 @@ mod tests {
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
100,
+510 -4670
View File
File diff suppressed because it is too large Load Diff
+55 -56
View File
@@ -24,7 +24,7 @@ from urllib.request import Request, urlopen
# Exceptions
# ---------------------------------------------------------------------------
class TelemtAPIError(Exception):
class TememtAPIError(Exception):
"""Raised when the API returns an error envelope or a transport error."""
def __init__(self, message: str, code: str | None = None,
@@ -35,7 +35,7 @@ class TelemtAPIError(Exception):
self.request_id = request_id
def __repr__(self) -> str:
return (f"TelemtAPIError(message={str(self)!r}, code={self.code!r}, "
return (f"TememtAPIError(message={str(self)!r}, code={self.code!r}, "
f"http_status={self.http_status}, request_id={self.request_id})")
@@ -58,7 +58,7 @@ class APIResponse:
# Main client
# ---------------------------------------------------------------------------
class TelemtAPI:
class TememtAPI:
"""
HTTP client for the Telemt Control API.
@@ -75,10 +75,10 @@ class TelemtAPI:
"""
def __init__(
self,
base_url: str = "http://127.0.0.1:9091",
auth_header: str | None = None,
timeout: int = 10,
self,
base_url: str = "http://127.0.0.1:9091",
auth_header: str | None = None,
timeout: int = 10,
) -> None:
self.base_url = base_url.rstrip("/")
self.auth_header = auth_header
@@ -98,12 +98,12 @@ class TelemtAPI:
return h
def _request(
self,
method: str,
path: str,
body: dict | None = None,
if_match: str | None = None,
query: dict | None = None,
self,
method: str,
path: str,
body: dict | None = None,
if_match: str | None = None,
query: dict | None = None,
) -> APIResponse:
url = self.base_url + path
if query:
@@ -133,22 +133,22 @@ class TelemtAPI:
try:
payload = json.loads(raw)
except Exception:
raise TelemtAPIError(
raise TememtAPIError(
str(exc), http_status=exc.code
) from exc
err = payload.get("error", {})
raise TelemtAPIError(
raise TememtAPIError(
err.get("message", str(exc)),
code=err.get("code"),
http_status=exc.code,
request_id=payload.get("request_id"),
) from exc
except URLError as exc:
raise TelemtAPIError(str(exc)) from exc
raise TememtAPIError(str(exc)) from exc
if not payload.get("ok"):
err = payload.get("error", {})
raise TelemtAPIError(
raise TememtAPIError(
err.get("message", "unknown error"),
code=err.get("code"),
request_id=payload.get("request_id"),
@@ -298,16 +298,16 @@ class TelemtAPI:
# ------------------------------------------------------------------
def create_user(
self,
username: str,
*,
secret: str | None = None,
user_ad_tag: str | None = None,
max_tcp_conns: int | None = None,
expiration_rfc3339: str | None = None,
data_quota_bytes: int | None = None,
max_unique_ips: int | None = None,
if_match: str | None = None,
self,
username: str,
*,
secret: str | None = None,
user_ad_tag: str | None = None,
max_tcp_conns: int | None = None,
expiration_rfc3339: str | None = None,
data_quota_bytes: int | None = None,
max_unique_ips: int | None = None,
if_match: str | None = None,
) -> APIResponse:
"""POST /v1/users — create a new user.
@@ -340,16 +340,16 @@ class TelemtAPI:
return self._post("/v1/users", body=body, if_match=if_match)
def patch_user(
self,
username: str,
*,
secret: str | None = None,
user_ad_tag: str | None = None,
max_tcp_conns: int | None = None,
expiration_rfc3339: str | None = None,
data_quota_bytes: int | None = None,
max_unique_ips: int | None = None,
if_match: str | None = None,
self,
username: str,
*,
secret: str | None = None,
user_ad_tag: str | None = None,
max_tcp_conns: int | None = None,
expiration_rfc3339: str | None = None,
data_quota_bytes: int | None = None,
max_unique_ips: int | None = None,
if_match: str | None = None,
) -> APIResponse:
"""PATCH /v1/users/{username} — partial update; only provided fields change.
@@ -385,10 +385,10 @@ class TelemtAPI:
if_match=if_match)
def delete_user(
self,
username: str,
*,
if_match: str | None = None,
self,
username: str,
*,
if_match: str | None = None,
) -> APIResponse:
"""DELETE /v1/users/{username} — remove user; blocks deletion of last user.
@@ -403,11 +403,11 @@ class TelemtAPI:
# in the route matcher (documented limitation). The method is provided
# for completeness and future compatibility.
def rotate_secret(
self,
username: str,
*,
secret: str | None = None,
if_match: str | None = None,
self,
username: str,
*,
secret: str | None = None,
if_match: str | None = None,
) -> APIResponse:
"""POST /v1/users/{username}/rotate-secret — rotate user secret.
@@ -533,12 +533,12 @@ EXAMPLES
help="Username for user commands")
# user create/patch fields
p.add_argument("--secret", default=None)
p.add_argument("--ad-tag", dest="ad_tag", default=None)
p.add_argument("--secret", default=None)
p.add_argument("--ad-tag", dest="ad_tag", default=None)
p.add_argument("--max-conns", dest="max_conns", type=int, default=None)
p.add_argument("--expires", default=None)
p.add_argument("--quota", type=int, default=None)
p.add_argument("--max-ips", dest="max_ips", type=int, default=None)
p.add_argument("--expires", default=None)
p.add_argument("--quota", type=int, default=None)
p.add_argument("--max-ips", dest="max_ips", type=int, default=None)
# events
p.add_argument("--limit", type=int, default=None,
@@ -564,10 +564,10 @@ if __name__ == "__main__":
sys.exit(0)
if cmd == "gen-secret":
print(TelemtAPI.generate_secret())
print(TememtAPI.generate_secret())
sys.exit(0)
api = TelemtAPI(args.url, auth_header=args.auth, timeout=args.timeout)
api = TememtAPI(args.url, auth_header=args.auth, timeout=args.timeout)
try:
# -- read endpoints --------------------------------------------------
@@ -690,8 +690,7 @@ if __name__ == "__main__":
parser.error("patch command requires <username>")
if not any([args.secret, args.ad_tag, args.max_conns,
args.expires, args.quota, args.max_ips]):
parser.error(
"patch requires at least one field (--secret, --max-conns, --expires, --quota, --max-ips, --ad-tag)")
parser.error("patch requires at least one field (--secret, --max-conns, --expires, --quota, --max-ips, --ad-tag)")
_print(api.patch_user(
args.arg,
secret=args.secret,
@@ -722,7 +721,7 @@ if __name__ == "__main__":
file=sys.stderr)
sys.exit(1)
except TelemtAPIError as exc:
except TememtAPIError as exc:
print(f"API error [{exc.http_status}] {exc.code}: {exc}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt: