Compare commits

..

24 Commits

Author SHA1 Message Date
Alexey
03ce267865 Update config.toml 2026-02-25 22:33:38 +03:00
Alexey
a6bfa3309e Create config.toml 2026-02-25 22:32:02 +03:00
Alexey
79a3720fd5 Rename config.toml to config.full.toml 2026-02-25 22:22:04 +03:00
Alexey
89543aed35 Merge pull request #247 from telemt/config-tuning
Update config.toml
2026-02-25 21:47:26 +03:00
Alexey
06292ff833 Update config.toml 2026-02-25 21:33:06 +03:00
Alexey
427294b103 Defaults in-place: merge pull request #245 from telemt/flow-tuning
Defaults in-place
2026-02-25 18:09:20 +03:00
Alexey
fed9346444 New config.toml + tls_emulation enabled by default 2026-02-25 17:49:54 +03:00
Alexey
f40b645c05 Defaults in-place 2026-02-25 17:28:06 +03:00
Alexey
a66d5d56bb Merge pull request #243 from vladon/add-proxy-secret-to-gitignore
Add proxy-secret to .gitignore
2026-02-25 14:16:31 +03:00
Vladislav Yaroslavlev
1b1bdfe99a Add proxy-secret to .gitignore
The proxy-secret file contains sensitive authentication data
that should never be committed to version control.
2026-02-25 14:00:50 +03:00
Alexey
49fc11ddfa Merge pull request #242 from telemt/flow-link
Detected_IP in Links
2026-02-25 13:42:41 +03:00
Alexey
5558900c44 Update main.rs 2026-02-25 13:29:46 +03:00
Alexey
5b1d976392 Merge pull request #239 from twocolors/fix-info-bracket
fix: remove bracket in info
2026-02-25 10:22:22 +03:00
D
206f87fe64 fix: remove bracket in info 2026-02-25 09:22:26 +03:00
Alexey
5a09d30e1c Update Cargo.toml 2026-02-25 03:09:02 +03:00
Alexey
f83e23c521 Update defaults.rs 2026-02-25 03:08:34 +03:00
Alexey
f9e9ddd0f7 Merge pull request #238 from telemt/flow-mep
ME Pool Beobachter
2026-02-25 02:24:07 +03:00
Alexey
6b8619d3c9 Create beobachten.rs 2026-02-25 02:17:48 +03:00
Alexey
618b7a1837 ME Pool Beobachter 2026-02-25 02:10:14 +03:00
Alexey
16f166cec8 Update README.md 2026-02-25 02:07:58 +03:00
Alexey
6efcbe9bbf Update README.md 2026-02-25 02:05:32 +03:00
Alexey
e5ad27e26e Merge pull request #237 from Dimasssss/main
Update config.toml
2026-02-25 01:50:19 +03:00
Dimasssss
53ec96b040 Update config.toml 2026-02-25 01:37:55 +03:00
Alexey
c6c3d71b08 ME Pool Flap-Detect in statistics 2026-02-25 01:26:01 +03:00
15 changed files with 929 additions and 229 deletions

6
.gitignore vendored
View File

