Compare commits

..

47 Commits

Author SHA1 Message Date
Alexey
6d6cd30227 STUN Fixes + ME Pool tweaks: merge pull request #260 from telemt/flow-mep
STUN Fixes + ME Pool tweaks
2026-02-26 19:47:29 +03:00
Alexey
60231224ac Update Cargo.toml 2026-02-26 19:41:37 +03:00
Alexey
144f81c473 ME Dead Writer w/o dead-lock on timeout 2026-02-26 19:37:17 +03:00
Alexey
04e6135935 TLS-F Fetching Optimization 2026-02-26 19:35:34 +03:00
Alexey
4eebb4feb2 ME Pool Refactoring 2026-02-26 19:01:24 +03:00
Alexey
1f255d0aa4 ME Probe + STUN Legacy 2026-02-26 18:41:11 +03:00
Alexey
9d2ff25bf5 Unified STUN + ME Primary parallelized
- Unified STUN server source-of-truth
- parallelize per-DC primary ME init for multi-endpoint DCs
2026-02-26 18:18:24 +03:00
Alexey
7782336264 ME Probe parallelized 2026-02-26 17:56:22 +03:00
Alexey
92a3529733 Merge pull request #253 from ivulit/feat/mask-proxy-protocol
feat: add mask_proxy_protocol option for PROXY protocol to mask_host
2026-02-26 15:44:47 +03:00
Alexey
8ce8348cd5 Merge branch 'main' into feat/mask-proxy-protocol 2026-02-26 15:21:58 +03:00
Alexey
e25b7f5ff8 STUN List 2026-02-26 15:10:21 +03:00
Alexey
d7182ae817 Update defaults.rs 2026-02-26 15:07:04 +03:00
Alexey
97f2dc8489 Merge pull request #251 from telemt/flow-defaults
Checked defaults
2026-02-26 15:05:01 +03:00
Alexey
fb1f85559c Update load.rs 2026-02-26 14:57:28 +03:00
ivulit
da684b11fe feat: add mask_proxy_protocol option for PROXY protocol to mask_host
Adds mask_proxy_protocol config option (0 = off, 1 = v1 text, 2 = v2 binary)
that sends a PROXY protocol header when connecting to mask_host. This lets
the backend see the real client IP address.

Particularly useful when the masking site (nginx/HAProxy) runs on the same
host as telemt and listens on a local port — without this, the backend loses
the original client IP entirely.

PROXY protocol header is also sent during TLS emulation fetches so that
backends with proxy_protocol required don't reject the connection.
2026-02-26 13:36:33 +03:00
Alexey
896e129155 Checked defaults 2026-02-26 12:48:22 +03:00
Alexey
7ead0cd753 Update README.md 2026-02-26 11:45:50 +03:00
Alexey
6cf9687dd6 Update README.md 2026-02-26 11:43:27 +03:00
Alexey
4e30a4999c Update config.toml 2026-02-26 11:14:52 +03:00
Alexey
4af40f7121 Update config.toml 2026-02-26 11:13:58 +03:00
Alexey
1e4ba2eb56 Update config.toml 2026-02-26 10:45:47 +03:00
Alexey
eb921e2b17 Merge pull request #248 from telemt/config-tuning
Update config.toml
2026-02-25 22:44:51 +03:00
Alexey
76f1b51018 Update config.toml 2026-02-25 22:44:38 +03:00
Alexey
03ce267865 Update config.toml 2026-02-25 22:33:38 +03:00
Alexey
a6bfa3309e Create config.toml 2026-02-25 22:32:02 +03:00
Alexey
79a3720fd5 Rename config.toml to config.full.toml 2026-02-25 22:22:04 +03:00
Alexey
89543aed35 Merge pull request #247 from telemt/config-tuning
Update config.toml
2026-02-25 21:47:26 +03:00
Alexey
06292ff833 Update config.toml 2026-02-25 21:33:06 +03:00
Alexey
427294b103 Defaults in-place: merge pull request #245 from telemt/flow-tuning
Defaults in-place
2026-02-25 18:09:20 +03:00
Alexey
fed9346444 New config.toml + tls_emulation enabled by default 2026-02-25 17:49:54 +03:00
Alexey
f40b645c05 Defaults in-place 2026-02-25 17:28:06 +03:00
Alexey
a66d5d56bb Merge pull request #243 from vladon/add-proxy-secret-to-gitignore
Add proxy-secret to .gitignore
2026-02-25 14:16:31 +03:00
Vladislav Yaroslavlev
1b1bdfe99a Add proxy-secret to .gitignore
The proxy-secret file contains sensitive authentication data
that should never be committed to version control.
2026-02-25 14:00:50 +03:00
Alexey
49fc11ddfa Merge pull request #242 from telemt/flow-link
Detected_IP in Links
2026-02-25 13:42:41 +03:00
Alexey
5558900c44 Update main.rs 2026-02-25 13:29:46 +03:00
Alexey
5b1d976392 Merge pull request #239 from twocolors/fix-info-bracket
fix: remove bracket in info
2026-02-25 10:22:22 +03:00
D
206f87fe64 fix: remove bracket in info 2026-02-25 09:22:26 +03:00
Alexey
5a09d30e1c Update Cargo.toml 2026-02-25 03:09:02 +03:00
Alexey
f83e23c521 Update defaults.rs 2026-02-25 03:08:34 +03:00
Alexey
f9e9ddd0f7 Merge pull request #238 from telemt/flow-mep
ME Pool Beobachter
2026-02-25 02:24:07 +03:00
Alexey
6b8619d3c9 Create beobachten.rs 2026-02-25 02:17:48 +03:00
Alexey
618b7a1837 ME Pool Beobachter 2026-02-25 02:10:14 +03:00
Alexey
16f166cec8 Update README.md 2026-02-25 02:07:58 +03:00
Alexey
6efcbe9bbf Update README.md 2026-02-25 02:05:32 +03:00
Alexey
e5ad27e26e Merge pull request #237 from Dimasssss/main
Update config.toml
2026-02-25 01:50:19 +03:00
Dimasssss
53ec96b040 Update config.toml 2026-02-25 01:37:55 +03:00
Alexey
c6c3d71b08 ME Pool Flap-Detect in statistics 2026-02-25 01:26:01 +03:00
27 changed files with 2845 additions and 1423 deletions

6
.gitignore vendored
View File

@@ -19,7 +19,5 @@ target
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
*.rs
target
Cargo.lock
src
proxy-secret

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.0.15"
version = "3.1.2"
edition = "2024"
[dependencies]

View File

@@ -2,6 +2,8 @@
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as connection pooling, replay protection, detailed statistics, masking from "prying" eyes
[**Telemt Chat in Telegram**](https://t.me/telemtrs)
## NEWS and EMERGENCY
### ✈️ Telemt 3 is released!
<table>
@@ -10,28 +12,18 @@
### 🇷🇺 RU
#### Драфтинг LTS и текущие улучшения
#### Релиз 3.0.15 — 25 февраля
С 21 февраля мы начали подготовку LTS-версии.
25 февраля мы выпустили версию **3.0.15**
Мы внимательно анализируем весь доступный фидбек.
Наша цель — сделать LTS-кандидаты максимально стабильными, тщательно отлаженными и готовыми к long-run и highload production-сценариям.
Мы предполагаем, что она станет завершающей версией поколения 3.0 и уже сейчас мы рассматриваем её как **LTS-кандидата** для версии **3.1.0**!
---
После нескольких дней детального анализа особенностей работы Middle-End мы спроектировали и реализовали продуманный режим **ротации ME Writer**. Данный режим позволяет поддерживать стабильно высокую производительность в long-run сценариях без возникновения ошибок, связанных с некорректной конфигурацией прокси
#### Улучшения от 23 февраля
23 февраля были внесены улучшения производительности в режимах **DC** и **Middle-End (ME)**, с акцентом на обратный канал (путь клиент → DC / ME).
Дополнительно реализован ряд изменений, направленных на повышение устойчивости системы:
- Смягчение сетевой нестабильности
- Повышение устойчивости к десинхронизации криптографии
- Снижение дрейфа сессий при неблагоприятных условиях
- Улучшение обработки ошибок в edge-case транспортных сценариях
Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **статистики** и **UX**
Релиз:
[3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12)
[3.0.15](https://github.com/telemt/telemt/releases/tag/3.0.15)
---
@@ -48,28 +40,18 @@
### 🇬🇧 EN
#### LTS Drafting and Ongoing Improvements
#### Release 3.0.15 — February 25
Starting February 21, we began drafting the upcoming LTS version.
On February 25, we released version **3.0.15**
We are carefully reviewing and analyzing all available feedback.
The goal is to ensure that LTS candidates are максимально stable, thoroughly debugged, and ready for long-run and high-load production scenarios.
We expect this to become the final release of the 3.0 generation and at this point, we already see it as a strong **LTS candidate** for the upcoming **3.1.0** release!
---
After several days of deep analysis of Middle-End behavior, we designed and implemented a well-engineered **ME Writer rotation mode**. This mode enables sustained high throughput in long-run scenarios while preventing proxy misconfiguration errors
#### February 23 Improvements
On February 23, we introduced performance improvements for both **DC** and **Middle-End (ME)** modes, specifically optimizing the reverse channel (client → DC / ME data path).
Additionally, we implemented a set of robustness enhancements designed to:
- Mitigate network-related instability
- Improve resilience against cryptographic desynchronization
- Reduce session drift under adverse conditions
- Improve error handling in edge-case transport scenarios
We are looking forward to your feedback and improvement proposals — especially regarding **statistics** and **UX**
Release:
[3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12)
[3.0.15](https://github.com/telemt/telemt/releases/tag/3.0.15)
---

205
config.full.toml Normal file
View File

@@ -0,0 +1,205 @@
# Telemt full config with default values.
# Examples are kept in comments after '#'.
# Top-level legacy field.
show_link = [] # example: "*" or ["alice", "bob"]
# default_dc = 2 # example: default DC for unmapped non-standard DCs
[general]
fast_mode = true
use_middle_proxy = false
# ad_tag = "00000000000000000000000000000000" # example
# proxy_secret_path = "proxy-secret" # example custom path
# middle_proxy_nat_ip = "203.0.113.10" # example public NAT IP override
middle_proxy_nat_probe = true
# middle_proxy_nat_stun = "stun.l.google.com:19302" # example
# middle_proxy_nat_stun_servers = [] # example: ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
middle_proxy_pool_size = 8
middle_proxy_warm_standby = 16
me_keepalive_enabled = true
me_keepalive_interval_secs = 25
me_keepalive_jitter_secs = 5
me_keepalive_payload_random = true
crypto_pending_buffer = 262144
max_client_frame = 16777216
desync_all_full = false
beobachten = true
beobachten_minutes = 10
beobachten_flush_secs = 15
beobachten_file = "cache/beobachten.txt"
hardswap = true
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500
me_warmup_step_jitter_ms = 300
me_reconnect_max_concurrent_per_dc = 8
me_reconnect_backoff_base_ms = 500
me_reconnect_backoff_cap_ms = 30000
me_reconnect_fast_retry_count = 12
stun_iface_mismatch_ignore = false
unknown_dc_log_path = "unknown-dc.txt" # to disable: set to null
log_level = "normal" # debug | verbose | normal | silent
disable_colors = false
fast_mode_min_tls_record = 0
update_every = 300
me_reinit_every_secs = 900
me_hardswap_warmup_delay_min_ms = 1000
me_hardswap_warmup_delay_max_ms = 2000
me_hardswap_warmup_extra_passes = 3
me_hardswap_warmup_pass_backoff_base_ms = 500
me_config_stable_snapshots = 2
me_config_apply_cooldown_secs = 300
proxy_secret_stable_snapshots = 2
proxy_secret_rotate_runtime = true
proxy_secret_len_max = 256
me_pool_drain_ttl_secs = 90
me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120
# Legacy compatibility fields used when update_every is omitted.
proxy_secret_auto_reload_secs = 3600
proxy_config_auto_reload_secs = 3600
ntp_check = true
ntp_servers = ["pool.ntp.org"] # example: ["pool.ntp.org", "time.cloudflare.com"]
auto_degradation_enabled = true
degradation_min_unavailable_dc_groups = 2
[general.modes]
classic = false
secure = false
tls = true
[general.links]
show ="*" # example: "*" or ["alice", "bob"]
# public_host = "proxy.example.com" # example explicit host/IP for tg:// links
# public_port = 443 # example explicit port for tg:// links
[network]
ipv4 = true
ipv6 = false # set true to enable IPv6
prefer = 4 # 4 or 6
multipath = false
stun_servers = [
"stun.l.google.com:5349",
"stun1.l.google.com:3478",
"stun.gmx.net:3478",
"stun.l.google.com:19302",
"stun.1und1.de:3478",
"stun1.l.google.com:19302",
"stun2.l.google.com:19302",
"stun3.l.google.com:19302",
"stun4.l.google.com:19302",
"stun.services.mozilla.com:3478",
"stun.stunprotocol.org:3478",
"stun.nextcloud.com:3478",
"stun.voip.eutelia.it:3478",
]
stun_tcp_fallback = true
http_ip_detect_urls = ["https://ifconfig.me/ip", "https://api.ipify.org"]
cache_public_ip_path = "cache/public_ip.txt"
[server]
port = 443
listen_addr_ipv4 = "0.0.0.0"
listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # example
# listen_unix_sock_perm = "0660" # example unix socket mode
# listen_tcp = true # example explicit override (auto-detected when omitted)
proxy_protocol = false
# metrics_port = 9090 # example
metrics_whitelist = ["127.0.0.1/32", "::1/128"]
# Example explicit listeners (default: omitted, auto-generated from listen_addr_*):
# [[server.listeners]]
# ip = "0.0.0.0"
# announce = "proxy-v4.example.com"
# # announce_ip = "203.0.113.10" # deprecated alias
# proxy_protocol = false
# reuse_allow = false
#
# [[server.listeners]]
# ip = "::"
# announce = "proxy-v6.example.com"
# proxy_protocol = false
# reuse_allow = false
[timeouts]
client_handshake = 15
tg_connect = 10
client_keepalive = 60
client_ack = 300
me_one_retry = 3
me_one_timeout_ms = 1500
[censorship]
tls_domain = "petrovich.ru"
# tls_domains = ["example.com", "cdn.example.net"] # Additional domains for EE links
mask = true
# mask_host = "www.google.com" # example, defaults to tls_domain when both mask_host/mask_unix_sock are unset
# mask_unix_sock = "/var/run/nginx.sock" # example, mutually exclusive with mask_host
mask_port = 443
# mask_proxy_protocol = 0 # Send PROXY protocol header to mask_host: 0 = off, 1 = v1 (text), 2 = v2 (binary)
fake_cert_len = 2048 # if tls_emulation=false and default value is used, loader may randomize this value at runtime
tls_emulation = true
tls_front_dir = "tlsfront"
server_hello_delay_min_ms = 0
server_hello_delay_max_ms = 0
tls_new_session_tickets = 0
tls_full_cert_ttl_secs = 90
alpn_enforce = true
[access]
replay_check_len = 65536
replay_window_secs = 1800
ignore_time_skew = false
[access.users]
# format: "username" = "32_hex_chars_secret"
hello = "00000000000000000000000000000000"
# alice = "11111111111111111111111111111111" # example
[access.user_max_tcp_conns]
# alice = 100 # example
[access.user_expirations]
# alice = "2078-01-01T00:00:00Z" # example
[access.user_data_quota]
# hello = 10737418240 # example bytes
# alice = 10737418240 # example bytes
[access.user_max_unique_ips]
# hello = 10 # example
# alice = 100 # example
# Default behavior if [[upstreams]] is omitted: loader injects one direct upstream.
# Example explicit upstreams:
# [[upstreams]]
# type = "direct"
# interface = "eth0"
# bind_addresses = ["192.0.2.10"]
# weight = 1
# enabled = true
# scopes = "*"
#
# [[upstreams]]
# type = "socks4"
# address = "198.51.100.20:1080"
# interface = "eth0"
# user_id = "telemt"
# weight = 1
# enabled = true
# scopes = "*"
#
# [[upstreams]]
# type = "socks5"
# address = "198.51.100.30:1080"
# interface = "eth0"
# username = "proxy-user"
# password = "proxy-pass"
# weight = 1
# enabled = true
# scopes = "*"
# === DC Address Overrides ===
# [dc_overrides]
# "201" = "149.154.175.50:443" # example
# "202" = ["149.154.167.51:443", "149.154.175.100:443"] # example
# "203" = "91.105.192.100:443" # loader auto-adds this one when omitted

View File

@@ -1,11 +1,11 @@
### Telemt Based Config.toml
# We believe that these settings are sufficient for most scenarios
# where cutting-egde methods and parameters or special solutions are not needed
# === General Settings ===
[general]
fast_mode = true
use_middle_proxy = true
use_middle_proxy = false
# ad_tag = "00000000000000000000000000000000"
# Path to proxy-secret binary (auto-downloaded if missing).
proxy_secret_path = "proxy-secret"
# disable_colors = false # Disable colored output in logs (useful for files/systemd)
# === Log Level ===
# Log level: debug | verbose | normal | silent
@@ -13,51 +13,6 @@ proxy_secret_path = "proxy-secret"
# RUST_LOG env var takes absolute priority over all of these
log_level = "normal"
# === Middle Proxy - ME ===
# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
# middle_proxy_nat_ip = "203.0.113.10"
# Enable STUN probing to discover public IP:port for ME.
middle_proxy_nat_probe = true
# Primary STUN server (host:port); defaults to Telegram STUN when empty.
middle_proxy_nat_stun = "stun.l.google.com:19302"
# Optional fallback STUN servers list.
middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
# Desired number of concurrent ME writers in pool.
middle_proxy_pool_size = 8
# Pre-initialized warm-standby ME connections kept idle.
middle_proxy_warm_standby = 8
# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
stun_iface_mismatch_ignore = false
# Keepalive padding frames - fl==4
me_keepalive_enabled = true
me_keepalive_interval_secs = 25 # Period between keepalives
me_keepalive_jitter_secs = 5 # Jitter added to interval
me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
# Stagger extra ME connections on warmup to de-phase lifecycles.
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500 # Base delay between extra connects
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
# Reconnect policy knobs.
me_reconnect_max_concurrent_per_dc = 4 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
me_reconnect_backoff_base_ms = 500 # Backoff start
me_reconnect_backoff_cap_ms = 30000 # Backoff cap
me_reconnect_fast_retry_count = 11 # Quick retries before backoff
update_every = 7200 # Resolve the active updater interval for ME infrastructure refresh tasks.
crypto_pending_buffer = 262144 # Max pending ciphertext buffer per client writer (bytes). Controls FakeTLS backpressure vs throughput.
max_client_frame = 16777216 # Maximum allowed client MTProto frame size (bytes).
desync_all_full = false # Emit full crypto-desync forensic logs for every event. When false, full forensic details are emitted once per key window.
auto_degradation_enabled = true # Enable auto-degradation from ME to Direct-DC.
degradation_min_unavailable_dc_groups = 2 # Minimum unavailable ME DC groups before degrading.
hardswap = true # Enable C-like hard-swap for ME pool generations. When true, Telemt prewarms a new generation and switches once full coverage is reached.
me_pool_drain_ttl_secs = 90 # Drain-TTL in seconds for stale ME writers after endpoint map changes. During TTL, stale writers may be used only as fallback for new bindings.
me_pool_min_fresh_ratio = 0.8 # Minimum desired-DC coverage ratio required before draining stale writers. Range: 0.0..=1.0.
me_reinit_drain_timeout_secs = 120 # Drain timeout in seconds for stale ME writers after endpoint map changes. Set to 0 to keep stale writers draining indefinitely (no force-close).
me_config_stable_snapshots = 2 # Number of identical getProxyConfig snapshots required before applying ME map updates.
me_config_apply_cooldown_secs = 300 # Cooldown in seconds between applied ME map updates.
proxy_secret_rotate_runtime = true # Enable runtime proxy-secret rotation from getProxySecret.
proxy_secret_stable_snapshots = 2 # Number of identical getProxySecret snapshots required before runtime secret rotation.
proxy_secret_len_max = 256 # Maximum allowed proxy-secret length in bytes for startup and runtime refresh.
[general.modes]
classic = false
secure = false
@@ -70,93 +25,24 @@ show = "*"
# public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
# public_port = 443 # Port for tg:// links (default: server.port)
# === Network Parameters ===
[network]
# Enable/disable families: true/false/auto(None)
ipv4 = true
ipv6 = false # UNSTABLE WITH ME
# prefer = 4 or 6
prefer = 4
multipath = false # EXPERIMENTAL!
# === Server Binding ===
[server]
port = 443
listen_addr_ipv4 = "0.0.0.0"
listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # Unix socket
# listen_unix_sock_perm = "0666" # Socket file permissions
# proxy_protocol = false # Enable if behind HAProxy/nginx with PROXY protocol
# metrics_port = 9090
# metrics_whitelist = ["127.0.0.1", "::1"]
# metrics_whitelist = ["127.0.0.1", "::1", "0.0.0.0/0"]
# Listen on multiple interfaces/IPs - IPv4
[[server.listeners]]
ip = "0.0.0.0"
# Listen on multiple interfaces/IPs - IPv6
[[server.listeners]]
ip = "::"
# === Timeouts (in seconds) ===
[timeouts]
client_handshake = 30
tg_connect = 10
client_keepalive = 60
client_ack = 300
# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
me_one_retry = 12
me_one_timeout_ms = 1200
# === Anti-Censorship & Masking ===
[censorship]
tls_domain = "petrovich.ru"
# tls_domains = ["example.com", "cdn.example.net"] # Additional domains for EE links
mask = true
mask_port = 443
# mask_host = "petrovich.ru" # Defaults to tls_domain if not set
# mask_unix_sock = "/var/run/nginx.sock" # Unix socket (mutually exclusive with mask_host)
fake_cert_len = 2048
# tls_emulation = false # Fetch real cert lengths and emulate TLS records
# tls_front_dir = "tlsfront" # Cache directory for TLS emulation
# === Access Control & Users ===
[access]
replay_check_len = 65536
replay_window_secs = 1800
ignore_time_skew = false
tls_emulation = true # Fetch real cert lengths and emulate TLS records
tls_front_dir = "tlsfront" # Cache directory for TLS emulation
[access.users]
# format: "username" = "32_hex_chars_secret"
hello = "00000000000000000000000000000000"
# [access.user_max_tcp_conns]
# hello = 50
# [access.user_max_unique_ips]
# hello = 5
# [access.user_data_quota]
# hello = 1073741824 # 1 GB
# [access.user_expirations]
# format: username = "[year]-[month]-[day]T[hour]:[minute]:[second]Z" UTC
# hello = "2027-01-01T00:00:00Z"
# === Upstreams & Routing ===
[[upstreams]]
type = "direct"
enabled = true
weight = 10
# interface = "192.168.1.100" # Bind outgoing to specific IP or iface name
# bind_addresses = ["192.168.1.100"] # List for round-robin binding (family must match target)
# [[upstreams]]
# type = "socks5"
# address = "127.0.0.1:1080"
# enabled = false
# weight = 1
# === DC Address Overrides ===
# [dc_overrides]
# "203" = "91.105.192.100:443"

View File

@@ -3,6 +3,15 @@ use ipnetwork::IpNetwork;
use serde::Deserialize;
// Helper defaults kept private to the config module.
const DEFAULT_NETWORK_IPV6: Option<bool> = Some(false);
const DEFAULT_STUN_TCP_FALLBACK: bool = true;
const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
const DEFAULT_ACCESS_USER: &str = "default";
const DEFAULT_ACCESS_SECRET: &str = "00000000000000000000000000000000";
pub(crate) fn default_true() -> bool {
true
}
@@ -12,7 +21,7 @@ pub(crate) fn default_port() -> u16 {
}
pub(crate) fn default_tls_domain() -> String {
"www.google.com".to_string()
"petrovich.ru".to_string()
}
pub(crate) fn default_mask_port() -> u16 {
@@ -36,7 +45,7 @@ pub(crate) fn default_replay_window_secs() -> u64 {
}
pub(crate) fn default_handshake_timeout() -> u64 {
15
30
}
pub(crate) fn default_connect_timeout() -> u64 {
@@ -51,17 +60,21 @@ pub(crate) fn default_ack_timeout() -> u64 {
300
}
pub(crate) fn default_me_one_retry() -> u8 {
3
12
}
pub(crate) fn default_me_one_timeout() -> u64 {
1500
1200
}
pub(crate) fn default_listen_addr() -> String {
"0.0.0.0".to_string()
}
pub(crate) fn default_listen_addr_ipv4() -> Option<String> {
Some(default_listen_addr())
}
pub(crate) fn default_weight() -> u16 {
1
}
@@ -77,6 +90,14 @@ pub(crate) fn default_prefer_4() -> u8 {
4
}
pub(crate) fn default_network_ipv6() -> Option<bool> {
DEFAULT_NETWORK_IPV6
}
pub(crate) fn default_stun_tcp_fallback() -> bool {
DEFAULT_STUN_TCP_FALLBACK
}
pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
Some("unknown-dc.txt".to_string())
}
@@ -85,6 +106,26 @@ pub(crate) fn default_pool_size() -> usize {
8
}
pub(crate) fn default_proxy_secret_path() -> Option<String> {
Some("proxy-secret".to_string())
}
pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> {
None
}
pub(crate) fn default_middle_proxy_nat_stun_servers() -> Vec<String> {
Vec::new()
}
pub(crate) fn default_stun_nat_probe_concurrency() -> usize {
8
}
pub(crate) fn default_middle_proxy_warm_standby() -> usize {
DEFAULT_MIDDLE_PROXY_WARM_STANDBY
}
pub(crate) fn default_keepalive_interval() -> u64 {
25
}
@@ -109,6 +150,14 @@ pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
30_000
}
pub(crate) fn default_me_reconnect_max_concurrent_per_dc() -> u32 {
DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC
}
pub(crate) fn default_me_reconnect_fast_retry_count() -> u32 {
DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT
}
pub(crate) fn default_crypto_pending_buffer() -> usize {
256 * 1024
}
@@ -121,6 +170,18 @@ pub(crate) fn default_desync_all_full() -> bool {
false
}
pub(crate) fn default_beobachten_minutes() -> u64 {
10
}
pub(crate) fn default_beobachten_flush_secs() -> u64 {
15
}
pub(crate) fn default_beobachten_file() -> String {
"cache/beobachten.txt".to_string()
}
pub(crate) fn default_tls_new_session_tickets() -> u8 {
0
}
@@ -179,7 +240,11 @@ pub(crate) fn default_proxy_config_reload_secs() -> u64 {
}
pub(crate) fn default_update_every_secs() -> u64 {
30 * 60
5 * 60
}
pub(crate) fn default_update_every() -> Option<u64> {
Some(default_update_every_secs())
}
pub(crate) fn default_me_reinit_every_secs() -> u64 {
@@ -254,6 +319,21 @@ pub(crate) fn default_degradation_min_unavailable_dc_groups() -> u8 {
2
}
pub(crate) fn default_listen_addr_ipv6() -> String {
DEFAULT_LISTEN_ADDR_IPV6.to_string()
}
pub(crate) fn default_listen_addr_ipv6_opt() -> Option<String> {
Some(default_listen_addr_ipv6())
}
pub(crate) fn default_access_users() -> HashMap<String, String> {
HashMap::from([(
DEFAULT_ACCESS_USER.to_string(),
DEFAULT_ACCESS_SECRET.to_string(),
)])
}
// Custom deserializer helpers
#[derive(Deserialize)]

View File

@@ -96,6 +96,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
if old.general.use_middle_proxy != new.general.use_middle_proxy {
warn!("config reload: use_middle_proxy changed; restart required");
}
if old.general.stun_nat_probe_concurrency != new.general.stun_nat_probe_concurrency {
warn!("config reload: general.stun_nat_probe_concurrency changed; restart required");
}
}
/// Resolve the public host for link generation — mirrors the logic in main.rs.

View File

@@ -65,6 +65,16 @@ fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> {
Ok(())
}
fn push_unique_nonempty(target: &mut Vec<String>, value: String) {
let trimmed = value.trim();
if trimmed.is_empty() {
return;
}
if !target.iter().any(|existing| existing == trimmed) {
target.push(trimmed.to_string());
}
}
// ============= Main Config =============
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
@@ -116,8 +126,63 @@ impl ProxyConfig {
let base_dir = path.as_ref().parent().unwrap_or(Path::new("."));
let processed = preprocess_includes(&content, base_dir, 0)?;
let mut config: ProxyConfig =
let parsed_toml: toml::Value =
toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?;
let general_table = parsed_toml
.get("general")
.and_then(|value| value.as_table());
let network_table = parsed_toml
.get("network")
.and_then(|value| value.as_table());
let update_every_is_explicit = general_table
.map(|table| table.contains_key("update_every"))
.unwrap_or(false);
let legacy_secret_is_explicit = general_table
.map(|table| table.contains_key("proxy_secret_auto_reload_secs"))
.unwrap_or(false);
let legacy_config_is_explicit = general_table
.map(|table| table.contains_key("proxy_config_auto_reload_secs"))
.unwrap_or(false);
let stun_servers_is_explicit = network_table
.map(|table| table.contains_key("stun_servers"))
.unwrap_or(false);
let mut config: ProxyConfig =
parsed_toml.try_into().map_err(|e| ProxyError::Config(e.to_string()))?;
if !update_every_is_explicit && (legacy_secret_is_explicit || legacy_config_is_explicit) {
config.general.update_every = None;
}
let legacy_nat_stun = config.general.middle_proxy_nat_stun.take();
let legacy_nat_stun_servers = std::mem::take(&mut config.general.middle_proxy_nat_stun_servers);
let legacy_nat_stun_used = legacy_nat_stun.is_some() || !legacy_nat_stun_servers.is_empty();
if stun_servers_is_explicit {
let mut explicit_stun_servers = Vec::new();
for stun in std::mem::take(&mut config.network.stun_servers) {
push_unique_nonempty(&mut explicit_stun_servers, stun);
}
config.network.stun_servers = explicit_stun_servers;
if legacy_nat_stun_used {
warn!("general.middle_proxy_nat_stun and general.middle_proxy_nat_stun_servers are ignored because network.stun_servers is explicitly set");
}
} else {
// Keep the default STUN pool unless network.stun_servers is explicitly overridden.
let mut unified_stun_servers = default_stun_servers();
if let Some(stun) = legacy_nat_stun {
push_unique_nonempty(&mut unified_stun_servers, stun);
}
for stun in legacy_nat_stun_servers {
push_unique_nonempty(&mut unified_stun_servers, stun);
}
config.network.stun_servers = unified_stun_servers;
if legacy_nat_stun_used {
warn!("general.middle_proxy_nat_stun and general.middle_proxy_nat_stun_servers are deprecated; use network.stun_servers");
}
}
if let Some(update_every) = config.general.update_every {
if update_every == 0 {
@@ -147,12 +212,36 @@ impl ProxyConfig {
}
}
if config.general.stun_nat_probe_concurrency == 0 {
return Err(ProxyError::Config(
"general.stun_nat_probe_concurrency must be > 0".to_string(),
));
}
if config.general.me_reinit_every_secs == 0 {
return Err(ProxyError::Config(
"general.me_reinit_every_secs must be > 0".to_string(),
));
}
if config.general.beobachten_minutes == 0 {
return Err(ProxyError::Config(
"general.beobachten_minutes must be > 0".to_string(),
));
}
if config.general.beobachten_flush_secs == 0 {
return Err(ProxyError::Config(
"general.beobachten_flush_secs must be > 0".to_string(),
));
}
if config.general.beobachten_file.trim().is_empty() {
return Err(ProxyError::Config(
"general.beobachten_file cannot be empty".to_string(),
));
}
if config.general.me_hardswap_warmup_delay_max_ms == 0 {
return Err(ProxyError::Config(
"general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(),
@@ -409,6 +498,64 @@ impl ProxyConfig {
mod tests {
use super::*;
#[test]
fn serde_defaults_remain_unchanged_for_present_sections() {
let toml = r#"
[network]
[general]
[server]
[access]
"#;
let cfg: ProxyConfig = toml::from_str(toml).unwrap();
assert_eq!(cfg.network.ipv6, default_network_ipv6());
assert_eq!(cfg.network.stun_tcp_fallback, default_stun_tcp_fallback());
assert_eq!(
cfg.general.middle_proxy_warm_standby,
default_middle_proxy_warm_standby()
);
assert_eq!(
cfg.general.me_reconnect_max_concurrent_per_dc,
default_me_reconnect_max_concurrent_per_dc()
);
assert_eq!(
cfg.general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count()
);
assert_eq!(cfg.general.update_every, default_update_every());
assert_eq!(cfg.server.listen_addr_ipv4, default_listen_addr_ipv4());
assert_eq!(cfg.server.listen_addr_ipv6, default_listen_addr_ipv6_opt());
assert_eq!(cfg.access.users, default_access_users());
}
#[test]
fn impl_defaults_are_sourced_from_default_helpers() {
let network = NetworkConfig::default();
assert_eq!(network.ipv6, default_network_ipv6());
assert_eq!(network.stun_tcp_fallback, default_stun_tcp_fallback());
let general = GeneralConfig::default();
assert_eq!(
general.middle_proxy_warm_standby,
default_middle_proxy_warm_standby()
);
assert_eq!(
general.me_reconnect_max_concurrent_per_dc,
default_me_reconnect_max_concurrent_per_dc()
);
assert_eq!(
general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count()
);
assert_eq!(general.update_every, default_update_every());
let server = ServerConfig::default();
assert_eq!(server.listen_addr_ipv6, Some(default_listen_addr_ipv6()));
let access = AccessConfig::default();
assert_eq!(access.users, default_access_users());
}
#[test]
fn dc_overrides_allow_string_and_array() {
let toml = r#"
@@ -512,6 +659,26 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
fn stun_nat_probe_concurrency_zero_is_rejected() {
let toml = r#"
[general]
stun_nat_probe_concurrency = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_stun_nat_probe_concurrency_zero_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.stun_nat_probe_concurrency must be > 0"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_reinit_every_default_is_set() {
let toml = r#"

View File

@@ -76,7 +76,7 @@ impl Default for ProxyModes {
Self {
classic: false,
secure: false,
tls: true,
tls: default_true(),
}
}
}
@@ -87,7 +87,7 @@ pub struct NetworkConfig {
pub ipv4: bool,
/// None = auto-detect IPv6 availability.
#[serde(default)]
#[serde(default = "default_network_ipv6")]
pub ipv6: Option<bool>,
/// 4 or 6.
@@ -102,7 +102,7 @@ pub struct NetworkConfig {
pub stun_servers: Vec<String>,
/// Enable TCP STUN fallback when UDP is blocked.
#[serde(default)]
#[serde(default = "default_stun_tcp_fallback")]
pub stun_tcp_fallback: bool,
/// HTTP-based public IP detection endpoints (fallback after STUN).
@@ -117,12 +117,12 @@ pub struct NetworkConfig {
impl Default for NetworkConfig {
fn default() -> Self {
Self {
ipv4: true,
ipv6: Some(false),
prefer: 4,
ipv4: default_true(),
ipv6: default_network_ipv6(),
prefer: default_prefer_4(),
multipath: false,
stun_servers: default_stun_servers(),
stun_tcp_fallback: true,
stun_tcp_fallback: default_stun_tcp_fallback(),
http_ip_detect_urls: default_http_ip_detect_urls(),
cache_public_ip_path: default_cache_public_ip_path(),
}
@@ -140,7 +140,7 @@ pub struct GeneralConfig {
#[serde(default = "default_true")]
pub fast_mode: bool,
#[serde(default)]
#[serde(default = "default_true")]
pub use_middle_proxy: bool,
#[serde(default)]
@@ -148,7 +148,7 @@ pub struct GeneralConfig {
/// Path to proxy-secret binary file (auto-downloaded if absent).
/// Infrastructure secret from https://core.telegram.org/getProxySecret.
#[serde(default)]
#[serde(default = "default_proxy_secret_path")]
pub proxy_secret_path: Option<String>,
/// Public IP override for middle-proxy NAT environments.
@@ -157,23 +157,29 @@ pub struct GeneralConfig {
pub middle_proxy_nat_ip: Option<IpAddr>,
/// Enable STUN-based NAT probing to discover public IP:port for ME KDF.
#[serde(default)]
#[serde(default = "default_true")]
pub middle_proxy_nat_probe: bool,
/// Optional STUN server address (host:port) for NAT probing.
#[serde(default)]
/// Deprecated legacy single STUN server for NAT probing.
/// Use `network.stun_servers` instead.
#[serde(default = "default_middle_proxy_nat_stun")]
pub middle_proxy_nat_stun: Option<String>,
/// Optional list of STUN servers for NAT probing fallback.
#[serde(default)]
/// Deprecated legacy STUN list for NAT probing fallback.
/// Use `network.stun_servers` instead.
#[serde(default = "default_middle_proxy_nat_stun_servers")]
pub middle_proxy_nat_stun_servers: Vec<String>,
/// Maximum number of concurrent STUN probes during NAT detection.
#[serde(default = "default_stun_nat_probe_concurrency")]
pub stun_nat_probe_concurrency: usize,
/// Desired size of active Middle-Proxy writer pool.
#[serde(default = "default_pool_size")]
pub middle_proxy_pool_size: usize,
/// Number of warm standby ME connections kept pre-initialized.
#[serde(default)]
#[serde(default = "default_middle_proxy_warm_standby")]
pub middle_proxy_warm_standby: usize,
/// Enable ME keepalive padding frames.
@@ -206,6 +212,22 @@ pub struct GeneralConfig {
#[serde(default = "default_desync_all_full")]
pub desync_all_full: bool,
/// Enable per-IP forensic observation buckets for scanners and handshake failures.
#[serde(default = "default_true")]
pub beobachten: bool,
/// Observation retention window in minutes for per-IP forensic buckets.
#[serde(default = "default_beobachten_minutes")]
pub beobachten_minutes: u64,
/// Snapshot flush interval in seconds for beob output file.
#[serde(default = "default_beobachten_flush_secs")]
pub beobachten_flush_secs: u64,
/// Snapshot file path for beob output.
#[serde(default = "default_beobachten_file")]
pub beobachten_file: String,
/// Enable C-like hard-swap for ME pool generations.
/// When true, Telemt prewarms a new generation and switches once full coverage is reached.
#[serde(default = "default_hardswap")]
@@ -224,7 +246,7 @@ pub struct GeneralConfig {
pub me_warmup_step_jitter_ms: u64,
/// Max concurrent reconnect attempts per DC.
#[serde(default)]
#[serde(default = "default_me_reconnect_max_concurrent_per_dc")]
pub me_reconnect_max_concurrent_per_dc: u32,
/// Base backoff in ms for reconnect.
@@ -236,7 +258,7 @@ pub struct GeneralConfig {
pub me_reconnect_backoff_cap_ms: u64,
/// Fast retry attempts before backoff.
#[serde(default)]
#[serde(default = "default_me_reconnect_fast_retry_count")]
pub me_reconnect_fast_retry_count: u32,
/// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
@@ -264,7 +286,7 @@ pub struct GeneralConfig {
/// Unified ME updater interval in seconds for getProxyConfig/getProxyConfigV6/getProxySecret.
/// When omitted, effective value falls back to legacy proxy_*_auto_reload_secs fields.
#[serde(default)]
#[serde(default = "default_update_every")]
pub update_every: Option<u64>,
/// Periodic ME pool reinitialization interval in seconds.
@@ -354,27 +376,28 @@ impl Default for GeneralConfig {
Self {
modes: ProxyModes::default(),
prefer_ipv6: false,
fast_mode: true,
use_middle_proxy: false,
fast_mode: default_true(),
use_middle_proxy: default_true(),
ad_tag: None,
proxy_secret_path: None,
proxy_secret_path: default_proxy_secret_path(),
middle_proxy_nat_ip: None,
middle_proxy_nat_probe: false,
middle_proxy_nat_stun: None,
middle_proxy_nat_stun_servers: Vec::new(),
middle_proxy_nat_probe: default_true(),
middle_proxy_nat_stun: default_middle_proxy_nat_stun(),
middle_proxy_nat_stun_servers: default_middle_proxy_nat_stun_servers(),
stun_nat_probe_concurrency: default_stun_nat_probe_concurrency(),
middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: 16,
me_keepalive_enabled: true,
middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
me_keepalive_enabled: default_true(),
me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(),
me_keepalive_payload_random: true,
me_warmup_stagger_enabled: true,
me_keepalive_payload_random: default_true(),
me_warmup_stagger_enabled: default_true(),
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
me_reconnect_max_concurrent_per_dc: 8,
me_reconnect_max_concurrent_per_dc: default_me_reconnect_max_concurrent_per_dc(),
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: 8,
me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(),
stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal,
@@ -383,9 +406,13 @@ impl Default for GeneralConfig {
crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(),
desync_all_full: default_desync_all_full(),
beobachten: default_true(),
beobachten_minutes: default_beobachten_minutes(),
beobachten_flush_secs: default_beobachten_flush_secs(),
beobachten_file: default_beobachten_file(),
hardswap: default_hardswap(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
update_every: Some(default_update_every_secs()),
update_every: default_update_every(),
me_reinit_every_secs: default_me_reinit_every_secs(),
me_hardswap_warmup_delay_min_ms: default_me_hardswap_warmup_delay_min_ms(),
me_hardswap_warmup_delay_max_ms: default_me_hardswap_warmup_delay_max_ms(),
@@ -403,7 +430,7 @@ impl Default for GeneralConfig {
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
ntp_check: default_ntp_check(),
ntp_servers: default_ntp_servers(),
auto_degradation_enabled: true,
auto_degradation_enabled: default_true(),
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
}
}
@@ -430,11 +457,11 @@ impl GeneralConfig {
}
/// `[general.links]` — proxy link generation settings.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinksConfig {
/// List of usernames whose tg:// links to display at startup.
/// `"*"` = all users, `["alice", "bob"]` = specific users.
#[serde(default)]
#[serde(default = "default_links_show")]
pub show: ShowLink,
/// Public hostname/IP for tg:// link generation (overrides detected IP).
@@ -446,15 +473,25 @@ pub struct LinksConfig {
pub public_port: Option<u16>,
}
impl Default for LinksConfig {
fn default() -> Self {
Self {
show: default_links_show(),
public_host: None,
public_port: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
#[serde(default = "default_port")]
pub port: u16,
#[serde(default)]
#[serde(default = "default_listen_addr_ipv4")]
pub listen_addr_ipv4: Option<String>,
#[serde(default)]
#[serde(default = "default_listen_addr_ipv6_opt")]
pub listen_addr_ipv6: Option<String>,
#[serde(default)]
@@ -489,8 +526,8 @@ impl Default for ServerConfig {
fn default() -> Self {
Self {
port: default_port(),
listen_addr_ipv4: Some(default_listen_addr()),
listen_addr_ipv6: Some("::".to_string()),
listen_addr_ipv4: default_listen_addr_ipv4(),
listen_addr_ipv6: default_listen_addr_ipv6_opt(),
listen_unix_sock: None,
listen_unix_sock_perm: None,
listen_tcp: None,
@@ -563,7 +600,7 @@ pub struct AntiCensorshipConfig {
pub fake_cert_len: usize,
/// Enable TLS certificate emulation using cached real certificates.
#[serde(default)]
#[serde(default = "default_true")]
pub tls_emulation: bool,
/// Directory to store TLS front cache (on disk).
@@ -591,6 +628,12 @@ pub struct AntiCensorshipConfig {
/// Enforce ALPN echo of client preference.
#[serde(default = "default_alpn_enforce")]
pub alpn_enforce: bool,
/// Send PROXY protocol header when connecting to mask_host.
/// 0 = disabled, 1 = v1 (text), 2 = v2 (binary).
/// Allows the backend to see the real client IP.
#[serde(default)]
pub mask_proxy_protocol: u8,
}
impl Default for AntiCensorshipConfig {
@@ -598,25 +641,26 @@ impl Default for AntiCensorshipConfig {
Self {
tls_domain: default_tls_domain(),
tls_domains: Vec::new(),
mask: true,
mask: default_true(),
mask_host: None,
mask_port: default_mask_port(),
mask_unix_sock: None,
fake_cert_len: default_fake_cert_len(),
tls_emulation: false,
tls_emulation: true,
tls_front_dir: default_tls_front_dir(),
server_hello_delay_min_ms: default_server_hello_delay_min_ms(),
server_hello_delay_max_ms: default_server_hello_delay_max_ms(),
tls_new_session_tickets: default_tls_new_session_tickets(),
tls_full_cert_ttl_secs: default_tls_full_cert_ttl_secs(),
alpn_enforce: default_alpn_enforce(),
mask_proxy_protocol: 0,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AccessConfig {
#[serde(default)]
#[serde(default = "default_access_users")]
pub users: HashMap<String, String>,
#[serde(default)]
@@ -643,13 +687,8 @@ pub struct AccessConfig {
impl Default for AccessConfig {
fn default() -> Self {
let mut users = HashMap::new();
users.insert(
"default".to_string(),
"00000000000000000000000000000000".to_string(),
);
Self {
users,
users: default_access_users(),
user_max_tcp_conns: HashMap::new(),
user_expirations: HashMap::new(),
user_data_quota: HashMap::new(),
@@ -731,7 +770,7 @@ pub struct ListenerConfig {
/// In TOML, this can be:
/// - `show_link = "*"` — show links for all users
/// - `show_link = ["a", "b"]` — show links for specific users
/// - omitted — show no links (default)
/// - omitted — default depends on the owning config field
#[derive(Debug, Clone, Default)]
pub enum ShowLink {
/// Don't show any links (default when omitted).
@@ -743,6 +782,10 @@ pub enum ShowLink {
Specific(Vec<String>),
}
fn default_links_show() -> ShowLink {
ShowLink::All
}
impl ShowLink {
/// Returns true if no links should be shown.
pub fn is_empty(&self) -> bool {

View File

@@ -35,6 +35,7 @@ use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
use crate::proxy::ClientHandler;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool;
use crate::transport::middle_proxy::{
@@ -159,6 +160,15 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
info!(target: "telemt::links", "------------------------");
}
async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> {
if let Some(parent) = std::path::Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(path, payload).await
}
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (config_path, cli_silent, cli_log_level) = parse_cli();
@@ -193,14 +203,14 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
};
let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
// Configure color output based on config
let fmt_layer = if config.general.disable_colors {
fmt::Layer::default().with_ansi(false)
} else {
fmt::Layer::default().with_ansi(true)
};
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
@@ -244,10 +254,137 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
warn!("Using default tls_domain. Consider setting a custom domain.");
}
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
// Start TLS front fetching in background immediately, in parallel with STUN probing.
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
cache.load_from_disk().await;
let port = config.censorship.mask_port;
let proxy_protocol = config.censorship.mask_proxy_protocol;
let mask_host = config
.censorship
.mask_host
.clone()
.unwrap_or_else(|| config.censorship.tls_domain.clone());
let fetch_timeout = Duration::from_secs(5);
let cache_initial = cache.clone();
let domains_initial = tls_domains.clone();
let host_initial = mask_host.clone();
let upstream_initial = upstream_manager.clone();
tokio::spawn(async move {
let mut join = tokio::task::JoinSet::new();
for domain in domains_initial {
let cache_domain = cache_initial.clone();
let host_domain = host_initial.clone();
let upstream_domain = upstream_initial.clone();
join.spawn(async move {
match crate::tls_front::fetcher::fetch_real_tls(
&host_domain,
port,
&domain,
fetch_timeout,
Some(upstream_domain),
proxy_protocol,
)
.await
{
Ok(res) => cache_domain.update_from_fetch(&domain, res).await,
Err(e) => {
warn!(domain = %domain, error = %e, "TLS emulation initial fetch failed")
}
}
});
}
while let Some(res) = join.join_next().await {
if let Err(e) = res {
warn!(error = %e, "TLS emulation initial fetch task join failed");
}
}
});
let cache_timeout = cache.clone();
let domains_timeout = tls_domains.clone();
let fake_cert_len = config.censorship.fake_cert_len;
tokio::spawn(async move {
tokio::time::sleep(fetch_timeout).await;
for domain in domains_timeout {
let cached = cache_timeout.get(&domain).await;
if cached.domain == "default" {
warn!(
domain = %domain,
timeout_secs = fetch_timeout.as_secs(),
fake_cert_len,
"TLS-front fetch not ready within timeout; using cache/default fake cert fallback"
);
}
}
});
// Periodic refresh with jitter.
let cache_refresh = cache.clone();
let domains_refresh = tls_domains.clone();
let host_refresh = mask_host.clone();
let upstream_refresh = upstream_manager.clone();
tokio::spawn(async move {
loop {
let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600);
let jitter_secs = rand::rng().random_range(0..=7200);
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
let mut join = tokio::task::JoinSet::new();
for domain in domains_refresh.clone() {
let cache_domain = cache_refresh.clone();
let host_domain = host_refresh.clone();
let upstream_domain = upstream_refresh.clone();
join.spawn(async move {
match crate::tls_front::fetcher::fetch_real_tls(
&host_domain,
port,
&domain,
fetch_timeout,
Some(upstream_domain),
proxy_protocol,
)
.await
{
Ok(res) => cache_domain.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"),
}
});
}
while let Some(res) = join.join_next().await {
if let Err(e) = res {
warn!(error = %e, "TLS emulation refresh task join failed");
}
}
}
});
Some(cache)
} else {
None
};
let probe = run_probe(
&config.network,
config.general.middle_proxy_nat_stun.clone(),
config.general.middle_proxy_nat_probe,
config.general.stun_nat_probe_concurrency,
)
.await?;
let decision = decide_network_capabilities(&config.network, &probe);
@@ -256,6 +393,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let prefer_ipv6 = decision.prefer_ipv6();
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new());
let rng = Arc::new(SecureRandom::new());
// IP Tracker initialization
@@ -347,8 +485,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
proxy_secret,
config.general.middle_proxy_nat_ip,
config.general.middle_proxy_nat_probe,
config.general.middle_proxy_nat_stun.clone(),
config.general.middle_proxy_nat_stun_servers.clone(),
None,
config.network.stun_servers.clone(),
config.general.stun_nat_probe_concurrency,
probe.detected_ipv6,
config.timeouts.me_one_retry,
config.timeouts.me_one_timeout_ms,
@@ -380,26 +519,33 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
);
let pool_size = config.general.middle_proxy_pool_size.max(1);
match pool.init(pool_size, &rng).await {
Ok(()) => {
info!("Middle-End pool initialized successfully");
loop {
match pool.init(pool_size, &rng).await {
Ok(()) => {
info!("Middle-End pool initialized successfully");
// Phase 4: Start health monitor
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
// Phase 4: Start health monitor
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
Some(pool)
}
Err(e) => {
error!(error = %e, "Failed to initialize ME pool. Falling back to direct mode.");
None
break Some(pool);
}
Err(e) => {
warn!(
error = %e,
retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization"
);
pool.reset_stun_state();
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}
@@ -431,77 +577,8 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
Duration::from_secs(config.access.replay_window_secs),
));
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
cache.load_from_disk().await;
let port = config.censorship.mask_port;
let mask_host = config.censorship.mask_host.clone()
.unwrap_or_else(|| config.censorship.tls_domain.clone());
// Initial synchronous fetch to warm cache before serving clients.
for domain in tls_domains.clone() {
match crate::tls_front::fetcher::fetch_real_tls(
&mask_host,
port,
&domain,
Duration::from_secs(5),
Some(upstream_manager.clone()),
)
.await
{
Ok(res) => cache.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
// Periodic refresh with jitter.
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let upstream_for_task = upstream_manager.clone();
tokio::spawn(async move {
loop {
let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600);
let jitter_secs = rand::rng().random_range(0..=7200);
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
for domain in &domains {
match crate::tls_front::fetcher::fetch_real_tls(
&mask_host,
port,
domain,
Duration::from_secs(5),
Some(upstream_for_task.clone()),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"),
}
}
}
});
Some(cache)
} else {
None
};
// Middle-End ping before DC connectivity
if let Some(ref pool) = me_pool {
let me_results = run_me_ping(pool, &rng).await;
@@ -595,9 +672,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!(" IPv4 in use / IPv6 is fallback");
}
} else if v6_works && !v4_works {
info!(" IPv6 only / IPv4 unavailable)");
info!(" IPv6 only / IPv4 unavailable");
} else if v4_works && !v6_works {
info!(" IPv4 only / IPv6 unavailable)");
info!(" IPv4 only / IPv6 unavailable");
} else if !v6_works && !v4_works {
info!(" No DC connectivity");
}
@@ -666,14 +743,8 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
rc_clone.run_periodic_cleanup().await;
});
let detected_ip_v4: Option<std::net::IpAddr> = probe
.reflected_ipv4
.map(|s| s.ip())
.or_else(|| probe.detected_ipv4.map(std::net::IpAddr::V4));
let detected_ip_v6: Option<std::net::IpAddr> = probe
.reflected_ipv6
.map(|s| s.ip())
.or_else(|| probe.detected_ipv6.map(std::net::IpAddr::V6));
let detected_ip_v4: Option<std::net::IpAddr> = probe.detected_ipv4.map(std::net::IpAddr::V4);
let detected_ip_v6: Option<std::net::IpAddr> = probe.detected_ipv6.map(std::net::IpAddr::V6);
debug!(
"Detected IPs: v4={:?} v6={:?}",
detected_ip_v4, detected_ip_v6
@@ -692,6 +763,26 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
detected_ip_v6,
);
let beobachten_writer = beobachten.clone();
let config_rx_beobachten = config_rx.clone();
tokio::spawn(async move {
loop {
let cfg = config_rx_beobachten.borrow().clone();
let sleep_secs = cfg.general.beobachten_flush_secs.max(1);
if cfg.general.beobachten {
let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60));
let path = cfg.general.beobachten_file.clone();
let snapshot = beobachten_writer.snapshot_text(ttl);
if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await {
warn!(error = %e, path = %path, "Failed to flush beobachten snapshot");
}
}
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
}
});
if let Some(ref pool) = me_pool {
let pool_clone = pool.clone();
let rng_clone = rng.clone();
@@ -860,6 +951,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let max_connections_unix = max_connections.clone();
tokio::spawn(async move {
@@ -887,6 +979,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move {
@@ -894,7 +987,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng,
me_pool, tls_cache, ip_tracker, proxy_protocol_enabled,
me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled,
).await {
debug!(error = %e, "Unix socket connection error");
}
@@ -942,9 +1035,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
if let Some(port) = config.server.metrics_port {
let stats = stats.clone();
let beobachten = beobachten.clone();
let config_rx_metrics = config_rx.clone();
let whitelist = config.server.metrics_whitelist.clone();
tokio::spawn(async move {
metrics::serve(port, stats, whitelist).await;
metrics::serve(port, stats, beobachten, config_rx_metrics, whitelist).await;
});
}
@@ -958,6 +1053,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let max_connections_tcp = max_connections.clone();
tokio::spawn(async move {
@@ -980,6 +1076,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move {
@@ -996,6 +1093,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
me_pool,
tls_cache,
ip_tracker,
beobachten,
proxy_protocol_enabled,
)
.run()

View File

@@ -1,6 +1,7 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use http_body_util::Full;
use hyper::body::Bytes;
@@ -11,9 +12,17 @@ use ipnetwork::IpNetwork;
use tokio::net::TcpListener;
use tracing::{info, warn, debug};
use crate::config::ProxyConfig;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::Stats;
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
pub async fn serve(
port: u16,
stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>,
) {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = match TcpListener::bind(addr).await {
Ok(l) => l,
@@ -22,7 +31,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
return;
}
};
info!("Metrics endpoint: http://{}/metrics", addr);
info!("Metrics endpoint: http://{}/metrics and /beobachten", addr);
loop {
let (stream, peer) = match listener.accept().await {
@@ -39,10 +48,14 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
}
let stats = stats.clone();
let beobachten = beobachten.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move {
let svc = service_fn(move |req| {
let stats = stats.clone();
async move { handle(req, &stats) }
let beobachten = beobachten.clone();
let config = config_rx_conn.borrow().clone();
async move { handle(req, &stats, &beobachten, &config) }
});
if let Err(e) = http1::Builder::new()
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
@@ -54,24 +67,48 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
}
}
fn handle<B>(req: Request<B>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() != "/metrics" {
fn handle<B>(
req: Request<B>,
stats: &Stats,
beobachten: &BeobachtenStore,
config: &ProxyConfig,
) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() == "/metrics" {
let body = render_metrics(stats);
let resp = Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("Not Found\n")))
.status(StatusCode::OK)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap();
return Ok(resp);
}
if req.uri().path() == "/beobachten" {
let body = render_beobachten(beobachten, config);
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap();
return Ok(resp);
}
let body = render_metrics(stats);
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("Not Found\n")))
.unwrap();
Ok(resp)
}
fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String {
if !config.general.beobachten {
return "beobachten disabled\n".to_string();
}
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
beobachten.snapshot_text(ttl)
}
fn render_metrics(stats: &Stats) -> String {
use std::fmt::Write;
let mut out = String::with_capacity(4096);
@@ -199,6 +236,95 @@ fn render_metrics(stats: &Stats) -> String {
stats.get_pool_stale_pick_total()
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!(
out,
"telemt_me_writer_removed_total {}",
stats.get_me_writer_removed_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_removed_unexpected_total Unexpected ME writer removals that triggered refill"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_unexpected_total counter");
let _ = writeln!(
out,
"telemt_me_writer_removed_unexpected_total {}",
stats.get_me_writer_removed_unexpected_total()
);
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
let _ = writeln!(
out,
"telemt_me_refill_triggered_total {}",
stats.get_me_refill_triggered_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_refill_skipped_inflight_total Immediate ME refill skips due to inflight dedup"
);
let _ = writeln!(out, "# TYPE telemt_me_refill_skipped_inflight_total counter");
let _ = writeln!(
out,
"telemt_me_refill_skipped_inflight_total {}",
stats.get_me_refill_skipped_inflight_total()
);
let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures");
let _ = writeln!(out, "# TYPE telemt_me_refill_failed_total counter");
let _ = writeln!(
out,
"telemt_me_refill_failed_total {}",
stats.get_me_refill_failed_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_restored_same_endpoint_total Refilled ME writer restored on the same endpoint"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_same_endpoint_total counter");
let _ = writeln!(
out,
"telemt_me_writer_restored_same_endpoint_total {}",
stats.get_me_writer_restored_same_endpoint_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_restored_fallback_total Refilled ME writer restored via fallback endpoint"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_fallback_total counter");
let _ = writeln!(
out,
"telemt_me_writer_restored_fallback_total {}",
stats.get_me_writer_restored_fallback_total()
);
let unresolved_writer_losses = stats
.get_me_writer_removed_unexpected_total()
.saturating_sub(
stats
.get_me_writer_restored_same_endpoint_total()
.saturating_add(stats.get_me_writer_restored_fallback_total()),
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
);
let _ = writeln!(
out,
"telemt_me_writer_removed_unexpected_minus_restored_total {}",
unresolved_writer_losses
);
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
@@ -229,6 +355,7 @@ fn render_metrics(stats: &Stats) -> String {
#[cfg(test)]
mod tests {
use super::*;
use std::net::IpAddr;
use http_body_util::BodyExt;
#[test]
@@ -277,11 +404,17 @@ mod tests {
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
));
}
#[tokio::test]
async fn test_endpoint_integration() {
let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new());
let mut config = ProxyConfig::default();
stats.increment_connects_all();
stats.increment_connects_all();
stats.increment_connects_all();
@@ -290,16 +423,34 @@ mod tests {
.uri("/metrics")
.body(())
.unwrap();
let resp = handle(req, &stats).unwrap();
let resp = handle(req, &stats, &beobachten, &config).unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
config.general.beobachten = true;
config.general.beobachten_minutes = 10;
beobachten.record(
"TLS-scanner",
"203.0.113.10".parse::<IpAddr>().unwrap(),
Duration::from_secs(600),
);
let req_beob = Request::builder()
.uri("/beobachten")
.body(())
.unwrap();
let resp_beob = handle(req_beob, &stats, &beobachten, &config).unwrap();
assert_eq!(resp_beob.status(), StatusCode::OK);
let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes();
let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap();
assert!(beob_text.contains("[TLS-scanner]"));
assert!(beob_text.contains("203.0.113.10-1"));
let req404 = Request::builder()
.uri("/other")
.body(())
.unwrap();
let resp404 = handle(req404, &stats).unwrap();
let resp404 = handle(req404, &stats, &beobachten, &config).unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
}
}

View File

@@ -1,12 +1,16 @@
#![allow(dead_code)]
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::time::Duration;
use tracing::{info, warn};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::config::NetworkConfig;
use crate::error::Result;
use crate::network::stun::{stun_probe_dual, DualStunResult, IpFamily};
use crate::network::stun::{stun_probe_dual, DualStunResult, IpFamily, StunProbeResult};
#[derive(Debug, Clone, Default)]
pub struct NetworkProbe {
@@ -49,7 +53,13 @@ impl NetworkDecision {
}
}
pub async fn run_probe(config: &NetworkConfig, stun_addr: Option<String>, nat_probe: bool) -> Result<NetworkProbe> {
const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
pub async fn run_probe(
config: &NetworkConfig,
nat_probe: bool,
stun_nat_probe_concurrency: usize,
) -> Result<NetworkProbe> {
let mut probe = NetworkProbe::default();
probe.detected_ipv4 = detect_local_ip_v4();
@@ -58,14 +68,17 @@ pub async fn run_probe(config: &NetworkConfig, stun_addr: Option<String>, nat_pr
probe.ipv4_is_bogon = probe.detected_ipv4.map(is_bogon_v4).unwrap_or(false);
probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false);
let stun_server = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string());
let stun_res = if nat_probe {
match stun_probe_dual(&stun_server).await {
Ok(res) => res,
Err(e) => {
warn!(error = %e, "STUN probe failed, continuing without reflection");
DualStunResult::default()
}
let servers = collect_stun_servers(config);
if servers.is_empty() {
warn!("STUN probe is enabled but network.stun_servers is empty");
DualStunResult::default()
} else {
probe_stun_servers_parallel(
&servers,
stun_nat_probe_concurrency.max(1),
)
.await
}
} else {
DualStunResult::default()
@@ -73,6 +86,17 @@ pub async fn run_probe(config: &NetworkConfig, stun_addr: Option<String>, nat_pr
probe.reflected_ipv4 = stun_res.v4.map(|r| r.reflected_addr);
probe.reflected_ipv6 = stun_res.v6.map(|r| r.reflected_addr);
// If STUN is blocked but IPv4 is private, try HTTP public-IP fallback.
if nat_probe
&& probe.reflected_ipv4.is_none()
&& probe.detected_ipv4.map(is_bogon_v4).unwrap_or(false)
{
if let Some(public_ip) = detect_public_ipv4_http(&config.http_ip_detect_urls).await {
probe.reflected_ipv4 = Some(SocketAddr::new(IpAddr::V4(public_ip), 0));
info!(public_ip = %public_ip, "STUN unavailable, using HTTP public IPv4 fallback");
}
}
probe.ipv4_nat_detected = match (probe.detected_ipv4, probe.reflected_ipv4) {
(Some(det), Some(reflected)) => det != reflected.ip(),
_ => false,
@@ -94,6 +118,111 @@ pub async fn run_probe(config: &NetworkConfig, stun_addr: Option<String>, nat_pr
Ok(probe)
}
async fn detect_public_ipv4_http(urls: &[String]) -> Option<Ipv4Addr> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(3))
.build()
.ok()?;
for url in urls {
let response = match client.get(url).send().await {
Ok(response) => response,
Err(_) => continue,
};
let body = match response.text().await {
Ok(body) => body,
Err(_) => continue,
};
let Ok(ip) = body.trim().parse::<Ipv4Addr>() else {
continue;
};
if !is_bogon_v4(ip) {
return Some(ip);
}
}
None
}
fn collect_stun_servers(config: &NetworkConfig) -> Vec<String> {
let mut out = Vec::new();
for s in &config.stun_servers {
if !s.is_empty() && !out.contains(s) {
out.push(s.clone());
}
}
out
}
async fn probe_stun_servers_parallel(
servers: &[String],
concurrency: usize,
) -> DualStunResult {
let mut join_set = JoinSet::new();
let mut next_idx = 0usize;
let mut best_v4_by_ip: HashMap<IpAddr, (usize, StunProbeResult)> = HashMap::new();
let mut best_v6_by_ip: HashMap<IpAddr, (usize, StunProbeResult)> = HashMap::new();
while next_idx < servers.len() || !join_set.is_empty() {
while next_idx < servers.len() && join_set.len() < concurrency {
let stun_addr = servers[next_idx].clone();
next_idx += 1;
join_set.spawn(async move {
let res = timeout(STUN_BATCH_TIMEOUT, stun_probe_dual(&stun_addr)).await;
(stun_addr, res)
});
}
let Some(task) = join_set.join_next().await else {
break;
};
match task {
Ok((stun_addr, Ok(Ok(result)))) => {
if let Some(v4) = result.v4 {
let entry = best_v4_by_ip.entry(v4.reflected_addr.ip()).or_insert((0, v4));
entry.0 += 1;
}
if let Some(v6) = result.v6 {
let entry = best_v6_by_ip.entry(v6.reflected_addr.ip()).or_insert((0, v6));
entry.0 += 1;
}
if result.v4.is_some() || result.v6.is_some() {
debug!(stun = %stun_addr, "STUN server responded within probe timeout");
}
}
Ok((stun_addr, Ok(Err(e)))) => {
debug!(error = %e, stun = %stun_addr, "STUN probe failed");
}
Ok((stun_addr, Err(_))) => {
debug!(stun = %stun_addr, "STUN probe timeout");
}
Err(e) => {
debug!(error = %e, "STUN probe task join failed");
}
}
}
let mut out = DualStunResult::default();
if let Some((_, best)) = best_v4_by_ip
.into_values()
.max_by_key(|(count, _)| *count)
{
info!("STUN-Quorum reached, IP: {}", best.reflected_addr.ip());
out.v4 = Some(best);
}
if let Some((_, best)) = best_v6_by_ip
.into_values()
.max_by_key(|(count, _)| *count)
{
info!("STUN-Quorum reached, IP: {}", best.reflected_addr.ip());
out.v6 = Some(best);
}
out
}
pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe) -> NetworkDecision {
let ipv4_dc = config.ipv4 && probe.detected_ipv4.is_some();
let ipv6_dc = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()) && probe.detected_ipv6.is_some();

View File

@@ -1,7 +1,7 @@
//! Client Handler
use std::future::Future;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -27,6 +27,7 @@ use crate::error::{HandshakeResult, ProxyError, Result};
use crate::ip_tracker::UserIpTracker;
use crate::protocol::constants::*;
use crate::protocol::tls;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::{ReplayChecker, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::MePool;
@@ -39,6 +40,36 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
use crate::proxy::masking::handle_bad_client;
use crate::proxy::middle_relay::handle_via_middle_proxy;
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60))
}
fn record_beobachten_class(
beobachten: &BeobachtenStore,
config: &ProxyConfig,
peer_ip: IpAddr,
class: &str,
) {
if !config.general.beobachten {
return;
}
beobachten.record(class, peer_ip, beobachten_ttl(config));
}
fn record_handshake_failure_class(
beobachten: &BeobachtenStore,
config: &ProxyConfig,
peer_ip: IpAddr,
error: &ProxyError,
) {
let class = if error.to_string().contains("expected 64 bytes, got 0") {
"expected_64_got_0"
} else {
"other"
};
record_beobachten_class(beobachten, config, peer_ip, class);
}
pub async fn handle_client_stream<S>(
mut stream: S,
peer: SocketAddr,
@@ -51,6 +82,7 @@ pub async fn handle_client_stream<S>(
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
beobachten: Arc<BeobachtenStore>,
proxy_protocol_enabled: bool,
) -> Result<()>
where
@@ -73,6 +105,7 @@ where
Err(e) => {
stats.increment_connects_bad();
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(e);
}
}
@@ -82,6 +115,9 @@ where
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
let stats_for_timeout = stats.clone();
let config_for_timeout = config.clone();
let beobachten_for_timeout = beobachten.clone();
let peer_for_timeout = real_peer.ip();
// For non-TCP streams, use a synthetic local address
let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port)
@@ -103,7 +139,15 @@ where
debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short");
stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream);
handle_bad_client(reader, writer, &first_bytes, &config).await;
handle_bad_client(
reader,
writer,
&first_bytes,
real_peer,
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
@@ -120,7 +164,15 @@ where
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
handle_bad_client(
reader,
writer,
&handshake,
real_peer,
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
@@ -156,7 +208,15 @@ where
debug!(peer = %real_peer, "Non-TLS modes disabled");
stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream);
handle_bad_client(reader, writer, &first_bytes, &config).await;
handle_bad_client(
reader,
writer,
&first_bytes,
real_peer,
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
@@ -173,7 +233,15 @@ where
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
handle_bad_client(
reader,
writer,
&handshake,
real_peer,
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
@@ -200,11 +268,23 @@ where
Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed");
record_handshake_failure_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
&e,
);
return Err(e);
}
Err(_) => {
stats_for_timeout.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout");
record_beobachten_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
"other",
);
return Err(ProxyError::TgHandshakeTimeout);
}
};
@@ -230,6 +310,7 @@ pub struct RunningClientHandler {
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
beobachten: Arc<BeobachtenStore>,
proxy_protocol_enabled: bool,
}
@@ -246,6 +327,7 @@ impl ClientHandler {
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
beobachten: Arc<BeobachtenStore>,
proxy_protocol_enabled: bool,
) -> RunningClientHandler {
RunningClientHandler {
@@ -260,6 +342,7 @@ impl ClientHandler {
me_pool,
tls_cache,
ip_tracker,
beobachten,
proxy_protocol_enabled,
}
}
@@ -284,17 +367,32 @@ impl RunningClientHandler {
let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake);
let stats = self.stats.clone();
let config_for_timeout = self.config.clone();
let beobachten_for_timeout = self.beobachten.clone();
let peer_for_timeout = peer.ip();
// Phase 1: handshake (with timeout)
let outcome = match timeout(handshake_timeout, self.do_handshake()).await {
Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed");
record_handshake_failure_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
&e,
);
return Err(e);
}
Err(_) => {
stats.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout");
record_beobachten_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
"other",
);
return Err(ProxyError::TgHandshakeTimeout);
}
};
@@ -321,6 +419,12 @@ impl RunningClientHandler {
Err(e) => {
self.stats.increment_connects_bad();
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
record_beobachten_class(
&self.beobachten,
&self.config,
self.peer.ip(),
"other",
);
return Err(e);
}
}
@@ -354,7 +458,15 @@ impl RunningClientHandler {
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split();
handle_bad_client(reader, writer, &first_bytes, &self.config).await;
handle_bad_client(
reader,
writer,
&first_bytes,
peer,
&self.config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
@@ -385,7 +497,15 @@ impl RunningClientHandler {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
handle_bad_client(
reader,
writer,
&handshake,
peer,
&config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
@@ -446,7 +566,15 @@ impl RunningClientHandler {
debug!(peer = %peer, "Non-TLS modes disabled");
self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split();
handle_bad_client(reader, writer, &first_bytes, &self.config).await;
handle_bad_client(
reader,
writer,
&first_bytes,
peer,
&self.config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
@@ -476,7 +604,15 @@ impl RunningClientHandler {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
handle_bad_client(
reader,
writer,
&handshake,
peer,
&config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),

View File

@@ -1,6 +1,7 @@
//! Masking - forward unrecognized traffic to mask host
use std::str;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
#[cfg(unix)]
@@ -9,6 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
use tokio::time::timeout;
use tracing::debug;
use crate::config::ProxyConfig;
use crate::stats::beobachten::BeobachtenStore;
use crate::transport::proxy_protocol::{ProxyProtocolV1Builder, ProxyProtocolV2Builder};
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum duration for the entire masking relay.
@@ -50,20 +53,26 @@ pub async fn handle_bad_client<R, W>(
reader: R,
writer: W,
initial_data: &[u8],
peer: SocketAddr,
config: &ProxyConfig,
beobachten: &BeobachtenStore,
)
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let client_type = detect_client_type(initial_data);
if config.general.beobachten {
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
beobachten.record(client_type, peer.ip(), ttl);
}
if !config.censorship.mask {
// Masking disabled, just consume data
consume_client_data(reader).await;
return;
}
let client_type = detect_client_type(initial_data);
// Connect via Unix socket or TCP
#[cfg(unix)]
if let Some(ref sock_path) = config.censorship.mask_unix_sock {
@@ -111,7 +120,37 @@ where
let connect_result = timeout(MASK_TIMEOUT, TcpStream::connect(&mask_addr)).await;
match connect_result {
Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split();
let proxy_header: Option<Vec<u8>> = match config.censorship.mask_proxy_protocol {
0 => None,
version => {
let header = if let Ok(local_addr) = stream.local_addr() {
match version {
2 => ProxyProtocolV2Builder::new().with_addrs(peer, local_addr).build(),
_ => match (peer, local_addr) {
(SocketAddr::V4(src), SocketAddr::V4(dst)) =>
ProxyProtocolV1Builder::new().tcp4(src.into(), dst.into()).build(),
(SocketAddr::V6(src), SocketAddr::V6(dst)) =>
ProxyProtocolV1Builder::new().tcp6(src.into(), dst.into()).build(),
_ =>
ProxyProtocolV1Builder::new().build(),
},
}
} else {
match version {
2 => ProxyProtocolV2Builder::new().build(),
_ => ProxyProtocolV1Builder::new().build(),
}
};
Some(header)
}
};
let (mask_read, mut mask_write) = stream.into_split();
if let Some(header) = proxy_header {
if mask_write.write_all(&header).await.is_err() {
return;
}
}
if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out");
}

117
src/stats/beobachten.rs Normal file
View File

@@ -0,0 +1,117 @@
//! Per-IP forensic buckets for scanner and handshake failure observation.
use std::collections::{BTreeMap, HashMap};
use std::net::IpAddr;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
#[derive(Default)]
struct BeobachtenInner {
entries: HashMap<(String, IpAddr), BeobachtenEntry>,
last_cleanup: Option<Instant>,
}
#[derive(Clone, Copy)]
struct BeobachtenEntry {
tries: u64,
last_seen: Instant,
}
/// In-memory, TTL-scoped per-IP counters keyed by source class.
pub struct BeobachtenStore {
inner: Mutex<BeobachtenInner>,
}
impl Default for BeobachtenStore {
fn default() -> Self {
Self::new()
}
}
impl BeobachtenStore {
pub fn new() -> Self {
Self {
inner: Mutex::new(BeobachtenInner::default()),
}
}
pub fn record(&self, class: &str, ip: IpAddr, ttl: Duration) {
if class.is_empty() || ttl.is_zero() {
return;
}
let now = Instant::now();
let mut guard = self.inner.lock();
Self::cleanup_if_needed(&mut guard, now, ttl);
let key = (class.to_string(), ip);
let entry = guard.entries.entry(key).or_insert(BeobachtenEntry {
tries: 0,
last_seen: now,
});
entry.tries = entry.tries.saturating_add(1);
entry.last_seen = now;
}
pub fn snapshot_text(&self, ttl: Duration) -> String {
if ttl.is_zero() {
return "beobachten disabled\n".to_string();
}
let now = Instant::now();
let mut guard = self.inner.lock();
Self::cleanup(&mut guard, now, ttl);
guard.last_cleanup = Some(now);
let mut grouped = BTreeMap::<String, Vec<(IpAddr, u64)>>::new();
for ((class, ip), entry) in &guard.entries {
grouped
.entry(class.clone())
.or_default()
.push((*ip, entry.tries));
}
if grouped.is_empty() {
return "empty\n".to_string();
}
let mut out = String::with_capacity(grouped.len() * 64);
for (class, entries) in &mut grouped {
out.push('[');
out.push_str(class);
out.push_str("]\n");
entries.sort_by(|(ip_a, tries_a), (ip_b, tries_b)| {
tries_b
.cmp(tries_a)
.then_with(|| ip_a.to_string().cmp(&ip_b.to_string()))
});
for (ip, tries) in entries {
out.push_str(&format!("{ip}-{tries}\n"));
}
}
out
}
fn cleanup_if_needed(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) {
let should_cleanup = match inner.last_cleanup {
Some(last) => now.saturating_duration_since(last) >= CLEANUP_INTERVAL,
None => true,
};
if should_cleanup {
Self::cleanup(inner, now, ttl);
inner.last_cleanup = Some(now);
}
}
fn cleanup(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) {
inner.entries.retain(|_, entry| {
now.saturating_duration_since(entry.last_seen) <= ttl
});
}
}

View File

@@ -2,6 +2,8 @@
#![allow(dead_code)]
pub mod beobachten;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Instant, Duration};
use dashmap::DashMap;
@@ -43,6 +45,13 @@ pub struct Stats {
pool_drain_active: AtomicU64,
pool_force_close_total: AtomicU64,
pool_stale_pick_total: AtomicU64,
me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64,
me_refill_triggered_total: AtomicU64,
me_refill_skipped_inflight_total: AtomicU64,
me_refill_failed_total: AtomicU64,
me_writer_restored_same_endpoint_total: AtomicU64,
me_writer_restored_fallback_total: AtomicU64,
user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>,
}
@@ -142,6 +151,27 @@ impl Stats {
pub fn increment_pool_stale_pick_total(&self) {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_removed_total(&self) {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_removed_unexpected_total(&self) {
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_triggered_total(&self) {
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_skipped_inflight_total(&self) {
self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_failed_total(&self) {
self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_restored_same_endpoint_total(&self) {
self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_restored_fallback_total(&self) {
self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed);
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@@ -195,6 +225,27 @@ impl Stats {
pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_total(&self) -> u64 {
self.me_writer_removed_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
}
pub fn get_me_refill_triggered_total(&self) -> u64 {
self.me_refill_triggered_total.load(Ordering::Relaxed)
}
pub fn get_me_refill_skipped_inflight_total(&self) -> u64 {
self.me_refill_skipped_inflight_total.load(Ordering::Relaxed)
}
pub fn get_me_refill_failed_total(&self) -> u64 {
self.me_refill_failed_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_restored_same_endpoint_total(&self) -> u64 {
self.me_writer_restored_same_endpoint_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default()

View File

@@ -19,6 +19,7 @@ use x509_parser::certificate::X509Certificate;
use crate::crypto::SecureRandom;
use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE};
use crate::transport::proxy_protocol::{ProxyProtocolV1Builder, ProxyProtocolV2Builder};
use crate::tls_front::types::{
ParsedCertificateInfo,
ParsedServerHello,
@@ -366,6 +367,7 @@ async fn fetch_via_raw_tls(
port: u16,
sni: &str,
connect_timeout: Duration,
proxy_protocol: u8,
) -> Result<TlsFetchResult> {
let addr = format!("{host}:{port}");
let mut stream = timeout(connect_timeout, TcpStream::connect(addr)).await??;
@@ -373,6 +375,13 @@ async fn fetch_via_raw_tls(
let rng = SecureRandom::new();
let client_hello = build_client_hello(sni, &rng);
timeout(connect_timeout, async {
if proxy_protocol > 0 {
let header = match proxy_protocol {
2 => ProxyProtocolV2Builder::new().build(),
_ => ProxyProtocolV1Builder::new().build(),
};
stream.write_all(&header).await?;
}
stream.write_all(&client_hello).await?;
stream.flush().await?;
Ok::<(), std::io::Error>(())
@@ -424,9 +433,10 @@ async fn fetch_via_rustls(
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
proxy_protocol: u8,
) -> Result<TlsFetchResult> {
// rustls handshake path for certificate and basic negotiated metadata.
let stream = if let Some(manager) = upstream {
let mut stream = if let Some(manager) = upstream {
// Resolve host to SocketAddr
if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
@@ -447,6 +457,15 @@ async fn fetch_via_rustls(
timeout(connect_timeout, TcpStream::connect((host, port))).await??
};
if proxy_protocol > 0 {
let header = match proxy_protocol {
2 => ProxyProtocolV2Builder::new().build(),
_ => ProxyProtocolV1Builder::new().build(),
};
stream.write_all(&header).await?;
stream.flush().await?;
}
let config = build_client_config();
let connector = TlsConnector::from(config);
@@ -527,8 +546,9 @@ pub async fn fetch_real_tls(
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
proxy_protocol: u8,
) -> Result<TlsFetchResult> {
let raw_result = match fetch_via_raw_tls(host, port, sni, connect_timeout).await {
let raw_result = match fetch_via_raw_tls(host, port, sni, connect_timeout, proxy_protocol).await {
Ok(res) => Some(res),
Err(e) => {
warn!(sni = %sni, error = %e, "Raw TLS fetch failed");
@@ -536,7 +556,7 @@ pub async fn fetch_real_tls(
}
};
match fetch_via_rustls(host, port, sni, connect_timeout, upstream).await {
match fetch_via_rustls(host, port, sni, connect_timeout, upstream, proxy_protocol).await {
Ok(rustls_result) => {
if let Some(mut raw) = raw_result {
raw.cert_info = rustls_result.cert_info;

View File

@@ -22,6 +22,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
pool.prune_closed_writers().await;
check_family(
IpFamily::V4,
&pool,

View File

@@ -1,17 +1,22 @@
//! Middle Proxy RPC transport.
mod codec;
mod config_updater;
mod handshake;
mod health;
mod pool;
mod pool_config;
mod pool_init;
mod pool_nat;
mod pool_refill;
mod pool_reinit;
mod pool_writer;
mod ping;
mod reader;
mod registry;
mod rotation;
mod send;
mod secret;
mod rotation;
mod config_updater;
mod wire;
use bytes::Bytes;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,81 @@
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::warn;
use super::pool::MePool;
impl MePool {
pub async fn update_proxy_maps(
&self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
new_v6: Option<HashMap<i32, Vec<(IpAddr, u16)>>>,
) -> bool {
let mut changed = false;
{
let mut guard = self.proxy_map_v4.write().await;
if !new_v4.is_empty() && *guard != new_v4 {
*guard = new_v4;
changed = true;
}
}
if let Some(v6) = new_v6 {
let mut guard = self.proxy_map_v6.write().await;
if !v6.is_empty() && *guard != v6 {
*guard = v6;
changed = true;
}
}
// Ensure negative DC entries mirror positives when absent (Telegram convention).
{
let mut guard = self.proxy_map_v4.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k)
&& let Some(addrs) = guard.get(&k).cloned()
{
guard.insert(-k, addrs);
}
}
}
{
let mut guard = self.proxy_map_v6.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k)
&& let Some(addrs) = guard.get(&k).cloned()
{
guard.insert(-k, addrs);
}
}
}
changed
}
pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
if new_secret.len() < 32 {
warn!(len = new_secret.len(), "proxy-secret update ignored (too short)");
return false;
}
let mut guard = self.proxy_secret.write().await;
if *guard != new_secret {
*guard = new_secret;
drop(guard);
self.reconnect_all().await;
return true;
}
false
}
pub async fn reconnect_all(self: &Arc<Self>) {
let ws = self.writers.read().await.clone();
for w in ws {
if let Ok(()) = self.connect_one(w.addr, self.rng.as_ref()).await {
self.mark_writer_draining(w.id).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}

View File

@@ -0,0 +1,201 @@
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use rand::Rng;
use rand::seq::SliceRandom;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use super::pool::MePool;
impl MePool {
pub async fn init(self: &Arc<Self>, pool_size: usize, rng: &Arc<SecureRandom>) -> Result<()> {
let family_order = self.family_order();
let ks = self.key_selector().await;
info!(
me_servers = self.proxy_map_v4.read().await.len(),
pool_size,
key_selector = format_args!("0x{ks:08x}"),
secret_len = self.proxy_secret.read().await.len(),
"Initializing ME pool"
);
for family in family_order {
let map = self.proxy_map_for_family(family).await;
let mut grouped_dc_addrs: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
for (dc, addrs) in map {
if addrs.is_empty() {
continue;
}
grouped_dc_addrs.entry(dc.abs()).or_default().extend(addrs);
}
let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = grouped_dc_addrs
.into_iter()
.map(|(dc, mut addrs)| {
addrs.sort_unstable();
addrs.dedup();
(dc, addrs)
})
.collect();
dc_addrs.sort_unstable_by_key(|(dc, _)| *dc);
// Ensure at least one live writer per DC group; run missing DCs in parallel.
let mut join = tokio::task::JoinSet::new();
for (dc, addrs) in dc_addrs.iter().cloned() {
if addrs.is_empty() {
continue;
}
let endpoints: HashSet<SocketAddr> = addrs
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
if self.active_writer_count_for_endpoints(&endpoints).await > 0 {
continue;
}
let pool = Arc::clone(self);
let rng_clone = Arc::clone(rng);
join.spawn(async move { pool.connect_primary_for_dc(dc, addrs, rng_clone).await });
}
while join.join_next().await.is_some() {}
let mut missing_dcs = Vec::new();
for (dc, addrs) in &dc_addrs {
let endpoints: HashSet<SocketAddr> = addrs
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
if self.active_writer_count_for_endpoints(&endpoints).await == 0 {
missing_dcs.push(*dc);
}
}
if !missing_dcs.is_empty() {
return Err(ProxyError::Proxy(format!(
"ME init incomplete: no live writers for DC groups {missing_dcs:?}"
)));
}
// Warm reserve writers asynchronously so startup does not block after first working pool is ready.
let pool = Arc::clone(self);
let rng_clone = Arc::clone(rng);
let dc_addrs_bg = dc_addrs.clone();
tokio::spawn(async move {
if pool.me_warmup_stagger_enabled {
for (dc, addrs) in &dc_addrs_bg {
for (ip, port) in addrs {
if pool.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng()
.random_range(0..=pool.me_warmup_step_jitter.as_millis() as u64);
let delay_ms = pool.me_warmup_step_delay.as_millis() as u64 + jitter;
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
if let Err(e) = pool.connect_one(addr, rng_clone.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
}
}
}
} else {
for (dc, addrs) in &dc_addrs_bg {
for (ip, port) in addrs {
if pool.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
if let Err(e) = pool.connect_one(addr, rng_clone.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed");
}
}
if pool.connection_count() >= pool_size {
break;
}
}
}
debug!(
target_pool_size = pool_size,
current_pool_size = pool.connection_count(),
"Background ME reserve warmup finished"
);
});
if !self.decision.effective_multipath && self.connection_count() > 0 {
break;
}
}
if self.writers.read().await.is_empty() {
return Err(ProxyError::Proxy("No ME connections".into()));
}
info!(
active_writers = self.connection_count(),
"ME primary pool ready; reserve warmup continues in background"
);
Ok(())
}
async fn connect_primary_for_dc(
self: Arc<Self>,
dc: i32,
mut addrs: Vec<(IpAddr, u16)>,
rng: Arc<SecureRandom>,
) -> bool {
if addrs.is_empty() {
return false;
}
addrs.shuffle(&mut rand::rng());
if addrs.len() > 1 {
let concurrency = 2usize;
let mut join = tokio::task::JoinSet::new();
let mut next_idx = 0usize;
while next_idx < addrs.len() || !join.is_empty() {
while next_idx < addrs.len() && join.len() < concurrency {
let (ip, port) = addrs[next_idx];
next_idx += 1;
let addr = SocketAddr::new(ip, port);
let pool = Arc::clone(&self);
let rng_clone = Arc::clone(&rng);
join.spawn(async move {
(addr, pool.connect_one(addr, rng_clone.as_ref()).await)
});
}
let Some(res) = join.join_next().await else {
break;
};
match res {
Ok((addr, Ok(()))) => {
info!(%addr, dc = %dc, "ME connected");
join.abort_all();
while join.join_next().await.is_some() {}
return true;
}
Ok((addr, Err(e))) => {
warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next");
}
Err(e) => {
warn!(dc = %dc, error = %e, "ME connect task failed");
}
}
}
warn!(dc = %dc, "All ME servers for DC failed at init");
return false;
}
for (ip, port) in addrs {
let addr = SocketAddr::new(ip, port);
match self.connect_one(addr, rng.as_ref()).await {
Ok(()) => {
info!(%addr, dc = %dc, "ME connected");
return true;
}
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"),
}
}
warn!(dc = %dc, "All ME servers for DC failed at init");
false
}
}

View File

@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;
use tracing::{info, warn};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::error::{ProxyError, Result};
use crate::network::probe::is_bogon;
@@ -10,9 +13,19 @@ use crate::network::stun::{stun_probe_dual, IpFamily, StunProbeResult};
use super::MePool;
use std::time::Instant;
const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
#[allow(dead_code)]
pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> {
let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string());
let stun_addr = stun_addr.unwrap_or_else(|| {
crate::config::defaults::default_stun_servers()
.into_iter()
.next()
.unwrap_or_default()
});
if stun_addr.is_empty() {
return Err(ProxyError::Proxy("STUN server is not configured".to_string()));
}
stun_probe_dual(&stun_addr).await
}
@@ -22,6 +35,101 @@ pub async fn detect_public_ip() -> Option<IpAddr> {
}
impl MePool {
fn configured_stun_servers(&self) -> Vec<String> {
if !self.nat_stun_servers.is_empty() {
return self.nat_stun_servers.clone();
}
if let Some(s) = &self.nat_stun
&& !s.trim().is_empty()
{
return vec![s.clone()];
}
Vec::new()
}
async fn probe_stun_batch_for_family(
&self,
servers: &[String],
family: IpFamily,
attempt: u8,
) -> (Vec<String>, Option<std::net::SocketAddr>) {
let mut join_set = JoinSet::new();
let mut next_idx = 0usize;
let mut live_servers = Vec::new();
let mut best_by_ip: HashMap<IpAddr, (usize, std::net::SocketAddr)> = HashMap::new();
let concurrency = self.nat_probe_concurrency.max(1);
while next_idx < servers.len() || !join_set.is_empty() {
while next_idx < servers.len() && join_set.len() < concurrency {
let stun_addr = servers[next_idx].clone();
next_idx += 1;
join_set.spawn(async move {
let res = timeout(STUN_BATCH_TIMEOUT, stun_probe_dual(&stun_addr)).await;
(stun_addr, res)
});
}
let Some(task) = join_set.join_next().await else {
break;
};
match task {
Ok((stun_addr, Ok(Ok(res)))) => {
let picked: Option<StunProbeResult> = match family {
IpFamily::V4 => res.v4,
IpFamily::V6 => res.v6,
};
if let Some(result) = picked {
live_servers.push(stun_addr.clone());
let entry = best_by_ip
.entry(result.reflected_addr.ip())
.or_insert((0, result.reflected_addr));
entry.0 += 1;
debug!(
local = %result.local_addr,
reflected = %result.reflected_addr,
family = ?family,
stun = %stun_addr,
"NAT probe: reflected address"
);
}
}
Ok((stun_addr, Ok(Err(e)))) => {
debug!(
error = %e,
stun = %stun_addr,
attempt = attempt + 1,
"NAT probe failed, trying next server"
);
}
Ok((stun_addr, Err(_))) => {
debug!(
stun = %stun_addr,
attempt = attempt + 1,
"NAT probe timeout, trying next server"
);
}
Err(e) => {
debug!(
error = %e,
attempt = attempt + 1,
"NAT probe task join failed"
);
}
}
}
live_servers.sort_unstable();
live_servers.dedup();
let best_reflected = best_by_ip
.into_values()
.max_by_key(|(count, _)| *count)
.map(|(_, addr)| addr);
(live_servers, best_reflected)
}
pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr {
let nat_ip = self
.nat_ip_cfg
@@ -128,39 +236,51 @@ impl MePool {
}
let attempt = self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let servers = if !self.nat_stun_servers.is_empty() {
self.nat_stun_servers.clone()
} else if let Some(s) = &self.nat_stun {
vec![s.clone()]
let configured_servers = self.configured_stun_servers();
let live_snapshot = self.nat_stun_live_servers.read().await.clone();
let primary_servers = if live_snapshot.is_empty() {
configured_servers.clone()
} else {
vec!["stun.l.google.com:19302".to_string()]
live_snapshot
};
for stun_addr in servers {
match stun_probe_dual(&stun_addr).await {
Ok(res) => {
let picked: Option<StunProbeResult> = match family {
IpFamily::V4 => res.v4,
IpFamily::V6 => res.v6,
};
if let Some(result) = picked {
info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, stun = %stun_addr, "NAT probe: reflected address");
self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
*slot = Some((Instant::now(), result.reflected_addr));
}
return Some(result.reflected_addr);
}
}
Err(e) => {
warn!(error = %e, stun = %stun_addr, attempt = attempt + 1, "NAT probe failed, trying next server");
}
}
let (mut live_servers, mut selected_reflected) = self
.probe_stun_batch_for_family(&primary_servers, family, attempt)
.await;
if selected_reflected.is_none() && !configured_servers.is_empty() && primary_servers != configured_servers {
let (rediscovered_live, rediscovered_reflected) = self
.probe_stun_batch_for_family(&configured_servers, family, attempt)
.await;
live_servers = rediscovered_live;
selected_reflected = rediscovered_reflected;
}
let live_server_count = live_servers.len();
if !live_servers.is_empty() {
*self.nat_stun_live_servers.write().await = live_servers;
} else {
self.nat_stun_live_servers.write().await.clear();
}
if let Some(reflected_addr) = selected_reflected {
self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed);
info!(
family = ?family,
live_servers = live_server_count,
"STUN-Quorum reached, IP: {}",
reflected_addr.ip()
);
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
*slot = Some((Instant::now(), reflected_addr));
}
return Some(reflected_addr);
}
let backoff = Duration::from_secs(60 * 2u64.pow((attempt as u32).min(6)));
*self.stun_backoff_until.write().await = Some(Instant::now() + backoff);
None

View File

@@ -0,0 +1,159 @@
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
use super::pool::MePool;
impl MePool {
pub(super) async fn connect_endpoints_round_robin(
self: &Arc<Self>,
endpoints: &[SocketAddr],
rng: &SecureRandom,
) -> bool {
if endpoints.is_empty() {
return false;
}
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % endpoints.len();
for offset in 0..endpoints.len() {
let idx = (start + offset) % endpoints.len();
let addr = endpoints[idx];
match self.connect_one(addr, rng).await {
Ok(()) => return true,
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
}
}
false
}
async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec<SocketAddr> {
let mut target_dc = HashSet::<i32>::new();
let mut endpoints = HashSet::<SocketAddr>::new();
if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in &map {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port));
}
}
}
}
}
if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in &map {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port));
}
}
}
}
}
let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect();
sorted.sort_unstable();
sorted
}
async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr) -> bool {
let fast_retries = self.me_reconnect_fast_retry_count.max(1);
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
match self.connect_one(addr, self.rng.as_ref()).await {
Ok(()) => {
self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_same_endpoint_total();
info!(
%addr,
attempt = attempt + 1,
"ME writer restored on the same endpoint"
);
return true;
}
Err(e) => {
debug!(
%addr,
attempt = attempt + 1,
error = %e,
"ME immediate same-endpoint reconnect failed"
);
}
}
}
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total();
return false;
}
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
if self
.connect_endpoints_round_robin(&dc_endpoints, self.rng.as_ref())
.await
{
self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_fallback_total();
info!(
%addr,
attempt = attempt + 1,
"ME writer restored via DC fallback endpoint"
);
return true;
}
}
self.stats.increment_me_refill_failed_total();
false
}
pub(crate) fn trigger_immediate_refill(self: &Arc<Self>, addr: SocketAddr) {
let pool = Arc::clone(self);
tokio::spawn(async move {
{
let mut guard = pool.refill_inflight.lock().await;
if !guard.insert(addr) {
pool.stats.increment_me_refill_skipped_inflight_total();
return;
}
}
pool.stats.increment_me_refill_triggered_total();
let restored = pool.refill_writer_after_loss(addr).await;
if !restored {
warn!(%addr, "ME immediate refill failed");
}
let mut guard = pool.refill_inflight.lock().await;
guard.remove(&addr);
});
}
}

View File

@@ -0,0 +1,383 @@
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use rand::Rng;
use rand::seq::SliceRandom;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
use super::pool::MePool;
impl MePool {
fn coverage_ratio(
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
active_writer_addrs: &HashSet<SocketAddr>,
) -> (f32, Vec<i32>) {
if desired_by_dc.is_empty() {
return (1.0, Vec::new());
}
let mut missing_dc = Vec::<i32>::new();
let mut covered = 0usize;
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
if endpoints
.iter()
.any(|addr| active_writer_addrs.contains(addr))
{
covered += 1;
} else {
missing_dc.push(*dc);
}
}
missing_dc.sort_unstable();
let total = desired_by_dc.len().max(1);
let ratio = (covered as f32) / (total as f32);
(ratio, missing_dc)
}
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
let writers = self.writers.read().await;
let current: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
drop(writers);
for family in self.family_order() {
let map = self.proxy_map_for_family(family).await;
for (_dc, addrs) in &map {
let dc_addrs: Vec<SocketAddr> = addrs
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
if !dc_addrs.iter().any(|a| current.contains(a)) {
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
for addr in shuffled {
if self.connect_one(addr, rng).await.is_ok() {
break;
}
}
}
}
if !self.decision.effective_multipath && !current.is_empty() {
break;
}
}
}
async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
if self.decision.ipv4_me {
let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 {
let entry = out.entry(dc.abs()).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if self.decision.ipv6_me {
let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 {
let entry = out.entry(dc.abs()).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
out
}
pub(super) fn required_writers_for_dc(endpoint_count: usize) -> usize {
endpoint_count.max(3)
}
fn hardswap_warmup_connect_delay_ms(&self) -> u64 {
let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed);
let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed);
let (min_ms, max_ms) = if min_ms <= max_ms {
(min_ms, max_ms)
} else {
(max_ms, min_ms)
};
if min_ms == max_ms {
return min_ms;
}
rand::rng().random_range(min_ms..=max_ms)
}
fn hardswap_warmup_backoff_ms(&self, pass_idx: usize) -> u64 {
let base_ms = self
.me_hardswap_warmup_pass_backoff_base_ms
.load(Ordering::Relaxed);
let cap_ms = (self.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms);
let shift = (pass_idx as u32).min(20);
let scaled = base_ms.saturating_mul(1u64 << shift);
let core = scaled.min(cap_ms);
let jitter = (core / 2).max(1);
core.saturating_add(rand::rng().random_range(0..=jitter))
}
async fn fresh_writer_count_for_endpoints(
&self,
generation: u64,
endpoints: &HashSet<SocketAddr>,
) -> usize {
let ws = self.writers.read().await;
ws.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| endpoints.contains(&w.addr))
.count()
}
pub(super) async fn active_writer_count_for_endpoints(
&self,
endpoints: &HashSet<SocketAddr>,
) -> usize {
let ws = self.writers.read().await;
ws.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| endpoints.contains(&w.addr))
.count()
}
async fn warmup_generation_for_all_dcs(
self: &Arc<Self>,
rng: &SecureRandom,
generation: u64,
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
) {
let extra_passes = self
.me_hardswap_warmup_extra_passes
.load(Ordering::Relaxed)
.min(10) as usize;
let total_passes = 1 + extra_passes;
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
let mut endpoint_list: Vec<SocketAddr> = endpoints.iter().copied().collect();
endpoint_list.sort_unstable();
let required = Self::required_writers_for_dc(endpoint_list.len());
let mut completed = false;
let mut last_fresh_count = self
.fresh_writer_count_for_endpoints(generation, endpoints)
.await;
for pass_idx in 0..total_passes {
if last_fresh_count >= required {
completed = true;
break;
}
let missing = required.saturating_sub(last_fresh_count);
debug!(
dc = *dc,
pass = pass_idx + 1,
total_passes,
fresh_count = last_fresh_count,
required,
missing,
endpoint_count = endpoint_list.len(),
"ME hardswap warmup pass started"
);
for attempt_idx in 0..missing {
let delay_ms = self.hardswap_warmup_connect_delay_ms();
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
let connected = self.connect_endpoints_round_robin(&endpoint_list, rng).await;
debug!(
dc = *dc,
pass = pass_idx + 1,
total_passes,
attempt = attempt_idx + 1,
delay_ms,
connected,
"ME hardswap warmup connect attempt finished"
);
}
last_fresh_count = self
.fresh_writer_count_for_endpoints(generation, endpoints)
.await;
if last_fresh_count >= required {
completed = true;
info!(
dc = *dc,
pass = pass_idx + 1,
total_passes,
fresh_count = last_fresh_count,
required,
"ME hardswap warmup floor reached for DC"
);
break;
}
if pass_idx + 1 < total_passes {
let backoff_ms = self.hardswap_warmup_backoff_ms(pass_idx);
debug!(
dc = *dc,
pass = pass_idx + 1,
total_passes,
fresh_count = last_fresh_count,
required,
backoff_ms,
"ME hardswap warmup pass incomplete, delaying next pass"
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
}
if !completed {
warn!(
dc = *dc,
fresh_count = last_fresh_count,
required,
endpoint_count = endpoint_list.len(),
total_passes,
"ME warmup stopped: unable to reach required writer floor for DC"
);
}
}
}
pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) {
let desired_by_dc = self.desired_dc_endpoints().await;
if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty; skipping stale writer drain");
return;
}
let previous_generation = self.current_generation();
let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
let hardswap = self.hardswap.load(Ordering::Relaxed);
if hardswap {
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
.await;
} else {
self.reconcile_connections(rng).await;
}
let writers = self.writers.read().await;
let active_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
let min_ratio = Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille
.load(Ordering::Relaxed),
);
let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
if !hardswap && coverage_ratio < min_ratio {
warn!(
previous_generation,
generation,
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
missing_dc = ?missing_dc,
"ME reinit coverage below threshold; keeping stale writers"
);
return;
}
if hardswap {
let mut fresh_missing_dc = Vec::<(i32, usize, usize)>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
let required = Self::required_writers_for_dc(endpoints.len());
let fresh_count = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| endpoints.contains(&w.addr))
.count();
if fresh_count < required {
fresh_missing_dc.push((*dc, fresh_count, required));
}
}
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
);
return;
}
} else if !missing_dc.is_empty() {
warn!(
missing_dc = ?missing_dc,
// Keep stale writers alive when fresh coverage is incomplete.
"ME reinit coverage incomplete; keeping stale writers"
);
return;
}
let desired_addrs: HashSet<SocketAddr> = desired_by_dc
.values()
.flat_map(|set| set.iter().copied())
.collect();
let stale_writer_ids: Vec<u64> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| {
if hardswap {
w.generation < generation
} else {
!desired_addrs.contains(&w.addr)
}
})
.map(|w| w.id)
.collect();
drop(writers);
if stale_writer_ids.is_empty() {
debug!("ME reinit cycle completed with no stale writers");
return;
}
let drain_timeout = self.force_close_timeout();
let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0);
info!(
stale_writers = stale_writer_ids.len(),
previous_generation,
generation,
hardswap,
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
drain_timeout_secs,
"ME reinit cycle covered; draining stale writers"
);
self.stats.increment_pool_swap_total();
for writer_id in stale_writer_ids {
self.mark_writer_draining_with_timeout(writer_id, drain_timeout, !hardswap)
.await;
}
}
pub async fn zero_downtime_reinit_periodic(self: &Arc<Self>, rng: &SecureRandom) {
self.zero_downtime_reinit_after_map_change(rng).await;
}
}

View File

@@ -0,0 +1,366 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use bytes::BytesMut;
use rand::Rng;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::RPC_PING_U32;
use super::codec::{RpcWriter, WriterCommand};
use super::pool::{MePool, MeWriter};
use super::reader::reader_loop;
use super::registry::BoundConn;
const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
impl MePool {
pub(crate) async fn prune_closed_writers(self: &Arc<Self>) {
let closed_writer_ids: Vec<u64> = {
let ws = self.writers.read().await;
ws.iter().filter(|w| w.tx.is_closed()).map(|w| w.id).collect()
};
if closed_writer_ids.is_empty() {
return;
}
for writer_id in closed_writer_ids {
if self.registry.is_writer_empty(writer_id).await {
let _ = self.remove_writer_only(writer_id).await;
} else {
let _ = self.remove_writer_and_close_clients(writer_id).await;
}
}
}
pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> {
let secret_len = self.proxy_secret.read().await.len();
if secret_len < 32 {
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
}
let (stream, _connect_ms) = self.connect_tcp(addr).await?;
let hs = self.handshake_only(stream, addr, rng).await?;
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
let generation = self.current_generation();
let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false));
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let allow_drain_fallback = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let mut rpc_writer = RpcWriter {
writer: hs.wr,
key: hs.write_key,
iv: hs.write_iv,
seq_no: 0,
crc_mode: hs.crc_mode,
};
let cancel_wr = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
cmd = rx.recv() => {
match cmd {
Some(WriterCommand::Data(payload)) => {
if rpc_writer.send(&payload).await.is_err() { break; }
}
Some(WriterCommand::DataAndFlush(payload)) => {
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
}
Some(WriterCommand::Close) | None => break,
}
}
_ = cancel_wr.cancelled() => break,
}
}
});
let writer = MeWriter {
id: writer_id,
addr,
generation,
tx: tx.clone(),
cancel: cancel.clone(),
degraded: degraded.clone(),
draining: draining.clone(),
draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(),
allow_drain_fallback: allow_drain_fallback.clone(),
};
self.writers.write().await.push(writer.clone());
self.conn_count.fetch_add(1, Ordering::Relaxed);
self.writer_available.notify_one();
let reg = self.registry.clone();
let writers_arc = self.writers_arc();
let ping_tracker = self.ping_tracker.clone();
let ping_tracker_reader = ping_tracker.clone();
let rtt_stats = self.rtt_stats.clone();
let stats_reader = self.stats.clone();
let stats_ping = self.stats.clone();
let pool = Arc::downgrade(self);
let cancel_ping = cancel.clone();
let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
let keepalive_interval = self.me_keepalive_interval;
let keepalive_jitter = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
tokio::spawn(async move {
let res = reader_loop(
hs.rd,
hs.read_key,
hs.read_iv,
hs.crc_mode,
reg.clone(),
BytesMut::new(),
BytesMut::new(),
tx.clone(),
ping_tracker_reader,
rtt_stats.clone(),
stats_reader,
writer_id,
degraded.clone(),
cancel_reader_token.clone(),
)
.await;
if let Some(pool) = pool.upgrade()
&& cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
if let Err(e) = res {
warn!(error = %e, "ME reader ended");
}
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
info!(remaining = ws.len(), "Dead ME writer removed from pool");
});
let pool_ping = Arc::downgrade(self);
tokio::spawn(async move {
let mut ping_id: i64 = rand::random::<i64>();
let idle_interval_cap = Duration::from_secs(ME_IDLE_KEEPALIVE_MAX_SECS);
// Per-writer jittered start to avoid phase sync.
let startup_jitter = if keepalive_enabled {
let mut interval = keepalive_interval;
if let Some(pool) = pool_ping.upgrade() {
if pool.registry.is_writer_empty(writer_id).await {
interval = interval.min(idle_interval_cap);
}
} else {
return;
}
let jitter_cap_ms = interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else {
let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(wait)
};
tokio::select! {
_ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(startup_jitter) => {}
}
loop {
let wait = if keepalive_enabled {
let mut interval = keepalive_interval;
if let Some(pool) = pool_ping.upgrade() {
if pool.registry.is_writer_empty(writer_id).await {
interval = interval.min(idle_interval_cap);
}
} else {
break;
}
let jitter_cap_ms = interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else {
let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(secs)
};
tokio::select! {
_ = cancel_ping_token.cancelled() => {
break;
}
_ = tokio::time::sleep(wait) => {}
}
let sent_id = ping_id;
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
p.extend_from_slice(&sent_id.to_le_bytes());
{
let mut tracker = ping_tracker_ping.lock().await;
let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
ping_id = ping_id.wrapping_add(1);
stats_ping.increment_me_keepalive_sent();
if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() {
stats_ping.increment_me_keepalive_failed();
debug!("ME ping failed, removing dead writer");
cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade()
&& cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
break;
}
}
});
Ok(())
}
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await;
for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None;
let mut trigger_refill = false;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos);
let was_draining = w.draining.load(Ordering::Relaxed);
if was_draining {
self.stats.decrement_pool_drain_active();
}
self.stats.increment_me_writer_removed_total();
w.cancel.cancel();
removed_addr = Some(w.addr);
trigger_refill = !was_draining;
if trigger_refill {
self.stats.increment_me_writer_removed_unexpected_total();
}
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
}
}
if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await;
}
if trigger_refill
&& let Some(addr) = removed_addr
{
self.trigger_immediate_refill(addr);
}
self.rtt_stats.lock().await.remove(&writer_id);
self.registry.writer_lost(writer_id).await
}
pub(crate) async fn mark_writer_draining_with_timeout(
self: &Arc<Self>,
writer_id: u64,
timeout: Option<Duration>,
allow_drain_fallback: bool,
) {
let timeout = timeout.filter(|d| !d.is_zero());
let found = {
let mut ws = self.writers.write().await;
if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
let already_draining = w.draining.swap(true, Ordering::Relaxed);
w.allow_drain_fallback
.store(allow_drain_fallback, Ordering::Relaxed);
w.draining_started_at_epoch_secs
.store(Self::now_epoch_secs(), Ordering::Relaxed);
if !already_draining {
self.stats.increment_pool_drain_active();
}
w.draining.store(true, Ordering::Relaxed);
true
} else {
false
}
};
if !found {
return;
}
let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
debug!(
writer_id,
timeout_secs,
allow_drain_fallback,
"ME writer marked draining"
);
let pool = Arc::downgrade(self);
tokio::spawn(async move {
let deadline = timeout.map(|t| Instant::now() + t);
while let Some(p) = pool.upgrade() {
if let Some(deadline_at) = deadline
&& Instant::now() >= deadline_at
{
warn!(writer_id, "Drain timeout, force-closing");
p.stats.increment_pool_force_close_total();
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)), false)
.await;
}
pub(super) fn writer_accepts_new_binding(&self, writer: &MeWriter) -> bool {
if !writer.draining.load(Ordering::Relaxed) {
return true;
}
if !writer.allow_drain_fallback.load(Ordering::Relaxed) {
return false;
}
let ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed);
if ttl_secs == 0 {
return true;
}
let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed);
if started == 0 {
return false;
}
Self::now_epoch_secs().saturating_sub(started) <= ttl_secs
}
}

View File

@@ -233,14 +233,12 @@ async fn parse_v2<R: AsyncRead + Unpin>(
}
/// Builder for PROXY protocol v1 header
#[allow(dead_code)]
pub struct ProxyProtocolV1Builder {
family: &'static str,
src_addr: Option<SocketAddr>,
dst_addr: Option<SocketAddr>,
}
#[allow(dead_code)]
impl ProxyProtocolV1Builder {
pub fn new() -> Self {
Self {
@@ -288,13 +286,17 @@ impl Default for ProxyProtocolV1Builder {
}
/// Builder for PROXY protocol v2 header
#[allow(dead_code)]
pub struct ProxyProtocolV2Builder {
src: Option<SocketAddr>,
dst: Option<SocketAddr>,
}
#[allow(dead_code)]
impl Default for ProxyProtocolV2Builder {
fn default() -> Self {
Self::new()
}
}
impl ProxyProtocolV2Builder {
pub fn new() -> Self {
Self { src: None, dst: None }