@@ -19,7 +19,5 @@ target
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
*.rs
target proxy-secret
Cargo.lock
src

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.0.15" version = "3.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -10,28 +10,18 @@
### 🇷🇺 RU ### 🇷🇺 RU
#### Драфтинг LTS и текущие улучшения #### Релиз 3.0.15 — 25 февраля
С 21 февраля мы начали подготовку LTS-версии. 25 февраля мы выпустили версию **3.0.15**
Мы внимательно анализируем весь доступный фидбек. Мы предполагаем, что она станет завершающей версией поколения 3.0 и уже сейчас мы рассматриваем её как **LTS-кандидата** для версии **3.1.0**!
Наша цель — сделать LTS-кандидаты максимально стабильными, тщательно отлаженными и готовыми к long-run и highload production-сценариям.
--- После нескольких дней детального анализа особенностей работы Middle-End мы спроектировали и реализовали продуманный режим **ротации ME Writer**. Данный режим позволяет поддерживать стабильно высокую производительность в long-run сценариях без возникновения ошибок, связанных с некорректной конфигурацией прокси
#### Улучшения от 23 февраля Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **статистики** и **UX**
23 февраля были внесены улучшения производительности в режимах **DC** и **Middle-End (ME)**, с акцентом на обратный канал (путь клиент → DC / ME).
Дополнительно реализован ряд изменений, направленных на повышение устойчивости системы:
- Смягчение сетевой нестабильности
- Повышение устойчивости к десинхронизации криптографии
- Снижение дрейфа сессий при неблагоприятных условиях
- Улучшение обработки ошибок в edge-case транспортных сценариях
Релиз: Релиз:
[3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12) [3.0.15](https://github.com/telemt/telemt/releases/tag/3.0.15)
--- ---
@@ -48,28 +38,18 @@
### 🇬🇧 EN ### 🇬🇧 EN
#### LTS Drafting and Ongoing Improvements #### Release 3.0.15 — February 25
Starting February 21, we began drafting the upcoming LTS version. On February 25, we released version **3.0.15**
We are carefully reviewing and analyzing all available feedback. We expect this to become the final release of the 3.0 generation and at this point, we already see it as a strong **LTS candidate** for the upcoming **3.1.0** release!
The goal is to ensure that LTS candidates are максимально stable, thoroughly debugged, and ready for long-run and high-load production scenarios.
--- After several days of deep analysis of Middle-End behavior, we designed and implemented a well-engineered **ME Writer rotation mode**. This mode enables sustained high throughput in long-run scenarios while preventing proxy misconfiguration errors
#### February 23 Improvements We are looking forward to your feedback and improvement proposals — especially regarding **statistics** and **UX**
On February 23, we introduced performance improvements for both **DC** and **Middle-End (ME)** modes, specifically optimizing the reverse channel (client → DC / ME data path).
Additionally, we implemented a set of robustness enhancements designed to:
- Mitigate network-related instability
- Improve resilience against cryptographic desynchronization
- Reduce session drift under adverse conditions
- Improve error handling in edge-case transport scenarios
Release: Release:
[3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12) [3.0.15](https://github.com/telemt/telemt/releases/tag/3.0.15)
--- ---

204
config.full.toml Normal file
View File

@@ -0,0 +1,204 @@
# Telemt full config with default values.
# Examples are kept in comments after '#'.
# Top-level legacy field.
show_link = [] # example: "*" or ["alice", "bob"]
# default_dc = 2 # example: default DC for unmapped non-standard DCs
[general]
fast_mode = true
use_middle_proxy = false
# ad_tag = "00000000000000000000000000000000" # example
# proxy_secret_path = "proxy-secret" # example custom path
# middle_proxy_nat_ip = "203.0.113.10" # example public NAT IP override
middle_proxy_nat_probe = true
# middle_proxy_nat_stun = "stun.l.google.com:19302" # example
# middle_proxy_nat_stun_servers = [] # example: ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
middle_proxy_pool_size = 8
middle_proxy_warm_standby = 16
me_keepalive_enabled = true
me_keepalive_interval_secs = 25
me_keepalive_jitter_secs = 5
me_keepalive_payload_random = true
crypto_pending_buffer = 262144
max_client_frame = 16777216
desync_all_full = false
beobachten = true
beobachten_minutes = 10
beobachten_flush_secs = 15
beobachten_file = "cache/beobachten.txt"
hardswap = true
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500
me_warmup_step_jitter_ms = 300
me_reconnect_max_concurrent_per_dc = 8
me_reconnect_backoff_base_ms = 500
me_reconnect_backoff_cap_ms = 30000
me_reconnect_fast_retry_count = 12
stun_iface_mismatch_ignore = false
unknown_dc_log_path = "unknown-dc.txt" # to disable: set to null
log_level = "normal" # debug | verbose | normal | silent
disable_colors = false
fast_mode_min_tls_record = 0
update_every = 300
me_reinit_every_secs = 900
me_hardswap_warmup_delay_min_ms = 1000
me_hardswap_warmup_delay_max_ms = 2000
me_hardswap_warmup_extra_passes = 3
me_hardswap_warmup_pass_backoff_base_ms = 500
me_config_stable_snapshots = 2
me_config_apply_cooldown_secs = 300
proxy_secret_stable_snapshots = 2
proxy_secret_rotate_runtime = true
proxy_secret_len_max = 256
me_pool_drain_ttl_secs = 90
me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120
# Legacy compatibility fields used when update_every is omitted.
proxy_secret_auto_reload_secs = 3600
proxy_config_auto_reload_secs = 3600
ntp_check = true
ntp_servers = ["pool.ntp.org"] # example: ["pool.ntp.org", "time.cloudflare.com"]
auto_degradation_enabled = true
degradation_min_unavailable_dc_groups = 2
[general.modes]
classic = false
secure = false
tls = true
[general.links]
show ="*" # example: "*" or ["alice", "bob"]
# public_host = "proxy.example.com" # example explicit host/IP for tg:// links
# public_port = 443 # example explicit port for tg:// links
[network]
ipv4 = true
ipv6 = false # set true to enable IPv6
prefer = 4 # 4 or 6
multipath = false
stun_servers = [
"stun.l.google.com:5349",
"stun1.l.google.com:3478",
"stun.gmx.net:3478",
"stun.l.google.com:19302",
"stun.1und1.de:3478",
"stun1.l.google.com:19302",
"stun2.l.google.com:19302",
"stun3.l.google.com:19302",
"stun4.l.google.com:19302",
"stun.services.mozilla.com:3478",
"stun.stunprotocol.org:3478",
"stun.nextcloud.com:3478",
"stun.voip.eutelia.it:3478",
]
stun_tcp_fallback = true
http_ip_detect_urls = ["https://ifconfig.me/ip", "https://api.ipify.org"]
cache_public_ip_path = "cache/public_ip.txt"
[server]
port = 443
listen_addr_ipv4 = "0.0.0.0"
listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # example
# listen_unix_sock_perm = "0660" # example unix socket mode
# listen_tcp = true # example explicit override (auto-detected when omitted)
proxy_protocol = false
# metrics_port = 9090 # example
metrics_whitelist = ["127.0.0.1/32", "::1/128"]
# Example explicit listeners (default: omitted, auto-generated from listen_addr_*):
# [[server.listeners]]
# ip = "0.0.0.0"
# announce = "proxy-v4.example.com"
# # announce_ip = "203.0.113.10" # deprecated alias
# proxy_protocol = false
# reuse_allow = false
#
# [[server.listeners]]
# ip = "::"
# announce = "proxy-v6.example.com"
# proxy_protocol = false
# reuse_allow = false
[timeouts]
client_handshake = 15
tg_connect = 10
client_keepalive = 60
client_ack = 300
me_one_retry = 3
me_one_timeout_ms = 1500
[censorship]
tls_domain = "petrovich.ru"
# tls_domains = ["example.com", "cdn.example.net"] # Additional domains for EE links
mask = true
# mask_host = "www.google.com" # example, defaults to tls_domain when both mask_host/mask_unix_sock are unset
# mask_unix_sock = "/var/run/nginx.sock" # example, mutually exclusive with mask_host
mask_port = 443
fake_cert_len = 2048 # if tls_emulation=false and default value is used, loader may randomize this value at runtime
tls_emulation = true
tls_front_dir = "tlsfront"
server_hello_delay_min_ms = 0
server_hello_delay_max_ms = 0
tls_new_session_tickets = 0
tls_full_cert_ttl_secs = 90
alpn_enforce = true
[access]
replay_check_len = 65536
replay_window_secs = 1800
ignore_time_skew = false
[access.users]
# format: "username" = "32_hex_chars_secret"
hello = "00000000000000000000000000000000"
# alice = "11111111111111111111111111111111" # example
[access.user_max_tcp_conns]
# alice = 100 # example
[access.user_expirations]
# alice = "2078-01-01T00:00:00Z" # example
[access.user_data_quota]
# hello = 10737418240 # example bytes
# alice = 10737418240 # example bytes
[access.user_max_unique_ips]
# hello = 10 # example
# alice = 100 # example
# Default behavior if [[upstreams]] is omitted: loader injects one direct upstream.
# Example explicit upstreams:
# [[upstreams]]
# type = "direct"
# interface = "eth0"
# bind_addresses = ["192.0.2.10"]
# weight = 1
# enabled = true
# scopes = "*"
#
# [[upstreams]]
# type = "socks4"
# address = "198.51.100.20:1080"
# interface = "eth0"
# user_id = "telemt"
# weight = 1
# enabled = true
# scopes = "*"
#
# [[upstreams]]
# type = "socks5"
# address = "198.51.100.30:1080"
# interface = "eth0"
# username = "proxy-user"
# password = "proxy-pass"
# weight = 1
# enabled = true
# scopes = "*"
# === DC Address Overrides ===
# [dc_overrides]
# "201" = "149.154.175.50:443" # example
# "202" = ["149.154.167.51:443", "149.154.175.100:443"] # example
# "203" = "91.105.192.100:443" # loader auto-adds this one when omitted

View File

@@ -1,11 +1,7 @@
# === General Settings === # === General Settings ===
[general] [general]
fast_mode = true
use_middle_proxy = true use_middle_proxy = true
# ad_tag = "00000000000000000000000000000000" # ad_tag = "00000000000000000000000000000000"
# Path to proxy-secret binary (auto-downloaded if missing).
proxy_secret_path = "proxy-secret"
# disable_colors = false # Disable colored output in logs (useful for files/systemd)
# === Log Level === # === Log Level ===
# Log level: debug | verbose | normal | silent # Log level: debug | verbose | normal | silent
@@ -13,150 +9,29 @@ proxy_secret_path = "proxy-secret"
# RUST_LOG env var takes absolute priority over all of these # RUST_LOG env var takes absolute priority over all of these
log_level = "normal" log_level = "normal"
# === Middle Proxy - ME ===
# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
# middle_proxy_nat_ip = "203.0.113.10"
# Enable STUN probing to discover public IP:port for ME.
middle_proxy_nat_probe = true
# Primary STUN server (host:port); defaults to Telegram STUN when empty.
middle_proxy_nat_stun = "stun.l.google.com:19302"
# Optional fallback STUN servers list.
middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
# Desired number of concurrent ME writers in pool.
middle_proxy_pool_size = 8
# Pre-initialized warm-standby ME connections kept idle.
middle_proxy_warm_standby = 8
# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
stun_iface_mismatch_ignore = false
# Keepalive padding frames - fl==4
me_keepalive_enabled = true
me_keepalive_interval_secs = 25 # Period between keepalives
me_keepalive_jitter_secs = 5 # Jitter added to interval
me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
# Stagger extra ME connections on warmup to de-phase lifecycles.
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500 # Base delay between extra connects
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
# Reconnect policy knobs.
me_reconnect_max_concurrent_per_dc = 4 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
me_reconnect_backoff_base_ms = 500 # Backoff start
me_reconnect_backoff_cap_ms = 30000 # Backoff cap
me_reconnect_fast_retry_count = 11 # Quick retries before backoff
update_every = 7200 # Resolve the active updater interval for ME infrastructure refresh tasks.
crypto_pending_buffer = 262144 # Max pending ciphertext buffer per client writer (bytes). Controls FakeTLS backpressure vs throughput.
max_client_frame = 16777216 # Maximum allowed client MTProto frame size (bytes).
desync_all_full = false # Emit full crypto-desync forensic logs for every event. When false, full forensic details are emitted once per key window.
auto_degradation_enabled = true # Enable auto-degradation from ME to Direct-DC.
degradation_min_unavailable_dc_groups = 2 # Minimum unavailable ME DC groups before degrading.
hardswap = true # Enable C-like hard-swap for ME pool generations. When true, Telemt prewarms a new generation and switches once full coverage is reached.
me_pool_drain_ttl_secs = 90 # Drain-TTL in seconds for stale ME writers after endpoint map changes. During TTL, stale writers may be used only as fallback for new bindings.
me_pool_min_fresh_ratio = 0.8 # Minimum desired-DC coverage ratio required before draining stale writers. Range: 0.0..=1.0.
me_reinit_drain_timeout_secs = 120 # Drain timeout in seconds for stale ME writers after endpoint map changes. Set to 0 to keep stale writers draining indefinitely (no force-close).
me_config_stable_snapshots = 2 # Number of identical getProxyConfig snapshots required before applying ME map updates.
me_config_apply_cooldown_secs = 300 # Cooldown in seconds between applied ME map updates.
proxy_secret_rotate_runtime = true # Enable runtime proxy-secret rotation from getProxySecret.
proxy_secret_stable_snapshots = 2 # Number of identical getProxySecret snapshots required before runtime secret rotation.
proxy_secret_len_max = 256 # Maximum allowed proxy-secret length in bytes for startup and runtime refresh.
[general.modes] [general.modes]
classic = false classic = false
secure = false secure = false
tls = true tls = true
[general.links]
show = "*"
# show = ["alice", "bob"] # Only show links for alice and bob
# show = "*" # Show links for all users
# public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
# public_port = 443 # Port for tg:// links (default: server.port)
# === Network Parameters ===
[network]
# Enable/disable families: true/false/auto(None)
ipv4 = true
ipv6 = false # UNSTABLE WITH ME
# prefer = 4 or 6
prefer = 4
multipath = false # EXPERIMENTAL!
# === Server Binding === # === Server Binding ===
[server] [server]
port = 443 port = 443
listen_addr_ipv4 = "0.0.0.0"
listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # Unix socket
# listen_unix_sock_perm = "0666" # Socket file permissions
# proxy_protocol = false # Enable if behind HAProxy/nginx with PROXY protocol # proxy_protocol = false # Enable if behind HAProxy/nginx with PROXY protocol
# metrics_port = 9090 metrics_port = 9090
# metrics_whitelist = ["127.0.0.1", "::1"] metrics_whitelist = ["127.0.0.1", "::1", "0.0.0.0/0"]
# Listen on multiple interfaces/IPs - IPv4 # Listen on multiple interfaces/IPs - IPv4
[[server.listeners]] [[server.listeners]]
ip = "0.0.0.0" ip = "0.0.0.0"
# Listen on multiple interfaces/IPs - IPv6
[[server.listeners]]
ip = "::"
# === Timeouts (in seconds) ===
[timeouts]
client_handshake = 30
tg_connect = 10
client_keepalive = 60
client_ack = 300
# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
me_one_retry = 12
me_one_timeout_ms = 1200
# === Anti-Censorship & Masking === # === Anti-Censorship & Masking ===
[censorship] [censorship]
tls_domain = "petrovich.ru" tls_domain = "petrovich.ru"
# tls_domains = ["example.com", "cdn.example.net"] # Additional domains for EE links
mask = true mask = true
mask_port = 443 tls_emulation = true # Fetch real cert lengths and emulate TLS records
# mask_host = "petrovich.ru" # Defaults to tls_domain if not set tls_front_dir = "tlsfront" # Cache directory for TLS emulation
# mask_unix_sock = "/var/run/nginx.sock" # Unix socket (mutually exclusive with mask_host)
fake_cert_len = 2048
# tls_emulation = false # Fetch real cert lengths and emulate TLS records
# tls_front_dir = "tlsfront" # Cache directory for TLS emulation
# === Access Control & Users ===
[access]
replay_check_len = 65536
replay_window_secs = 1800
ignore_time_skew = false
[access.users] [access.users]
# format: "username" = "32_hex_chars_secret" # format: "username" = "32_hex_chars_secret"
hello = "00000000000000000000000000000000" hello = "00000000000000000000000000000000"
# [access.user_max_tcp_conns]
# hello = 50
# [access.user_max_unique_ips]
# hello = 5
# [access.user_data_quota]
# hello = 1073741824 # 1 GB
# [access.user_expirations]
# format: username = "[year]-[month]-[day]T[hour]:[minute]:[second]Z" UTC
# hello = "2027-01-01T00:00:00Z"
# === Upstreams & Routing ===
[[upstreams]]
type = "direct"
enabled = true
weight = 10
# interface = "192.168.1.100" # Bind outgoing to specific IP or iface name
# bind_addresses = ["192.168.1.100"] # List for round-robin binding (family must match target)
# [[upstreams]]
# type = "socks5"
# address = "127.0.0.1:1080"
# enabled = false
# weight = 1
# === DC Address Overrides ===
# [dc_overrides]
# "203" = "91.105.192.100:443"

View File

@@ -3,6 +3,15 @@ use ipnetwork::IpNetwork;
use serde::Deserialize; use serde::Deserialize;
// Helper defaults kept private to the config module. // Helper defaults kept private to the config module.
const DEFAULT_NETWORK_IPV6: Option<bool> = Some(false);
const DEFAULT_STUN_TCP_FALLBACK: bool = true;
const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 12;
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
const DEFAULT_ACCESS_USER: &str = "default";
const DEFAULT_ACCESS_SECRET: &str = "00000000000000000000000000000000";
pub(crate) fn default_true() -> bool { pub(crate) fn default_true() -> bool {
true true
} }
@@ -77,6 +86,14 @@ pub(crate) fn default_prefer_4() -> u8 {
4 4
} }
pub(crate) fn default_network_ipv6() -> Option<bool> {
DEFAULT_NETWORK_IPV6
}
pub(crate) fn default_stun_tcp_fallback() -> bool {
DEFAULT_STUN_TCP_FALLBACK
}
pub(crate) fn default_unknown_dc_log_path() -> Option<String> { pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
Some("unknown-dc.txt".to_string()) Some("unknown-dc.txt".to_string())
} }
@@ -85,6 +102,10 @@ pub(crate) fn default_pool_size() -> usize {
8 8
} }
pub(crate) fn default_middle_proxy_warm_standby() -> usize {
DEFAULT_MIDDLE_PROXY_WARM_STANDBY
}
pub(crate) fn default_keepalive_interval() -> u64 { pub(crate) fn default_keepalive_interval() -> u64 {
25 25
} }
@@ -109,6 +130,14 @@ pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
30_000 30_000
} }
pub(crate) fn default_me_reconnect_max_concurrent_per_dc() -> u32 {
DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC
}
pub(crate) fn default_me_reconnect_fast_retry_count() -> u32 {
DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT
}
pub(crate) fn default_crypto_pending_buffer() -> usize { pub(crate) fn default_crypto_pending_buffer() -> usize {
256 * 1024 256 * 1024
} }
@@ -121,6 +150,18 @@ pub(crate) fn default_desync_all_full() -> bool {
false false
} }
pub(crate) fn default_beobachten_minutes() -> u64 {
10
}
pub(crate) fn default_beobachten_flush_secs() -> u64 {
15
}
pub(crate) fn default_beobachten_file() -> String {
"cache/beobachten.txt".to_string()
}
pub(crate) fn default_tls_new_session_tickets() -> u8 { pub(crate) fn default_tls_new_session_tickets() -> u8 {
0 0
} }
@@ -179,7 +220,11 @@ pub(crate) fn default_proxy_config_reload_secs() -> u64 {
} }
pub(crate) fn default_update_every_secs() -> u64 { pub(crate) fn default_update_every_secs() -> u64 {
30 * 60 5 * 60
}
pub(crate) fn default_update_every() -> Option<u64> {
Some(default_update_every_secs())
} }
pub(crate) fn default_me_reinit_every_secs() -> u64 { pub(crate) fn default_me_reinit_every_secs() -> u64 {
@@ -254,6 +299,17 @@ pub(crate) fn default_degradation_min_unavailable_dc_groups() -> u8 {
2 2
} }
pub(crate) fn default_listen_addr_ipv6() -> String {
DEFAULT_LISTEN_ADDR_IPV6.to_string()
}
pub(crate) fn default_access_users() -> HashMap<String, String> {
HashMap::from([(
DEFAULT_ACCESS_USER.to_string(),
DEFAULT_ACCESS_SECRET.to_string(),
)])
}
// Custom deserializer helpers // Custom deserializer helpers
#[derive(Deserialize)] #[derive(Deserialize)]

View File

@@ -153,6 +153,24 @@ impl ProxyConfig {
)); ));
} }
if config.general.beobachten_minutes == 0 {
return Err(ProxyError::Config(
"general.beobachten_minutes must be > 0".to_string(),
));
}
if config.general.beobachten_flush_secs == 0 {
return Err(ProxyError::Config(
"general.beobachten_flush_secs must be > 0".to_string(),
));
}
if config.general.beobachten_file.trim().is_empty() {
return Err(ProxyError::Config(
"general.beobachten_file cannot be empty".to_string(),
));
}
if config.general.me_hardswap_warmup_delay_max_ms == 0 { if config.general.me_hardswap_warmup_delay_max_ms == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(), "general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(),
@@ -409,6 +427,55 @@ impl ProxyConfig {
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn serde_defaults_remain_unchanged_for_present_sections() {
let toml = r#"
[network]
[general]
[server]
[access]
"#;
let cfg: ProxyConfig = toml::from_str(toml).unwrap();
assert_eq!(cfg.network.ipv6, None);
assert!(!cfg.network.stun_tcp_fallback);
assert_eq!(cfg.general.middle_proxy_warm_standby, 0);
assert_eq!(cfg.general.me_reconnect_max_concurrent_per_dc, 0);
assert_eq!(cfg.general.me_reconnect_fast_retry_count, 0);
assert_eq!(cfg.general.update_every, None);
assert_eq!(cfg.server.listen_addr_ipv4, None);
assert_eq!(cfg.server.listen_addr_ipv6, None);
assert!(cfg.access.users.is_empty());
}
#[test]
fn impl_defaults_are_sourced_from_default_helpers() {
let network = NetworkConfig::default();
assert_eq!(network.ipv6, default_network_ipv6());
assert_eq!(network.stun_tcp_fallback, default_stun_tcp_fallback());
let general = GeneralConfig::default();
assert_eq!(
general.middle_proxy_warm_standby,
default_middle_proxy_warm_standby()
);
assert_eq!(
general.me_reconnect_max_concurrent_per_dc,
default_me_reconnect_max_concurrent_per_dc()
);
assert_eq!(
general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count()
);
assert_eq!(general.update_every, default_update_every());
let server = ServerConfig::default();
assert_eq!(server.listen_addr_ipv6, Some(default_listen_addr_ipv6()));
let access = AccessConfig::default();
assert_eq!(access.users, default_access_users());
}
#[test] #[test]
fn dc_overrides_allow_string_and_array() { fn dc_overrides_allow_string_and_array() {
let toml = r#" let toml = r#"

View File

@@ -76,7 +76,7 @@ impl Default for ProxyModes {
Self { Self {
classic: false, classic: false,
secure: false, secure: false,
tls: true, tls: default_true(),
} }
} }
} }
@@ -117,12 +117,12 @@ pub struct NetworkConfig {
impl Default for NetworkConfig { impl Default for NetworkConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
ipv4: true, ipv4: default_true(),
ipv6: Some(false), ipv6: default_network_ipv6(),
prefer: 4, prefer: default_prefer_4(),
multipath: false, multipath: false,
stun_servers: default_stun_servers(), stun_servers: default_stun_servers(),
stun_tcp_fallback: true, stun_tcp_fallback: default_stun_tcp_fallback(),
http_ip_detect_urls: default_http_ip_detect_urls(), http_ip_detect_urls: default_http_ip_detect_urls(),
cache_public_ip_path: default_cache_public_ip_path(), cache_public_ip_path: default_cache_public_ip_path(),
} }
@@ -206,6 +206,22 @@ pub struct GeneralConfig {
#[serde(default = "default_desync_all_full")] #[serde(default = "default_desync_all_full")]
pub desync_all_full: bool, pub desync_all_full: bool,
/// Enable per-IP forensic observation buckets for scanners and handshake failures.
#[serde(default)]
pub beobachten: bool,
/// Observation retention window in minutes for per-IP forensic buckets.
#[serde(default = "default_beobachten_minutes")]
pub beobachten_minutes: u64,
/// Snapshot flush interval in seconds for beob output file.
#[serde(default = "default_beobachten_flush_secs")]
pub beobachten_flush_secs: u64,
/// Snapshot file path for beob output.
#[serde(default = "default_beobachten_file")]
pub beobachten_file: String,
/// Enable C-like hard-swap for ME pool generations. /// Enable C-like hard-swap for ME pool generations.
/// When true, Telemt prewarms a new generation and switches once full coverage is reached. /// When true, Telemt prewarms a new generation and switches once full coverage is reached.
#[serde(default = "default_hardswap")] #[serde(default = "default_hardswap")]
@@ -354,27 +370,27 @@ impl Default for GeneralConfig {
Self { Self {
modes: ProxyModes::default(), modes: ProxyModes::default(),
prefer_ipv6: false, prefer_ipv6: false,
fast_mode: true, fast_mode: default_true(),
use_middle_proxy: false, use_middle_proxy: false,
ad_tag: None, ad_tag: None,
proxy_secret_path: None, proxy_secret_path: None,
middle_proxy_nat_ip: None, middle_proxy_nat_ip: None,
middle_proxy_nat_probe: false, middle_proxy_nat_probe: true,
middle_proxy_nat_stun: None, middle_proxy_nat_stun: None,
middle_proxy_nat_stun_servers: Vec::new(), middle_proxy_nat_stun_servers: Vec::new(),
middle_proxy_pool_size: default_pool_size(), middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: 16, middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
me_keepalive_enabled: true, me_keepalive_enabled: default_true(),
me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_jitter_secs: default_keepalive_jitter(),
me_keepalive_payload_random: true, me_keepalive_payload_random: default_true(),
me_warmup_stagger_enabled: true, me_warmup_stagger_enabled: default_true(),
me_warmup_step_delay_ms: default_warmup_step_delay_ms(), me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
me_reconnect_max_concurrent_per_dc: 8, me_reconnect_max_concurrent_per_dc: default_me_reconnect_max_concurrent_per_dc(),
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: 8, me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(),
stun_iface_mismatch_ignore: false, stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(), unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal, log_level: LogLevel::Normal,
@@ -383,9 +399,13 @@ impl Default for GeneralConfig {
crypto_pending_buffer: default_crypto_pending_buffer(), crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(), max_client_frame: default_max_client_frame(),
desync_all_full: default_desync_all_full(), desync_all_full: default_desync_all_full(),
beobachten: true,
beobachten_minutes: default_beobachten_minutes(),
beobachten_flush_secs: default_beobachten_flush_secs(),
beobachten_file: default_beobachten_file(),
hardswap: default_hardswap(), hardswap: default_hardswap(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
update_every: Some(default_update_every_secs()), update_every: default_update_every(),
me_reinit_every_secs: default_me_reinit_every_secs(), me_reinit_every_secs: default_me_reinit_every_secs(),
me_hardswap_warmup_delay_min_ms: default_me_hardswap_warmup_delay_min_ms(), me_hardswap_warmup_delay_min_ms: default_me_hardswap_warmup_delay_min_ms(),
me_hardswap_warmup_delay_max_ms: default_me_hardswap_warmup_delay_max_ms(), me_hardswap_warmup_delay_max_ms: default_me_hardswap_warmup_delay_max_ms(),
@@ -403,7 +423,7 @@ impl Default for GeneralConfig {
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
ntp_check: default_ntp_check(), ntp_check: default_ntp_check(),
ntp_servers: default_ntp_servers(), ntp_servers: default_ntp_servers(),
auto_degradation_enabled: true, auto_degradation_enabled: default_true(),
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(), degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
} }
} }
@@ -490,7 +510,7 @@ impl Default for ServerConfig {
Self { Self {
port: default_port(), port: default_port(),
listen_addr_ipv4: Some(default_listen_addr()), listen_addr_ipv4: Some(default_listen_addr()),
listen_addr_ipv6: Some("::".to_string()), listen_addr_ipv6: Some(default_listen_addr_ipv6()),
listen_unix_sock: None, listen_unix_sock: None,
listen_unix_sock_perm: None, listen_unix_sock_perm: None,
listen_tcp: None, listen_tcp: None,
@@ -598,12 +618,12 @@ impl Default for AntiCensorshipConfig {
Self { Self {
tls_domain: default_tls_domain(), tls_domain: default_tls_domain(),
tls_domains: Vec::new(), tls_domains: Vec::new(),
mask: true, mask: default_true(),
mask_host: None, mask_host: None,
mask_port: default_mask_port(), mask_port: default_mask_port(),
mask_unix_sock: None, mask_unix_sock: None,
fake_cert_len: default_fake_cert_len(), fake_cert_len: default_fake_cert_len(),
tls_emulation: false, tls_emulation: true,
tls_front_dir: default_tls_front_dir(), tls_front_dir: default_tls_front_dir(),
server_hello_delay_min_ms: default_server_hello_delay_min_ms(), server_hello_delay_min_ms: default_server_hello_delay_min_ms(),
server_hello_delay_max_ms: default_server_hello_delay_max_ms(), server_hello_delay_max_ms: default_server_hello_delay_max_ms(),
@@ -643,13 +663,8 @@ pub struct AccessConfig {
impl Default for AccessConfig { impl Default for AccessConfig {
fn default() -> Self { fn default() -> Self {
let mut users = HashMap::new();
users.insert(
"default".to_string(),
"00000000000000000000000000000000".to_string(),
);
Self { Self {
users, users: default_access_users(),
user_max_tcp_conns: HashMap::new(), user_max_tcp_conns: HashMap::new(),
user_expirations: HashMap::new(), user_expirations: HashMap::new(),
user_data_quota: HashMap::new(), user_data_quota: HashMap::new(),

View File

@@ -35,6 +35,7 @@ use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
use crate::proxy::ClientHandler; use crate::proxy::ClientHandler;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::{ReplayChecker, Stats}; use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool; use crate::stream::BufferPool;
use crate::transport::middle_proxy::{ use crate::transport::middle_proxy::{
@@ -159,6 +160,15 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
info!(target: "telemt::links", "------------------------"); info!(target: "telemt::links", "------------------------");
} }
async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> {
if let Some(parent) = std::path::Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(path, payload).await
}
#[tokio::main] #[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> { async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (config_path, cli_silent, cli_log_level) = parse_cli(); let (config_path, cli_silent, cli_log_level) = parse_cli();
@@ -193,14 +203,14 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}; };
let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
// Configure color output based on config // Configure color output based on config
let fmt_layer = if config.general.disable_colors { let fmt_layer = if config.general.disable_colors {
fmt::Layer::default().with_ansi(false) fmt::Layer::default().with_ansi(false)
} else { } else {
fmt::Layer::default().with_ansi(true) fmt::Layer::default().with_ansi(true)
}; };
tracing_subscriber::registry() tracing_subscriber::registry()
.with(filter_layer) .with(filter_layer)
.with(fmt_layer) .with(fmt_layer)
@@ -256,6 +266,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let prefer_ipv6 = decision.prefer_ipv6(); let prefer_ipv6 = decision.prefer_ipv6();
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
let stats = Arc::new(Stats::new()); let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new());
let rng = Arc::new(SecureRandom::new()); let rng = Arc::new(SecureRandom::new());
// IP Tracker initialization // IP Tracker initialization
@@ -595,9 +606,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!(" IPv4 in use / IPv6 is fallback"); info!(" IPv4 in use / IPv6 is fallback");
} }
} else if v6_works && !v4_works { } else if v6_works && !v4_works {
info!(" IPv6 only / IPv4 unavailable)"); info!(" IPv6 only / IPv4 unavailable");
} else if v4_works && !v6_works { } else if v4_works && !v6_works {
info!(" IPv4 only / IPv6 unavailable)"); info!(" IPv4 only / IPv6 unavailable");
} else if !v6_works && !v4_works { } else if !v6_works && !v4_works {
info!(" No DC connectivity"); info!(" No DC connectivity");
} }
@@ -666,14 +677,8 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
rc_clone.run_periodic_cleanup().await; rc_clone.run_periodic_cleanup().await;
}); });
let detected_ip_v4: Option<std::net::IpAddr> = probe let detected_ip_v4: Option<std::net::IpAddr> = probe.detected_ipv4.map(std::net::IpAddr::V4);
.reflected_ipv4 let detected_ip_v6: Option<std::net::IpAddr> = probe.detected_ipv6.map(std::net::IpAddr::V6);
.map(|s| s.ip())
.or_else(|| probe.detected_ipv4.map(std::net::IpAddr::V4));
let detected_ip_v6: Option<std::net::IpAddr> = probe
.reflected_ipv6
.map(|s| s.ip())
.or_else(|| probe.detected_ipv6.map(std::net::IpAddr::V6));
debug!( debug!(
"Detected IPs: v4={:?} v6={:?}", "Detected IPs: v4={:?} v6={:?}",
detected_ip_v4, detected_ip_v6 detected_ip_v4, detected_ip_v6
@@ -692,6 +697,26 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
detected_ip_v6, detected_ip_v6,
); );
let beobachten_writer = beobachten.clone();
let config_rx_beobachten = config_rx.clone();
tokio::spawn(async move {
loop {
let cfg = config_rx_beobachten.borrow().clone();
let sleep_secs = cfg.general.beobachten_flush_secs.max(1);
if cfg.general.beobachten {
let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60));
let path = cfg.general.beobachten_file.clone();
let snapshot = beobachten_writer.snapshot_text(ttl);
if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await {
warn!(error = %e, path = %path, "Failed to flush beobachten snapshot");
}
}
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
}
});
if let Some(ref pool) = me_pool { if let Some(ref pool) = me_pool {
let pool_clone = pool.clone(); let pool_clone = pool.clone();
let rng_clone = rng.clone(); let rng_clone = rng.clone();
@@ -860,6 +885,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone(); let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let max_connections_unix = max_connections.clone(); let max_connections_unix = max_connections.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -887,6 +913,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone(); let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let proxy_protocol_enabled = config.server.proxy_protocol; let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move { tokio::spawn(async move {
@@ -894,7 +921,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
if let Err(e) = crate::proxy::client::handle_client_stream( if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats, stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng, upstream_manager, replay_checker, buffer_pool, rng,
me_pool, tls_cache, ip_tracker, proxy_protocol_enabled, me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled,
).await { ).await {
debug!(error = %e, "Unix socket connection error"); debug!(error = %e, "Unix socket connection error");
} }
@@ -942,9 +969,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
if let Some(port) = config.server.metrics_port { if let Some(port) = config.server.metrics_port {
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone();
let config_rx_metrics = config_rx.clone();
let whitelist = config.server.metrics_whitelist.clone(); let whitelist = config.server.metrics_whitelist.clone();
tokio::spawn(async move { tokio::spawn(async move {
metrics::serve(port, stats, whitelist).await; metrics::serve(port, stats, beobachten, config_rx_metrics, whitelist).await;
}); });
} }
@@ -958,6 +987,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone(); let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let max_connections_tcp = max_connections.clone(); let max_connections_tcp = max_connections.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -980,6 +1010,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone(); let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let proxy_protocol_enabled = listener_proxy_protocol; let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move { tokio::spawn(async move {
@@ -996,6 +1027,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
me_pool, me_pool,
tls_cache, tls_cache,
ip_tracker, ip_tracker,
beobachten,
proxy_protocol_enabled, proxy_protocol_enabled,
) )
.run() .run()

View File

@@ -1,6 +1,7 @@
use std::convert::Infallible; use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use http_body_util::Full; use http_body_util::Full;
use hyper::body::Bytes; use hyper::body::Bytes;
@@ -11,9 +12,17 @@ use ipnetwork::IpNetwork;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tracing::{info, warn, debug}; use tracing::{info, warn, debug};
use crate::config::ProxyConfig;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::Stats; use crate::stats::Stats;
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) { pub async fn serve(
port: u16,
stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>,
) {
let addr = SocketAddr::from(([0, 0, 0, 0], port)); let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = match TcpListener::bind(addr).await { let listener = match TcpListener::bind(addr).await {
Ok(l) => l, Ok(l) => l,
@@ -22,7 +31,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
return; return;
} }
}; };
info!("Metrics endpoint: http://{}/metrics", addr); info!("Metrics endpoint: http://{}/metrics and /beobachten", addr);
loop { loop {
let (stream, peer) = match listener.accept().await { let (stream, peer) = match listener.accept().await {
@@ -39,10 +48,14 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
} }
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let svc = service_fn(move |req| { let svc = service_fn(move |req| {
let stats = stats.clone(); let stats = stats.clone();
async move { handle(req, &stats) } let beobachten = beobachten.clone();
let config = config_rx_conn.borrow().clone();
async move { handle(req, &stats, &beobachten, &config) }
}); });
if let Err(e) = http1::Builder::new() if let Err(e) = http1::Builder::new()
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc) .serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
@@ -54,24 +67,48 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
} }
} }
fn handle<B>(req: Request<B>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> { fn handle<B>(
if req.uri().path() != "/metrics" { req: Request<B>,
stats: &Stats,
beobachten: &BeobachtenStore,
config: &ProxyConfig,
) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() == "/metrics" {
let body = render_metrics(stats);
let resp = Response::builder() let resp = Response::builder()
.status(StatusCode::NOT_FOUND) .status(StatusCode::OK)
.body(Full::new(Bytes::from("Not Found\n"))) .header("content-type", "text/plain; version=0.0.4; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap();
return Ok(resp);
}
if req.uri().path() == "/beobachten" {
let body = render_beobachten(beobachten, config);
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap(); .unwrap();
return Ok(resp); return Ok(resp);
} }
let body = render_metrics(stats);
let resp = Response::builder() let resp = Response::builder()
.status(StatusCode::OK) .status(StatusCode::NOT_FOUND)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8") .body(Full::new(Bytes::from("Not Found\n")))
.body(Full::new(Bytes::from(body)))
.unwrap(); .unwrap();
Ok(resp) Ok(resp)
} }
fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String {
if !config.general.beobachten {
return "beobachten disabled\n".to_string();
}
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
beobachten.snapshot_text(ttl)
}
fn render_metrics(stats: &Stats) -> String { fn render_metrics(stats: &Stats) -> String {
use std::fmt::Write; use std::fmt::Write;
let mut out = String::with_capacity(4096); let mut out = String::with_capacity(4096);
@@ -199,6 +236,95 @@ fn render_metrics(stats: &Stats) -> String {
stats.get_pool_stale_pick_total() stats.get_pool_stale_pick_total()
); );
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!(
out,
"telemt_me_writer_removed_total {}",
stats.get_me_writer_removed_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_removed_unexpected_total Unexpected ME writer removals that triggered refill"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_unexpected_total counter");
let _ = writeln!(
out,
"telemt_me_writer_removed_unexpected_total {}",
stats.get_me_writer_removed_unexpected_total()
);
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
let _ = writeln!(
out,
"telemt_me_refill_triggered_total {}",
stats.get_me_refill_triggered_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_refill_skipped_inflight_total Immediate ME refill skips due to inflight dedup"
);
let _ = writeln!(out, "# TYPE telemt_me_refill_skipped_inflight_total counter");
let _ = writeln!(
out,
"telemt_me_refill_skipped_inflight_total {}",
stats.get_me_refill_skipped_inflight_total()
);
let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures");
let _ = writeln!(out, "# TYPE telemt_me_refill_failed_total counter");
let _ = writeln!(
out,
"telemt_me_refill_failed_total {}",
stats.get_me_refill_failed_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_restored_same_endpoint_total Refilled ME writer restored on the same endpoint"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_same_endpoint_total counter");
let _ = writeln!(
out,
"telemt_me_writer_restored_same_endpoint_total {}",
stats.get_me_writer_restored_same_endpoint_total()
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_restored_fallback_total Refilled ME writer restored via fallback endpoint"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_fallback_total counter");
let _ = writeln!(
out,
"telemt_me_writer_restored_fallback_total {}",
stats.get_me_writer_restored_fallback_total()
);
let unresolved_writer_losses = stats
.get_me_writer_removed_unexpected_total()
.saturating_sub(
stats
.get_me_writer_restored_same_endpoint_total()
.saturating_add(stats.get_me_writer_restored_fallback_total()),
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
);
let _ = writeln!(
out,
"telemt_me_writer_removed_unexpected_minus_restored_total {}",
unresolved_writer_losses
);
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
@@ -229,6 +355,7 @@ fn render_metrics(stats: &Stats) -> String {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::net::IpAddr;
use http_body_util::BodyExt; use http_body_util::BodyExt;
#[test] #[test]
@@ -277,11 +404,17 @@ mod tests {
assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
));
} }
#[tokio::test] #[tokio::test]
async fn test_endpoint_integration() { async fn test_endpoint_integration() {
let stats = Arc::new(Stats::new()); let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new());
let mut config = ProxyConfig::default();
stats.increment_connects_all(); stats.increment_connects_all();
stats.increment_connects_all(); stats.increment_connects_all();
stats.increment_connects_all(); stats.increment_connects_all();
@@ -290,16 +423,34 @@ mod tests {
.uri("/metrics") .uri("/metrics")
.body(()) .body(())
.unwrap(); .unwrap();
let resp = handle(req, &stats).unwrap(); let resp = handle(req, &stats, &beobachten, &config).unwrap();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes(); let body = resp.into_body().collect().await.unwrap().to_bytes();
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3")); assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
config.general.beobachten = true;
config.general.beobachten_minutes = 10;
beobachten.record(
"TLS-scanner",
"203.0.113.10".parse::<IpAddr>().unwrap(),
Duration::from_secs(600),
);
let req_beob = Request::builder()
.uri("/beobachten")
.body(())
.unwrap();
let resp_beob = handle(req_beob, &stats, &beobachten, &config).unwrap();
assert_eq!(resp_beob.status(), StatusCode::OK);
let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes();
let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap();
assert!(beob_text.contains("[TLS-scanner]"));
assert!(beob_text.contains("203.0.113.10-1"));
let req404 = Request::builder() let req404 = Request::builder()
.uri("/other") .uri("/other")
.body(()) .body(())
.unwrap(); .unwrap();
let resp404 = handle(req404, &stats).unwrap(); let resp404 = handle(req404, &stats, &beobachten, &config).unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND); assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
} }
} }

View File

@@ -1,7 +1,7 @@
//! Client Handler //! Client Handler
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -27,6 +27,7 @@ use crate::error::{HandshakeResult, ProxyError, Result};
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::protocol::constants::*; use crate::protocol::constants::*;
use crate::protocol::tls; use crate::protocol::tls;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::{ReplayChecker, Stats}; use crate::stats::{ReplayChecker, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::MePool; use crate::transport::middle_proxy::MePool;
@@ -39,6 +40,36 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
use crate::proxy::masking::handle_bad_client; use crate::proxy::masking::handle_bad_client;
use crate::proxy::middle_relay::handle_via_middle_proxy; use crate::proxy::middle_relay::handle_via_middle_proxy;
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60))
}
fn record_beobachten_class(
beobachten: &BeobachtenStore,
config: &ProxyConfig,
peer_ip: IpAddr,
class: &str,
) {
if !config.general.beobachten {
return;
}
beobachten.record(class, peer_ip, beobachten_ttl(config));
}
fn record_handshake_failure_class(
beobachten: &BeobachtenStore,
config: &ProxyConfig,
peer_ip: IpAddr,
error: &ProxyError,
) {
let class = if error.to_string().contains("expected 64 bytes, got 0") {
"expected_64_got_0"
} else {
"other"
};
record_beobachten_class(beobachten, config, peer_ip, class);
}
pub async fn handle_client_stream<S>( pub async fn handle_client_stream<S>(
mut stream: S, mut stream: S,
peer: SocketAddr, peer: SocketAddr,
@@ -51,6 +82,7 @@ pub async fn handle_client_stream<S>(
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>, tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
beobachten: Arc<BeobachtenStore>,
proxy_protocol_enabled: bool, proxy_protocol_enabled: bool,
) -> Result<()> ) -> Result<()>
where where
@@ -73,6 +105,7 @@ where
Err(e) => { Err(e) => {
stats.increment_connects_bad(); stats.increment_connects_bad();
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header"); warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(e); return Err(e);
} }
} }
@@ -82,6 +115,9 @@ where
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake); let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
let stats_for_timeout = stats.clone(); let stats_for_timeout = stats.clone();
let config_for_timeout = config.clone();
let beobachten_for_timeout = beobachten.clone();
let peer_for_timeout = real_peer.ip();
// For non-TCP streams, use a synthetic local address // For non-TCP streams, use a synthetic local address
let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port)
@@ -103,7 +139,15 @@ where
debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short"); debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short");
stats.increment_connects_bad(); stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream); let (reader, writer) = tokio::io::split(stream);
handle_bad_client(reader, writer, &first_bytes, &config).await; handle_bad_client(
reader,
writer,
&first_bytes,
real_peer.ip(),
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
@@ -120,7 +164,15 @@ where
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => { HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad(); stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await; handle_bad_client(
reader,
writer,
&handshake,
real_peer.ip(),
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),
@@ -156,7 +208,15 @@ where
debug!(peer = %real_peer, "Non-TLS modes disabled"); debug!(peer = %real_peer, "Non-TLS modes disabled");
stats.increment_connects_bad(); stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream); let (reader, writer) = tokio::io::split(stream);
handle_bad_client(reader, writer, &first_bytes, &config).await; handle_bad_client(
reader,
writer,
&first_bytes,
real_peer.ip(),
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
@@ -173,7 +233,15 @@ where
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => { HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad(); stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await; handle_bad_client(
reader,
writer,
&handshake,
real_peer.ip(),
&config,
&beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),
@@ -200,11 +268,23 @@ where
Ok(Ok(outcome)) => outcome, Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => { Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed"); debug!(peer = %peer, error = %e, "Handshake failed");
record_handshake_failure_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
&e,
);
return Err(e); return Err(e);
} }
Err(_) => { Err(_) => {
stats_for_timeout.increment_handshake_timeouts(); stats_for_timeout.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout"); debug!(peer = %peer, "Handshake timeout");
record_beobachten_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
"other",
);
return Err(ProxyError::TgHandshakeTimeout); return Err(ProxyError::TgHandshakeTimeout);
} }
}; };
@@ -230,6 +310,7 @@ pub struct RunningClientHandler {
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>, tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
beobachten: Arc<BeobachtenStore>,
proxy_protocol_enabled: bool, proxy_protocol_enabled: bool,
} }
@@ -246,6 +327,7 @@ impl ClientHandler {
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>, tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
beobachten: Arc<BeobachtenStore>,
proxy_protocol_enabled: bool, proxy_protocol_enabled: bool,
) -> RunningClientHandler { ) -> RunningClientHandler {
RunningClientHandler { RunningClientHandler {
@@ -260,6 +342,7 @@ impl ClientHandler {
me_pool, me_pool,
tls_cache, tls_cache,
ip_tracker, ip_tracker,
beobachten,
proxy_protocol_enabled, proxy_protocol_enabled,
} }
} }
@@ -284,17 +367,32 @@ impl RunningClientHandler {
let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake);
let stats = self.stats.clone(); let stats = self.stats.clone();
let config_for_timeout = self.config.clone();
let beobachten_for_timeout = self.beobachten.clone();
let peer_for_timeout = peer.ip();
// Phase 1: handshake (with timeout) // Phase 1: handshake (with timeout)
let outcome = match timeout(handshake_timeout, self.do_handshake()).await { let outcome = match timeout(handshake_timeout, self.do_handshake()).await {
Ok(Ok(outcome)) => outcome, Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => { Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed"); debug!(peer = %peer, error = %e, "Handshake failed");
record_handshake_failure_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
&e,
);
return Err(e); return Err(e);
} }
Err(_) => { Err(_) => {
stats.increment_handshake_timeouts(); stats.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout"); debug!(peer = %peer, "Handshake timeout");
record_beobachten_class(
&beobachten_for_timeout,
&config_for_timeout,
peer_for_timeout,
"other",
);
return Err(ProxyError::TgHandshakeTimeout); return Err(ProxyError::TgHandshakeTimeout);
} }
}; };
@@ -321,6 +419,12 @@ impl RunningClientHandler {
Err(e) => { Err(e) => {
self.stats.increment_connects_bad(); self.stats.increment_connects_bad();
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header"); warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
record_beobachten_class(
&self.beobachten,
&self.config,
self.peer.ip(),
"other",
);
return Err(e); return Err(e);
} }
} }
@@ -354,7 +458,15 @@ impl RunningClientHandler {
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
self.stats.increment_connects_bad(); self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split(); let (reader, writer) = self.stream.into_split();
handle_bad_client(reader, writer, &first_bytes, &self.config).await; handle_bad_client(
reader,
writer,
&first_bytes,
peer.ip(),
&self.config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
@@ -385,7 +497,15 @@ impl RunningClientHandler {
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => { HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad(); stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await; handle_bad_client(
reader,
writer,
&handshake,
peer.ip(),
&config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),
@@ -446,7 +566,15 @@ impl RunningClientHandler {
debug!(peer = %peer, "Non-TLS modes disabled"); debug!(peer = %peer, "Non-TLS modes disabled");
self.stats.increment_connects_bad(); self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split(); let (reader, writer) = self.stream.into_split();
handle_bad_client(reader, writer, &first_bytes, &self.config).await; handle_bad_client(
reader,
writer,
&first_bytes,
peer.ip(),
&self.config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
@@ -476,7 +604,15 @@ impl RunningClientHandler {
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => { HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad(); stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await; handle_bad_client(
reader,
writer,
&handshake,
peer.ip(),
&config,
&self.beobachten,
)
.await;
return Ok(HandshakeOutcome::Handled); return Ok(HandshakeOutcome::Handled);
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),

View File

@@ -1,6 +1,7 @@
//! Masking - forward unrecognized traffic to mask host //! Masking - forward unrecognized traffic to mask host
use std::str; use std::str;
use std::net::IpAddr;
use std::time::Duration; use std::time::Duration;
use tokio::net::TcpStream; use tokio::net::TcpStream;
#[cfg(unix)] #[cfg(unix)]
@@ -9,6 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
use tokio::time::timeout; use tokio::time::timeout;
use tracing::debug; use tracing::debug;
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::stats::beobachten::BeobachtenStore;
const MASK_TIMEOUT: Duration = Duration::from_secs(5); const MASK_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum duration for the entire masking relay. /// Maximum duration for the entire masking relay.
@@ -50,20 +52,26 @@ pub async fn handle_bad_client<R, W>(
reader: R, reader: R,
writer: W, writer: W,
initial_data: &[u8], initial_data: &[u8],
peer_ip: IpAddr,
config: &ProxyConfig, config: &ProxyConfig,
beobachten: &BeobachtenStore,
) )
where where
R: AsyncRead + Unpin + Send + 'static, R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static,
{ {
let client_type = detect_client_type(initial_data);
if config.general.beobachten {
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
beobachten.record(client_type, peer_ip, ttl);
}
if !config.censorship.mask { if !config.censorship.mask {
// Masking disabled, just consume data // Masking disabled, just consume data
consume_client_data(reader).await; consume_client_data(reader).await;
return; return;
} }
let client_type = detect_client_type(initial_data);
// Connect via Unix socket or TCP // Connect via Unix socket or TCP
#[cfg(unix)] #[cfg(unix)]
if let Some(ref sock_path) = config.censorship.mask_unix_sock { if let Some(ref sock_path) = config.censorship.mask_unix_sock {

117
src/stats/beobachten.rs Normal file
View File

@@ -0,0 +1,117 @@
//! Per-IP forensic buckets for scanner and handshake failure observation.
use std::collections::{BTreeMap, HashMap};
use std::net::IpAddr;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
#[derive(Default)]
struct BeobachtenInner {
entries: HashMap<(String, IpAddr), BeobachtenEntry>,
last_cleanup: Option<Instant>,
}
#[derive(Clone, Copy)]
struct BeobachtenEntry {
tries: u64,
last_seen: Instant,
}
/// In-memory, TTL-scoped per-IP counters keyed by source class.
pub struct BeobachtenStore {
inner: Mutex<BeobachtenInner>,
}
impl Default for BeobachtenStore {
fn default() -> Self {
Self::new()
}
}
impl BeobachtenStore {
pub fn new() -> Self {
Self {
inner: Mutex::new(BeobachtenInner::default()),
}
}
pub fn record(&self, class: &str, ip: IpAddr, ttl: Duration) {
if class.is_empty() || ttl.is_zero() {
return;
}
let now = Instant::now();
let mut guard = self.inner.lock();
Self::cleanup_if_needed(&mut guard, now, ttl);
let key = (class.to_string(), ip);
let entry = guard.entries.entry(key).or_insert(BeobachtenEntry {
tries: 0,
last_seen: now,
});
entry.tries = entry.tries.saturating_add(1);
entry.last_seen = now;
}
pub fn snapshot_text(&self, ttl: Duration) -> String {
if ttl.is_zero() {
return "beobachten disabled\n".to_string();
}
let now = Instant::now();
let mut guard = self.inner.lock();
Self::cleanup(&mut guard, now, ttl);
guard.last_cleanup = Some(now);
let mut grouped = BTreeMap::<String, Vec<(IpAddr, u64)>>::new();
for ((class, ip), entry) in &guard.entries {
grouped
.entry(class.clone())
.or_default()
.push((*ip, entry.tries));
}
if grouped.is_empty() {
return "empty\n".to_string();
}
let mut out = String::with_capacity(grouped.len() * 64);
for (class, entries) in &mut grouped {
out.push('[');
out.push_str(class);
out.push_str("]\n");
entries.sort_by(|(ip_a, tries_a), (ip_b, tries_b)| {
tries_b
.cmp(tries_a)
.then_with(|| ip_a.to_string().cmp(&ip_b.to_string()))
});
for (ip, tries) in entries {
out.push_str(&format!("{ip}-{tries}\n"));
}
}
out
}
fn cleanup_if_needed(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) {
let should_cleanup = match inner.last_cleanup {
Some(last) => now.saturating_duration_since(last) >= CLEANUP_INTERVAL,
None => true,
};
if should_cleanup {
Self::cleanup(inner, now, ttl);
inner.last_cleanup = Some(now);
}
}
fn cleanup(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) {
inner.entries.retain(|_, entry| {
now.saturating_duration_since(entry.last_seen) <= ttl
});
}
}

View File

@@ -2,6 +2,8 @@
#![allow(dead_code)] #![allow(dead_code)]
pub mod beobachten;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use dashmap::DashMap; use dashmap::DashMap;
@@ -43,6 +45,13 @@ pub struct Stats {
pool_drain_active: AtomicU64, pool_drain_active: AtomicU64,
pool_force_close_total: AtomicU64, pool_force_close_total: AtomicU64,
pool_stale_pick_total: AtomicU64, pool_stale_pick_total: AtomicU64,
me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64,
me_refill_triggered_total: AtomicU64,
me_refill_skipped_inflight_total: AtomicU64,
me_refill_failed_total: AtomicU64,
me_writer_restored_same_endpoint_total: AtomicU64,
me_writer_restored_fallback_total: AtomicU64,
user_stats: DashMap<String, UserStats>, user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>, start_time: parking_lot::RwLock<Option<Instant>>,
} }
@@ -142,6 +151,27 @@ impl Stats {
pub fn increment_pool_stale_pick_total(&self) { pub fn increment_pool_stale_pick_total(&self) {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
} }
pub fn increment_me_writer_removed_total(&self) {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_removed_unexpected_total(&self) {
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_triggered_total(&self) {
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_skipped_inflight_total(&self) {
self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_failed_total(&self) {
self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_restored_same_endpoint_total(&self) {
self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_writer_restored_fallback_total(&self) {
self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed);
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@@ -195,6 +225,27 @@ impl Stats {
pub fn get_pool_stale_pick_total(&self) -> u64 { pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed) self.pool_stale_pick_total.load(Ordering::Relaxed)
} }
pub fn get_me_writer_removed_total(&self) -> u64 {
self.me_writer_removed_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
}
pub fn get_me_refill_triggered_total(&self) -> u64 {
self.me_refill_triggered_total.load(Ordering::Relaxed)
}
pub fn get_me_refill_skipped_inflight_total(&self) -> u64 {
self.me_refill_skipped_inflight_total.load(Ordering::Relaxed)
}
pub fn get_me_refill_failed_total(&self) -> u64 {
self.me_refill_failed_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_restored_same_endpoint_total(&self) -> u64 {
self.me_writer_restored_same_endpoint_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) { pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default() self.user_stats.entry(user.to_string()).or_default()

View File

@@ -708,6 +708,7 @@ impl MePool {
match self.connect_one(addr, self.rng.as_ref()).await { match self.connect_one(addr, self.rng.as_ref()).await {
Ok(()) => { Ok(()) => {
self.stats.increment_me_reconnect_success(); self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_same_endpoint_total();
info!( info!(
%addr, %addr,
attempt = attempt + 1, attempt = attempt + 1,
@@ -728,6 +729,7 @@ impl MePool {
let dc_endpoints = self.endpoints_for_same_dc(addr).await; let dc_endpoints = self.endpoints_for_same_dc(addr).await;
if dc_endpoints.is_empty() { if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total();
return false; return false;
} }
@@ -738,6 +740,7 @@ impl MePool {
.await .await
{ {
self.stats.increment_me_reconnect_success(); self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_fallback_total();
info!( info!(
%addr, %addr,
attempt = attempt + 1, attempt = attempt + 1,
@@ -747,6 +750,7 @@ impl MePool {
} }
} }
self.stats.increment_me_refill_failed_total();
false false
} }
@@ -756,9 +760,11 @@ impl MePool {
{ {
let mut guard = pool.refill_inflight.lock().await; let mut guard = pool.refill_inflight.lock().await;
if !guard.insert(addr) { if !guard.insert(addr) {
pool.stats.increment_me_refill_skipped_inflight_total();
return; return;
} }
} }
pool.stats.increment_me_refill_triggered_total();
let restored = pool.refill_writer_after_loss(addr).await; let restored = pool.refill_writer_after_loss(addr).await;
if !restored { if !restored {
@@ -1189,9 +1195,13 @@ impl MePool {
if was_draining { if was_draining {
self.stats.decrement_pool_drain_active(); self.stats.decrement_pool_drain_active();
} }
self.stats.increment_me_writer_removed_total();
w.cancel.cancel(); w.cancel.cancel();
removed_addr = Some(w.addr); removed_addr = Some(w.addr);
trigger_refill = !was_draining; trigger_refill = !was_draining;
if trigger_refill {
self.stats.increment_me_writer_removed_unexpected_total();
}
close_tx = Some(w.tx.clone()); close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed); self.conn_count.fetch_sub(1, Ordering::Relaxed);
} }