Merge branch 'fix/stun-ipv6-enetunreach'

This commit is contained in:
ivulit 2026-02-19 16:34:43 +03:00
commit f17ddedea0
No known key found for this signature in database
24 changed files with 828 additions and 313 deletions

View File

@ -25,10 +25,16 @@ jobs:
include: include:
- target: x86_64-unknown-linux-gnu - target: x86_64-unknown-linux-gnu
artifact_name: telemt artifact_name: telemt
asset_name: telemt-x86_64-linux asset_name: telemt-x86_64-linux-gnu
- target: aarch64-unknown-linux-gnu - target: aarch64-unknown-linux-gnu
artifact_name: telemt artifact_name: telemt
asset_name: telemt-aarch64-linux asset_name: telemt-aarch64-linux-gnu
- target: x86_64-unknown-linux-musl
artifact_name: telemt
asset_name: telemt-x86_64-linux-musl
- target: aarch64-unknown-linux-musl
artifact_name: telemt
asset_name: telemt-aarch64-linux-musl
steps: steps:
- name: Checkout repository - name: Checkout repository
@ -56,12 +62,13 @@ jobs:
restore-keys: | restore-keys: |
${{ runner.os }}-${{ matrix.target }}-cargo- ${{ runner.os }}-${{ matrix.target }}-cargo-
- name: Install cross
run: cargo install cross --git https://github.com/cross-rs/cross
- name: Build Release - name: Build Release
uses: actions-rs/cargo@ae10961054e4aa8bff448f48a500763b90d5c550 # v1.0.1 env:
with: RUSTFLAGS: ${{ contains(matrix.target, 'musl') && '-C target-feature=+crt-static' || '' }}
use-cross: true run: cross build --release --target ${{ matrix.target }}
command: build
args: --release --target ${{ matrix.target }}
- name: Package binary - name: Package binary
run: | run: |
@ -85,13 +92,42 @@ jobs:
contents: write contents: write
steps: steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Download all artifacts - name: Download all artifacts
uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8 uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with: with:
path: artifacts path: artifacts
- name: Update version in Cargo.toml and Cargo.lock
run: |
# Extract version from tag (remove 'v' prefix if present)
VERSION="${GITHUB_REF#refs/tags/}"
VERSION="${VERSION#v}"
# Install cargo-edit for version bumping
cargo install cargo-edit
# Update Cargo.toml version
cargo set-version "$VERSION"
# Configure git
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
# Commit and push changes
git add Cargo.toml Cargo.lock
git commit -m "chore: bump version to $VERSION" || echo "No changes to commit"
git push origin HEAD:main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Create Release - name: Create Release
uses: softprops/action-gh-release@c95fe1489396fe360a41fb53f90de6ddce8c4c8a # v2.2.1 uses: softprops/action-gh-release@v2
with: with:
files: artifacts/**/* files: artifacts/**/*
generate_release_notes: true generate_release_notes: true

5
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,5 @@
## Pull Requests - Rules
- ONLY signed and verified commits
- ONLY from your name
- DO NOT commit with `codex` or `claude` as author/committer
- PREFER `flow` branch for development, not `main`

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.0.4" version = "3.0.5"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

17
LICENSING.md Normal file
View File

@ -0,0 +1,17 @@
# LICENSING
## Licenses for Versions
| Version | License |
|---------|---------------|
| 1.0 | NO LICENSE |
| 1.1 | NO LICENSE |
| 1.2 | NO LICENSE |
| 2.0 | NO LICENSE |
| 3.0 | TELEMT UL 1 |
### License Types
- **NO LICENSE** = ***ALL RIGHTS RESERVED***
- **TELEMT UL 1** - work in progress license for source code of `telemt`, which encourages:
- fair use,
- contributions,
- distribution,
- but requires crediting the authors (omitting attribution is prohibited)

View File

@ -10,74 +10,40 @@
### 🇷🇺 RU ### 🇷🇺 RU
15 февраля мы опубликовали `telemt 3` с поддержкой Middle-End Proxy, а значит: 18 февраля мы опубликовали `telemt 3.0.3`, он имеет:
- с функциональными медиа, в том числе с CDN/DC=203 - улучшенный механизм Middle-End Health Check
- с Ad-tag — показывайте спонсорский канал и собирайте статистику через официального бота - высокоскоростное восстановление инициализации Middle-End
- с новым подходом к безопасности и асинхронности - меньше задержек на hot-path
- с высокоточной диагностикой криптографии через `ME_DIAG` - более корректную работу в Dualstack, а именно - IPv6 Middle-End
- аккуратное переподключение клиента без дрифта сессий между Middle-End
- автоматическая деградация на Direct-DC при массовой (>2 ME-DC-групп) недоступности Middle-End
- автодетект IP за NAT, при возможности - будет выполнен хендшейк с ME, при неудаче - автодеградация
- единственный известный специальный DC=203 уже добавлен в код: медиа загружаются с CDN в Direct-DC режиме
Для использования нужно: [Здесь вы можете найти релиз](https://github.com/telemt/telemt/releases/tag/3.0.3)
1. Версия `telemt` ≥3.0.0 Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях - мы открыты к идеям и pull requests!
2. Выполнение любого из наборов условий:
- публичный IP для исходящих соединений установлен на интерфейсе инстанса с `telemt`
- ЛИБО
- вы используете NAT 1:1 + включили STUN-пробинг
3. В конфиге, в секции `[general]` указать:
```toml
use_middle_proxy = true
```
Если условия из пункта 1 не выполняются:
1. Выключите ME-режим:
- установите `use_middle_proxy = false`
- ЛИБО
- Middle-End Proxy будет выключен автоматически по таймауту, но это займёт больше времени при запуске
2. В конфиге, добавьте в конец:
```toml
[dc_overrides]
"203" = "91.105.192.100:443"
```
Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях — мы открыты к идеям и pull requests.
</td> </td>
<td width="50%" valign="top"> <td width="50%" valign="top">
### 🇬🇧 EN ### 🇬🇧 EN
On February 15, we released `telemt 3` with support for Middle-End Proxy, which means: On February 18, we released `telemt 3.0.3`. This version introduces:
- functional media, including CDN/DC=203 - improved Middle-End Health Check method
- Ad-tag support — promote a sponsored channel and collect statistics via Telegram bot - high-speed recovery of Middle-End init
- new approach to security and asynchronicity - reduced latency on the hot path
- high-precision cryptography diagnostics via `ME_DIAG` - correct Dualstack support: proper handling of IPv6 Middle-End
- *clean* client reconnection without session "drift" between Middle-End
- automatic degradation to Direct-DC mode in case of large-scale (>2 ME-DC groups) Middle-End unavailability
- automatic public IP detection behind NAT; first - Middle-End handshake is performed, otherwise automatic degradation is applied
- known special DC=203 is now handled natively: media is delivered from the CDN via Direct-DC mode
To use this feature, the following requirements must be met: [Release is available here](https://github.com/telemt/telemt/releases/tag/3.0.3)
1. `telemt` version ≥ 3.0.0
2. One of the following conditions satisfied:
- the instance running `telemt` has a public IP address assigned to its network interface for outbound connections
- OR
- you are using 1:1 NAT and have STUN probing enabled
3. In the config file, under the `[general]` section, specify:
```toml
use_middle_proxy = true
```
If the conditions from step 1 are not satisfied: If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics - we welcome ideas and pull requests!
1. Disable Middle-End mode:
- set `use_middle_proxy = false`
- OR
- Middle-End Proxy will be disabled automatically after a timeout, but this will increase startup time
2. In the config file, add the following at the end:
```toml
[dc_overrides]
"203" = "91.105.192.100:443"
```
If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics — we welcome ideas, suggestions, and pull requests.
</td> </td>
</tr> </tr>
@ -86,7 +52,9 @@ If you have expertise in asynchronous network applications, traffic analysis, re
# Features # Features
💥 The configuration structure has changed since version 1.1.0.0. change it in your environment! 💥 The configuration structure has changed since version 1.1.0.0. change it in your environment!
⚓ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler) ⚓ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
⚓ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy: not dramatically, but consistently
# GOTO # GOTO
- [Features](#features) - [Features](#features)
@ -215,6 +183,7 @@ prefer_ipv6 = false
fast_mode = true fast_mode = true
use_middle_proxy = false use_middle_proxy = false
# ad_tag = "..." # ad_tag = "..."
# disable_colors = false # Disable colored output in logs (useful for files/systemd)
[network] [network]
ipv4 = true ipv4 = true
@ -247,7 +216,9 @@ ip = "::"
# Users to show in the startup log (tg:// links) # Users to show in the startup log (tg:// links)
[general.links] [general.links]
show = ["hello"] # Users to show in the startup log (tg:// links) show = ["hello"] # Only show links for user "hello"
# show = ["alice", "bob"] # Only show links for alice and bob
# show = "*" # Show links for all users
# public_host = "proxy.example.com" # Host (IP or domain) for tg:// links # public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
# public_port = 443 # Port for tg:// links (default: server.port) # public_port = 443 # Port for tg:// links (default: server.port)

View File

@ -5,6 +5,38 @@ prefer_ipv6 = false
fast_mode = true fast_mode = true
use_middle_proxy = true use_middle_proxy = true
#ad_tag = "00000000000000000000000000000000" #ad_tag = "00000000000000000000000000000000"
# Path to proxy-secret binary (auto-downloaded if missing).
proxy_secret_path = "proxy-secret"
# === Middle Proxy (ME) ===
# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
#middle_proxy_nat_ip = "203.0.113.10"
# Enable STUN probing to discover public IP:port for ME.
middle_proxy_nat_probe = true
# Primary STUN server (host:port); defaults to Telegram STUN when empty.
middle_proxy_nat_stun = "stun.l.google.com:19302"
# Optional fallback STUN servers list.
middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
# Desired number of concurrent ME writers in pool.
middle_proxy_pool_size = 16
# Pre-initialized warm-standby ME connections kept idle.
middle_proxy_warm_standby = 8
# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
stun_iface_mismatch_ignore = false
# Keepalive padding frames - fl==4
me_keepalive_enabled = true
me_keepalive_interval_secs = 25 # Period between keepalives
me_keepalive_jitter_secs = 5 # Jitter added to interval
me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
# Stagger extra ME connections on warmup to de-phase lifecycles.
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500 # Base delay between extra connects
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
# Reconnect policy knobs.
me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
me_reconnect_backoff_base_ms = 500 # Backoff start
me_reconnect_backoff_cap_ms = 30000 # Backoff cap
me_reconnect_fast_retry_count = 11 # Quick retries before backoff
[network] [network]
# Enable/disable families; ipv6 = true/false/auto(None) # Enable/disable families; ipv6 = true/false/auto(None)
@ -50,10 +82,13 @@ show = ["hello"] # Users to show in the startup log (tg:// links)
# === Timeouts (in seconds) === # === Timeouts (in seconds) ===
[timeouts] [timeouts]
client_handshake = 15 client_handshake = 30
tg_connect = 10 tg_connect = 10
client_keepalive = 60 client_keepalive = 60
client_ack = 300 client_ack = 300
# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
me_one_retry = 12
me_one_timeout_ms = 1200
# === Anti-Censorship & Masking === # === Anti-Censorship & Masking ===
[censorship] [censorship]

View File

@ -74,6 +74,34 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
Some("unknown-dc.txt".to_string()) Some("unknown-dc.txt".to_string())
} }
pub(crate) fn default_pool_size() -> usize {
2
}
pub(crate) fn default_keepalive_interval() -> u64 {
25
}
pub(crate) fn default_keepalive_jitter() -> u64 {
5
}
pub(crate) fn default_warmup_step_delay_ms() -> u64 {
500
}
pub(crate) fn default_warmup_step_jitter_ms() -> u64 {
300
}
pub(crate) fn default_reconnect_backoff_base_ms() -> u64 {
500
}
pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
30_000
}
// Custom deserializer helpers // Custom deserializer helpers
#[derive(Deserialize)] #[derive(Deserialize)]

View File

@ -11,6 +11,32 @@ use crate::error::{ProxyError, Result};
use super::defaults::*; use super::defaults::*;
use super::types::*; use super::types::*;
fn preprocess_includes(content: &str, base_dir: &Path, depth: u8) -> Result<String> {
if depth > 10 {
return Err(ProxyError::Config("Include depth > 10".into()));
}
let mut output = String::with_capacity(content.len());
for line in content.lines() {
let trimmed = line.trim();
if let Some(rest) = trimmed.strip_prefix("include") {
let rest = rest.trim();
if let Some(rest) = rest.strip_prefix('=') {
let path_str = rest.trim().trim_matches('"');
let resolved = base_dir.join(path_str);
let included = std::fs::read_to_string(&resolved)
.map_err(|e| ProxyError::Config(e.to_string()))?;
let included_dir = resolved.parent().unwrap_or(base_dir);
output.push_str(&preprocess_includes(&included, included_dir, depth + 1)?);
output.push('\n');
continue;
}
}
output.push_str(line);
output.push('\n');
}
Ok(output)
}
fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> { fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> {
if !net.ipv4 && matches!(net.ipv6, Some(false)) { if !net.ipv4 && matches!(net.ipv6, Some(false)) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@ -84,10 +110,12 @@ pub struct ProxyConfig {
impl ProxyConfig { impl ProxyConfig {
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> { pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
let content = let content =
std::fs::read_to_string(path).map_err(|e| ProxyError::Config(e.to_string()))?; std::fs::read_to_string(&path).map_err(|e| ProxyError::Config(e.to_string()))?;
let base_dir = path.as_ref().parent().unwrap_or(Path::new("."));
let processed = preprocess_includes(&content, base_dir, 0)?;
let mut config: ProxyConfig = let mut config: ProxyConfig =
toml::from_str(&content).map_err(|e| ProxyError::Config(e.to_string()))?; toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?;
// Validate secrets. // Validate secrets.
for (user, secret) in &config.access.users { for (user, secret) in &config.access.users {
@ -151,8 +179,10 @@ impl ProxyConfig {
validate_network_cfg(&mut config.network)?; validate_network_cfg(&mut config.network)?;
// Random fake_cert_len. // Random fake_cert_len only when default is in use.
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096); if config.censorship.fake_cert_len == default_fake_cert_len() {
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
}
// Resolve listen_tcp: explicit value wins, otherwise auto-detect. // Resolve listen_tcp: explicit value wins, otherwise auto-detect.
// If unix socket is set → TCP only when listen_addr_ipv4 or listeners are explicitly provided. // If unix socket is set → TCP only when listen_addr_ipv4 or listeners are explicitly provided.
@ -208,6 +238,8 @@ impl ProxyConfig {
upstream_type: UpstreamType::Direct { interface: None }, upstream_type: UpstreamType::Direct { interface: None },
weight: 1, weight: 1,
enabled: true, enabled: true,
scopes: String::new(),
selected_scope: String::new(),
}); });
} }

View File

@ -143,6 +143,62 @@ pub struct GeneralConfig {
#[serde(default)] #[serde(default)]
pub middle_proxy_nat_stun: Option<String>, pub middle_proxy_nat_stun: Option<String>,
/// Optional list of STUN servers for NAT probing fallback.
#[serde(default)]
pub middle_proxy_nat_stun_servers: Vec<String>,
/// Desired size of active Middle-Proxy writer pool.
#[serde(default = "default_pool_size")]
pub middle_proxy_pool_size: usize,
/// Number of warm standby ME connections kept pre-initialized.
#[serde(default)]
pub middle_proxy_warm_standby: usize,
/// Enable ME keepalive padding frames.
#[serde(default = "default_true")]
pub me_keepalive_enabled: bool,
/// Keepalive interval in seconds.
#[serde(default = "default_keepalive_interval")]
pub me_keepalive_interval_secs: u64,
/// Keepalive jitter in seconds.
#[serde(default = "default_keepalive_jitter")]
pub me_keepalive_jitter_secs: u64,
/// Keepalive payload randomized (4 bytes); otherwise zeros.
#[serde(default = "default_true")]
pub me_keepalive_payload_random: bool,
/// Enable staggered warmup of extra ME writers.
#[serde(default = "default_true")]
pub me_warmup_stagger_enabled: bool,
/// Base delay between warmup connections in ms.
#[serde(default = "default_warmup_step_delay_ms")]
pub me_warmup_step_delay_ms: u64,
/// Jitter for warmup delay in ms.
#[serde(default = "default_warmup_step_jitter_ms")]
pub me_warmup_step_jitter_ms: u64,
/// Max concurrent reconnect attempts per DC.
#[serde(default)]
pub me_reconnect_max_concurrent_per_dc: u32,
/// Base backoff in ms for reconnect.
#[serde(default = "default_reconnect_backoff_base_ms")]
pub me_reconnect_backoff_base_ms: u64,
/// Cap backoff in ms for reconnect.
#[serde(default = "default_reconnect_backoff_cap_ms")]
pub me_reconnect_backoff_cap_ms: u64,
/// Fast retry attempts before backoff.
#[serde(default)]
pub me_reconnect_fast_retry_count: u32,
/// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
#[serde(default)] #[serde(default)]
pub stun_iface_mismatch_ignore: bool, pub stun_iface_mismatch_ignore: bool,
@ -175,6 +231,20 @@ impl Default for GeneralConfig {
middle_proxy_nat_ip: None, middle_proxy_nat_ip: None,
middle_proxy_nat_probe: false, middle_proxy_nat_probe: false,
middle_proxy_nat_stun: None, middle_proxy_nat_stun: None,
middle_proxy_nat_stun_servers: Vec::new(),
middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: 0,
me_keepalive_enabled: true,
me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(),
me_keepalive_payload_random: true,
me_warmup_stagger_enabled: true,
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
me_reconnect_max_concurrent_per_dc: 1,
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: 1,
stun_iface_mismatch_ignore: false, stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(), unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal, log_level: LogLevel::Normal,
@ -403,6 +473,10 @@ pub struct UpstreamConfig {
pub weight: u16, pub weight: u16,
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub enabled: bool, pub enabled: bool,
#[serde(default)]
pub scopes: String,
#[serde(skip)]
pub selected_scope: String,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -11,6 +11,9 @@ pub struct SecureRandom {
inner: Mutex<SecureRandomInner>, inner: Mutex<SecureRandomInner>,
} }
unsafe impl Send for SecureRandom {}
unsafe impl Sync for SecureRandom {}
struct SecureRandomInner { struct SecureRandomInner {
rng: StdRng, rng: StdRng,
cipher: AesCtr, cipher: AesCtr,
@ -211,4 +214,4 @@ mod tests {
assert_ne!(shuffled, original); assert_ne!(shuffled, original);
} }
} }

View File

@ -92,6 +92,10 @@ fn parse_cli() -> (String, bool, Option<String>) {
eprintln!(" --no-start Don't start the service after install"); eprintln!(" --no-start Don't start the service after install");
std::process::exit(0); std::process::exit(0);
} }
"--version" | "-V" => {
println!("telemt {}", env!("CARGO_PKG_VERSION"));
std::process::exit(0);
}
s if !s.starts_with('-') => { s if !s.starts_with('-') => {
config_path = s.to_string(); config_path = s.to_string();
} }
@ -106,18 +110,20 @@ fn parse_cli() -> (String, bool, Option<String>) {
} }
fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
info!("--- Proxy Links ({}) ---", host); info!(target: "telemt::links", "--- Proxy Links ({}) ---", host);
for user_name in config.general.links.show.resolve_users(&config.access.users) { for user_name in config.general.links.show.resolve_users(&config.access.users) {
if let Some(secret) = config.access.users.get(user_name) { if let Some(secret) = config.access.users.get(user_name) {
info!("User: {}", user_name); info!(target: "telemt::links", "User: {}", user_name);
if config.general.modes.classic { if config.general.modes.classic {
info!( info!(
target: "telemt::links",
" Classic: tg://proxy?server={}&port={}&secret={}", " Classic: tg://proxy?server={}&port={}&secret={}",
host, port, secret host, port, secret
); );
} }
if config.general.modes.secure { if config.general.modes.secure {
info!( info!(
target: "telemt::links",
" DD: tg://proxy?server={}&port={}&secret=dd{}", " DD: tg://proxy?server={}&port={}&secret=dd{}",
host, port, secret host, port, secret
); );
@ -125,15 +131,16 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
if config.general.modes.tls { if config.general.modes.tls {
let domain_hex = hex::encode(&config.censorship.tls_domain); let domain_hex = hex::encode(&config.censorship.tls_domain);
info!( info!(
target: "telemt::links",
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
host, port, secret, domain_hex host, port, secret, domain_hex
); );
} }
} else { } else {
warn!("User '{}' in show_link not found", user_name); warn!(target: "telemt::links", "User '{}' in show_link not found", user_name);
} }
} }
info!("------------------------"); info!(target: "telemt::links", "------------------------");
} }
#[tokio::main] #[tokio::main]
@ -317,6 +324,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
config.general.middle_proxy_nat_ip, config.general.middle_proxy_nat_ip,
config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_probe,
config.general.middle_proxy_nat_stun.clone(), config.general.middle_proxy_nat_stun.clone(),
config.general.middle_proxy_nat_stun_servers.clone(),
probe.detected_ipv6, probe.detected_ipv6,
config.timeouts.me_one_retry, config.timeouts.me_one_retry,
config.timeouts.me_one_timeout_ms, config.timeouts.me_one_timeout_ms,
@ -325,18 +333,32 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
cfg_v4.default_dc.or(cfg_v6.default_dc), cfg_v4.default_dc.or(cfg_v6.default_dc),
decision.clone(), decision.clone(),
rng.clone(), rng.clone(),
stats.clone(),
config.general.me_keepalive_enabled,
config.general.me_keepalive_interval_secs,
config.general.me_keepalive_jitter_secs,
config.general.me_keepalive_payload_random,
config.general.me_warmup_stagger_enabled,
config.general.me_warmup_step_delay_ms,
config.general.me_warmup_step_jitter_ms,
config.general.me_reconnect_max_concurrent_per_dc,
config.general.me_reconnect_backoff_base_ms,
config.general.me_reconnect_backoff_cap_ms,
config.general.me_reconnect_fast_retry_count,
); );
match pool.init(2, &rng).await { let pool_size = config.general.middle_proxy_pool_size.max(1);
match pool.init(pool_size, &rng).await {
Ok(()) => { Ok(()) => {
info!("Middle-End pool initialized successfully"); info!("Middle-End pool initialized successfully");
// Phase 4: Start health monitor // Phase 4: Start health monitor
let pool_clone = pool.clone(); let pool_clone = pool.clone();
let rng_clone = rng.clone(); let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move { tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor( crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, 2, pool_clone, rng_clone, min_conns,
) )
.await; .await;
}); });
@ -740,6 +762,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
// Switch to user-configured log level after startup // Switch to user-configured log level after startup
let runtime_filter = if has_rust_log { let runtime_filter = if has_rust_log {
EnvFilter::from_default_env() EnvFilter::from_default_env()
} else if matches!(effective_log_level, LogLevel::Silent) {
EnvFilter::new("warn,telemt::links=info")
} else { } else {
EnvFilter::new(effective_log_level.to_filter_str()) EnvFilter::new(effective_log_level.to_filter_str())
}; };

View File

@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String {
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts()); let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter");
let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent());
let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");

View File

@ -45,7 +45,7 @@ where
); );
let tg_stream = upstream_manager let tg_stream = upstream_manager
.connect(dc_addr, Some(success.dc_idx)) .connect(dc_addr, Some(success.dc_idx), user.strip_prefix("scope_").filter(|s| !s.is_empty()))
.await?; .await?;
debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake");

View File

@ -19,6 +19,10 @@ pub struct Stats {
connects_all: AtomicU64, connects_all: AtomicU64,
connects_bad: AtomicU64, connects_bad: AtomicU64,
handshake_timeouts: AtomicU64, handshake_timeouts: AtomicU64,
me_keepalive_sent: AtomicU64,
me_keepalive_failed: AtomicU64,
me_reconnect_attempts: AtomicU64,
me_reconnect_success: AtomicU64,
user_stats: DashMap<String, UserStats>, user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>, start_time: parking_lot::RwLock<Option<Instant>>,
} }
@ -43,8 +47,16 @@ impl Stats {
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); } pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); } pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); } pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
pub fn increment_user_connects(&self, user: &str) { pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default() self.user_stats.entry(user.to_string()).or_default()

View File

@ -4,6 +4,14 @@ use crate::crypto::{AesCbc, crc32};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
/// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes.
pub(crate) enum WriterCommand {
Data(Vec<u8>),
DataAndFlush(Vec<u8>),
Keepalive,
Close,
}
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> { pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> {
let total_len = (4 + 4 + payload.len() + 4) as u32; let total_len = (4 + 4 + payload.len() + 4) as u32;
let mut frame = Vec::with_capacity(total_len as usize); let mut frame = Vec::with_capacity(total_len as usize);
@ -181,4 +189,12 @@ impl RpcWriter {
self.send(payload).await?; self.send(payload).await?;
self.writer.flush().await.map_err(ProxyError::Io) self.writer.flush().await.map_err(ProxyError::Io)
} }
pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> {
// Keepalive is a frame with fl == 4 and 4 bytes payload.
let mut frame = Vec::with_capacity(8);
frame.extend_from_slice(&4u32.to_le_bytes());
frame.extend_from_slice(&payload);
self.send(&frame).await
}
} }

View File

@ -13,6 +13,24 @@ use super::secret::download_proxy_secret;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use std::time::SystemTime; use std::time::SystemTime;
async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
let delays = [1u64, 5, 15];
for (i, d) in delays.iter().enumerate() {
match fetch_proxy_config(url).await {
Ok(cfg) => return Some(cfg),
Err(e) => {
if i == delays.len() - 1 {
warn!(error = %e, url, "fetch_proxy_config failed");
} else {
debug!(error = %e, url, "fetch_proxy_config retrying");
tokio::time::sleep(Duration::from_secs(*d)).await;
}
}
}
}
None
}
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct ProxyConfigData { pub struct ProxyConfigData {
pub map: HashMap<i32, Vec<(IpAddr, u16)>>, pub map: HashMap<i32, Vec<(IpAddr, u16)>>,
@ -118,7 +136,8 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interv
tick.tick().await; tick.tick().await;
// Update proxy config v4 // Update proxy config v4
if let Ok(cfg) = fetch_proxy_config("https://core.telegram.org/getProxyConfig").await { let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg) = cfg_v4 {
let changed = pool.update_proxy_maps(cfg.map.clone(), None).await; let changed = pool.update_proxy_maps(cfg.map.clone(), None).await;
if let Some(dc) = cfg.default_dc { if let Some(dc) = cfg.default_dc {
pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed); pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed);
@ -129,14 +148,20 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interv
} else { } else {
debug!("ME config v4 unchanged"); debug!("ME config v4 unchanged");
} }
} else {
warn!("getProxyConfig update failed");
} }
// Update proxy config v6 (optional) // Update proxy config v6 (optional)
if let Ok(cfg_v6) = fetch_proxy_config("https://core.telegram.org/getProxyConfigV6").await { let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
let _ = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; if let Some(cfg_v6) = cfg_v6 {
let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
if changed {
info!("ME config updated (v6), reconciling connections");
pool.reconcile_connections(&rng).await;
} else {
debug!("ME config v6 unchanged");
}
} }
pool.reset_stun_state();
// Update proxy-secret // Update proxy-secret
match download_proxy_secret().await { match download_proxy_secret().await {

View File

@ -5,25 +5,30 @@ use std::time::{Duration, Instant};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::Rng;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
use super::MePool; use super::MePool;
const HEALTH_INTERVAL_SECS: u64 = 1;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
loop { loop {
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
check_family( check_family(
IpFamily::V4, IpFamily::V4,
&pool, &pool,
&rng, &rng,
&mut backoff, &mut backoff,
&mut last_attempt, &mut next_attempt,
&mut inflight_single, &mut inflight,
) )
.await; .await;
check_family( check_family(
@ -31,8 +36,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&pool, &pool,
&rng, &rng,
&mut backoff, &mut backoff,
&mut last_attempt, &mut next_attempt,
&mut inflight_single, &mut inflight,
) )
.await; .await;
} }
@ -43,8 +48,8 @@ async fn check_family(
pool: &Arc<MePool>, pool: &Arc<MePool>,
rng: &Arc<SecureRandom>, rng: &Arc<SecureRandom>,
backoff: &mut HashMap<(i32, IpFamily), u64>, backoff: &mut HashMap<(i32, IpFamily), u64>,
last_attempt: &mut HashMap<(i32, IpFamily), Instant>, next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
inflight_single: &mut HashSet<(i32, IpFamily)>, inflight: &mut HashMap<(i32, IpFamily), usize>,
) { ) {
let enabled = match family { let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me, IpFamily::V4 => pool.decision.ipv4_me,
@ -80,95 +85,60 @@ async fn check_family(
for (dc, dc_addrs) in entries { for (dc, dc_addrs) in entries {
let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a));
if has_coverage { if has_coverage {
inflight_single.remove(&(dc, family));
continue;
}
let key = (dc, family);
let delay = *backoff.get(&key).unwrap_or(&30);
let now = Instant::now();
if let Some(last) = last_attempt.get(&key) {
if now.duration_since(*last).as_secs() < delay {
continue;
}
}
if dc_addrs.len() == 1 {
// Single ME address: fast retries then slower background retries.
if inflight_single.contains(&key) {
continue;
}
inflight_single.insert(key);
let addr = dc_addrs[0];
let dc_id = dc;
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let timeout = pool.me_one_timeout;
let quick_attempts = pool.me_one_retry.max(1);
tokio::spawn(async move {
let mut success = false;
for _ in 0..quick_attempts {
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
success = true;
break;
}
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
if success {
return;
}
let timeout_ms = timeout.as_millis();
warn!(
dc = %dc_id,
?family,
attempts = quick_attempts,
timeout_ms,
"DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...",
dc_id,
quick_attempts,
timeout_ms
);
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
break;
}
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
}
}
// will drop inflight flag in outer loop when coverage detected
});
continue; continue;
} }
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); let key = (dc, family);
let mut shuffled = dc_addrs.clone(); let now = Instant::now();
shuffled.shuffle(&mut rand::rng()); if let Some(ts) = next_attempt.get(&key) {
let mut reconnected = false; if now < *ts {
for addr in shuffled { continue;
match pool.connect_one(addr, rng.as_ref()).await {
Ok(()) => {
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
backoff.insert(key, 30);
last_attempt.insert(key, now);
reconnected = true;
break;
}
Err(e) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"),
} }
} }
if !reconnected {
let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300); let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize;
backoff.insert(key, next); if *inflight.get(&key).unwrap_or(&0) >= max_concurrent {
last_attempt.insert(key, now); return;
}
*inflight.entry(key).or_insert(0) += 1;
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
let mut success = false;
for addr in shuffled {
let res = tokio::time::timeout(pool.me_one_timeout, pool.connect_one(addr, rng.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
pool.stats.increment_me_reconnect_success();
backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64);
let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM;
let wait = pool.me_reconnect_backoff_base
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
success = true;
break;
}
Ok(Err(e)) => {
pool.stats.increment_me_reconnect_attempt();
debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed")
}
Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"),
}
}
if !success {
pool.stats.increment_me_reconnect_attempt();
let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
backoff.insert(key, next_ms);
let jitter = next_ms / JITTER_FRAC_NUM;
let wait = Duration::from_millis(next_ms)
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect");
}
if let Some(v) = inflight.get_mut(&key) {
*v = v.saturating_sub(1);
} }
} }
} }

View File

@ -1,14 +1,14 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering};
use bytes::BytesMut; use bytes::BytesMut;
use rand::Rng; use rand::Rng;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, RwLock, mpsc, Notify};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use std::time::Duration; use std::time::{Duration, Instant};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
@ -18,18 +18,18 @@ use crate::protocol::constants::*;
use super::ConnRegistry; use super::ConnRegistry;
use super::registry::{BoundConn, ConnMeta}; use super::registry::{BoundConn, ConnMeta};
use super::codec::RpcWriter; use super::codec::{RpcWriter, WriterCommand};
use super::reader::reader_loop; use super::reader::reader_loop;
use super::MeResponse; use super::MeResponse;
const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4;
#[derive(Clone)] #[derive(Clone)]
pub struct MeWriter { pub struct MeWriter {
pub id: u64, pub id: u64,
pub addr: SocketAddr, pub addr: SocketAddr,
pub writer: Arc<Mutex<RpcWriter>>, pub tx: mpsc::Sender<WriterCommand>,
pub cancel: CancellationToken, pub cancel: CancellationToken,
pub degraded: Arc<AtomicBool>, pub degraded: Arc<AtomicBool>,
pub draining: Arc<AtomicBool>, pub draining: Arc<AtomicBool>,
@ -47,11 +47,24 @@ pub struct MePool {
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>, pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
pub(super) nat_probe: bool, pub(super) nat_probe: bool,
pub(super) nat_stun: Option<String>, pub(super) nat_stun: Option<String>,
pub(super) nat_stun_servers: Vec<String>,
pub(super) detected_ipv6: Option<Ipv6Addr>, pub(super) detected_ipv6: Option<Ipv6Addr>,
pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8,
pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool,
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
pub(super) me_one_retry: u8, pub(super) me_one_retry: u8,
pub(super) me_one_timeout: Duration, pub(super) me_one_timeout: Duration,
pub(super) me_keepalive_enabled: bool,
pub(super) me_keepalive_interval: Duration,
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration,
pub(super) me_reconnect_max_concurrent_per_dc: u32,
pub(super) me_reconnect_backoff_base: Duration,
pub(super) me_reconnect_backoff_cap: Duration,
pub(super) me_reconnect_fast_retry_count: u32,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32, pub(super) default_dc: AtomicI32,
@ -59,6 +72,9 @@ pub struct MePool {
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>, pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>, pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>, pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) writer_available: Arc<Notify>,
pub(super) conn_count: AtomicUsize,
pub(super) stats: Arc<crate::stats::Stats>,
pool_size: usize, pool_size: usize,
} }
@ -75,6 +91,7 @@ impl MePool {
nat_ip: Option<IpAddr>, nat_ip: Option<IpAddr>,
nat_probe: bool, nat_probe: bool,
nat_stun: Option<String>, nat_stun: Option<String>,
nat_stun_servers: Vec<String>,
detected_ipv6: Option<Ipv6Addr>, detected_ipv6: Option<Ipv6Addr>,
me_one_retry: u8, me_one_retry: u8,
me_one_timeout_ms: u64, me_one_timeout_ms: u64,
@ -83,6 +100,18 @@ impl MePool {
default_dc: Option<i32>, default_dc: Option<i32>,
decision: NetworkDecision, decision: NetworkDecision,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
stats: Arc<crate::stats::Stats>,
me_keepalive_enabled: bool,
me_keepalive_interval_secs: u64,
me_keepalive_jitter_secs: u64,
me_keepalive_payload_random: bool,
me_warmup_stagger_enabled: bool,
me_warmup_step_delay_ms: u64,
me_warmup_step_jitter_ms: u64,
me_reconnect_max_concurrent_per_dc: u32,
me_reconnect_backoff_base_ms: u64,
me_reconnect_backoff_cap_ms: u64,
me_reconnect_fast_retry_count: u32,
) -> Arc<Self> { ) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
registry: Arc::new(ConnRegistry::new()), registry: Arc::new(ConnRegistry::new()),
@ -96,11 +125,25 @@ impl MePool {
nat_ip_detected: Arc::new(RwLock::new(None)), nat_ip_detected: Arc::new(RwLock::new(None)),
nat_probe, nat_probe,
nat_stun, nat_stun,
nat_stun_servers,
detected_ipv6, detected_ipv6,
nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), nat_probe_attempts: std::sync::atomic::AtomicU8::new(0),
nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), nat_probe_disabled: std::sync::atomic::AtomicBool::new(false),
stun_backoff_until: Arc::new(RwLock::new(None)),
me_one_retry, me_one_retry,
me_one_timeout: Duration::from_millis(me_one_timeout_ms), me_one_timeout: Duration::from_millis(me_one_timeout_ms),
stats,
me_keepalive_enabled,
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
me_reconnect_max_concurrent_per_dc,
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
me_reconnect_fast_retry_count,
pool_size: 2, pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@ -109,6 +152,8 @@ impl MePool {
ping_tracker: Arc::new(Mutex::new(HashMap::new())), ping_tracker: Arc::new(Mutex::new(HashMap::new())),
rtt_stats: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())),
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
writer_available: Arc::new(Notify::new()),
conn_count: AtomicUsize::new(0),
}) })
} }
@ -116,6 +161,11 @@ impl MePool {
self.proxy_tag.is_some() self.proxy_tag.is_some()
} }
pub fn reset_stun_state(&self) {
self.nat_probe_attempts.store(0, Ordering::Relaxed);
self.nat_probe_disabled.store(false, Ordering::Relaxed);
}
pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr { pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr {
let ip = self.translate_ip_for_nat(addr.ip()); let ip = self.translate_ip_for_nat(addr.ip());
SocketAddr::new(ip, addr.port()) SocketAddr::new(ip, addr.port())
@ -132,7 +182,11 @@ impl MePool {
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) { pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
use std::collections::HashSet; use std::collections::HashSet;
let writers = self.writers.read().await; let writers = self.writers.read().await;
let current: HashSet<SocketAddr> = writers.iter().map(|w| w.addr).collect(); let current: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
drop(writers); drop(writers);
for family in self.family_order() { for family in self.family_order() {
@ -175,12 +229,36 @@ impl MePool {
let mut guard = self.proxy_map_v6.write().await; let mut guard = self.proxy_map_v6.write().await;
if !v6.is_empty() && *guard != v6 { if !v6.is_empty() && *guard != v6 {
*guard = v6; *guard = v6;
changed = true;
}
}
// Ensure negative DC entries mirror positives when absent (Telegram convention).
{
let mut guard = self.proxy_map_v4.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k) {
if let Some(addrs) = guard.get(&k).cloned() {
guard.insert(-k, addrs);
}
}
}
}
{
let mut guard = self.proxy_map_v6.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k) {
if let Some(addrs) = guard.get(&k).cloned() {
guard.insert(-k, addrs);
}
}
} }
} }
changed changed
} }
pub async fn update_secret(&self, new_secret: Vec<u8>) -> bool { pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
if new_secret.len() < 32 { if new_secret.len() < 32 {
warn!(len = new_secret.len(), "proxy-secret update ignored (too short)"); warn!(len = new_secret.len(), "proxy-secret update ignored (too short)");
return false; return false;
@ -195,10 +273,14 @@ impl MePool {
false false
} }
pub async fn reconnect_all(&self) { pub async fn reconnect_all(self: &Arc<Self>) {
// Graceful: do not drop all at once. New connections will use updated secret. let ws = self.writers.read().await.clone();
// Existing writers remain until health monitor replaces them. for w in ws {
// No-op here to avoid total outage. if let Ok(()) = self.connect_one(w.addr, self.rng.as_ref()).await {
self.mark_writer_draining(w.id).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
} }
pub(super) async fn key_selector(&self) -> u32 { pub(super) async fn key_selector(&self) -> u32 {
@ -277,7 +359,24 @@ impl MePool {
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into())); return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
} }
// Additional connections up to pool_size total (round-robin across DCs) // Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
if self.me_warmup_stagger_enabled {
let mut delay_ms = 0u64;
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
}
}
}
} else {
for (dc, addrs) in dc_addrs.iter() { for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs { for (ip, port) in addrs {
if self.connection_count() >= pool_size { if self.connection_count() >= pool_size {
@ -292,6 +391,7 @@ impl MePool {
break; break;
} }
} }
}
if !self.decision.effective_multipath && self.connection_count() > 0 { if !self.decision.effective_multipath && self.connection_count() > 0 {
break; break;
@ -317,21 +417,61 @@ impl MePool {
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false)); let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false));
let rpc_w = Arc::new(Mutex::new(RpcWriter { let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let tx_for_keepalive = tx.clone();
let keepalive_random = self.me_keepalive_payload_random;
let stats = self.stats.clone();
let mut rpc_writer = RpcWriter {
writer: hs.wr, writer: hs.wr,
key: hs.write_key, key: hs.write_key,
iv: hs.write_iv, iv: hs.write_iv,
seq_no: 0, seq_no: 0,
})); };
let cancel_wr = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
cmd = rx.recv() => {
match cmd {
Some(WriterCommand::Data(payload)) => {
if rpc_writer.send(&payload).await.is_err() { break; }
}
Some(WriterCommand::DataAndFlush(payload)) => {
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
}
Some(WriterCommand::Keepalive) => {
let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN];
if keepalive_random {
rand::rng().fill(&mut payload);
}
match rpc_writer.send_keepalive(payload).await {
Ok(()) => {
stats.increment_me_keepalive_sent();
}
Err(_) => {
stats.increment_me_keepalive_failed();
break;
}
}
}
Some(WriterCommand::Close) | None => break,
}
}
_ = cancel_wr.cancelled() => break,
}
}
});
let writer = MeWriter { let writer = MeWriter {
id: writer_id, id: writer_id,
addr, addr,
writer: rpc_w.clone(), tx: tx.clone(),
cancel: cancel.clone(), cancel: cancel.clone(),
degraded: degraded.clone(), degraded: degraded.clone(),
draining: draining.clone(), draining: draining.clone(),
}; };
self.writers.write().await.push(writer.clone()); self.writers.write().await.push(writer.clone());
self.conn_count.fetch_add(1, Ordering::Relaxed);
self.writer_available.notify_waiters();
let reg = self.registry.clone(); let reg = self.registry.clone();
let writers_arc = self.writers_arc(); let writers_arc = self.writers_arc();
@ -339,11 +479,19 @@ impl MePool {
let rtt_stats = self.rtt_stats.clone(); let rtt_stats = self.rtt_stats.clone();
let pool = Arc::downgrade(self); let pool = Arc::downgrade(self);
let cancel_ping = cancel.clone(); let cancel_ping = cancel.clone();
let rpc_w_ping = rpc_w.clone(); let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone(); let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
let keepalive_interval = self.me_keepalive_interval;
let keepalive_jitter = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
let cancel_keepalive_token = cancel.clone();
tokio::spawn(async move { tokio::spawn(async move {
let cancel_reader = cancel.clone();
let res = reader_loop( let res = reader_loop(
hs.rd, hs.rd,
hs.read_key, hs.read_key,
@ -351,16 +499,21 @@ impl MePool {
reg.clone(), reg.clone(),
BytesMut::new(), BytesMut::new(),
BytesMut::new(), BytesMut::new(),
rpc_w.clone(), tx.clone(),
ping_tracker.clone(), ping_tracker.clone(),
rtt_stats.clone(), rtt_stats.clone(),
writer_id, writer_id,
degraded.clone(), degraded.clone(),
cancel_reader.clone(), cancel_reader_token.clone(),
) )
.await; .await;
if let Some(pool) = pool.upgrade() { if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await; if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
} }
if let Err(e) = res { if let Err(e) = res {
warn!(error = %e, "ME reader ended"); warn!(error = %e, "ME reader ended");
@ -378,7 +531,7 @@ impl MePool {
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
tokio::select! { tokio::select! {
_ = cancel_ping.cancelled() => { _ = cancel_ping_token.cancelled() => {
break; break;
} }
_ = tokio::time::sleep(Duration::from_secs(wait)) => {} _ = tokio::time::sleep(Duration::from_secs(wait)) => {}
@ -389,20 +542,45 @@ impl MePool {
p.extend_from_slice(&sent_id.to_le_bytes()); p.extend_from_slice(&sent_id.to_le_bytes());
{ {
let mut tracker = ping_tracker_ping.lock().await; let mut tracker = ping_tracker_ping.lock().await;
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
} }
ping_id = ping_id.wrapping_add(1); ping_id = ping_id.wrapping_add(1);
if let Err(e) = rpc_w_ping.lock().await.send_and_flush(&p).await { if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!(error = %e, "Active ME ping failed, removing dead writer"); debug!("Active ME ping failed, removing dead writer");
cancel_ping.cancel(); cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade() { if let Some(pool) = pool_ping.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await; if cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
} }
break; break;
} }
} }
}); });
if keepalive_enabled {
let tx_keepalive = tx_for_keepalive;
let cancel_keepalive = cancel_keepalive_token;
tokio::spawn(async move {
// Per-writer jittered start to avoid phase sync.
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
loop {
tokio::select! {
_ = cancel_keepalive.cancelled() => break,
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
}
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
break;
}
}
});
}
Ok(()) Ok(())
} }
@ -430,7 +608,7 @@ impl MePool {
false false
} }
pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) { pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await; let conns = self.remove_writer_only(writer_id).await;
for bound in conns { for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await; let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
@ -444,8 +622,11 @@ impl MePool {
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos); let w = ws.remove(pos);
w.cancel.cancel(); w.cancel.cancel();
let _ = w.tx.send(WriterCommand::Close).await;
self.conn_count.fetch_sub(1, Ordering::Relaxed);
} }
} }
self.rtt_stats.lock().await.remove(&writer_id);
self.registry.writer_lost(writer_id).await self.registry.writer_lost(writer_id).await
} }
@ -459,8 +640,14 @@ impl MePool {
let pool = Arc::downgrade(self); let pool = Arc::downgrade(self);
tokio::spawn(async move { tokio::spawn(async move {
let deadline = Instant::now() + Duration::from_secs(300);
loop { loop {
if let Some(p) = pool.upgrade() { if let Some(p) = pool.upgrade() {
if Instant::now() >= deadline {
warn!(writer_id, "Drain timeout, force-closing");
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
if p.registry.is_writer_empty(writer_id).await { if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await; let _ = p.remove_writer_only(writer_id).await;
break; break;

View File

@ -98,16 +98,18 @@ impl MePool {
family: IpFamily, family: IpFamily,
) -> Option<std::net::SocketAddr> { ) -> Option<std::net::SocketAddr> {
const STUN_CACHE_TTL: Duration = Duration::from_secs(600); const STUN_CACHE_TTL: Duration = Duration::from_secs(600);
// If STUN probing was disabled after attempts, reuse cached (even stale) or skip. // Backoff window
if self.nat_probe_disabled.load(std::sync::atomic::Ordering::Relaxed) { if let Some(until) = *self.stun_backoff_until.read().await {
if let Ok(cache) = self.nat_reflection_cache.try_lock() { if Instant::now() < until {
let slot = match family { if let Ok(cache) = self.nat_reflection_cache.try_lock() {
IpFamily::V4 => cache.v4, let slot = match family {
IpFamily::V6 => cache.v6, IpFamily::V4 => cache.v4,
}; IpFamily::V6 => cache.v6,
return slot.map(|(_, addr)| addr); };
return slot.map(|(_, addr)| addr);
}
return None;
} }
return None;
} }
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
@ -123,48 +125,42 @@ impl MePool {
} }
let attempt = self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let attempt = self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if attempt >= 2 { let servers = if !self.nat_stun_servers.is_empty() {
self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed); self.nat_stun_servers.clone()
return None; } else if let Some(s) = &self.nat_stun {
} vec![s.clone()]
} else {
vec!["stun.l.google.com:19302".to_string()]
};
let stun_addr = self for stun_addr in servers {
.nat_stun match stun_probe_dual(&stun_addr).await {
.clone() Ok(res) => {
.unwrap_or_else(|| "stun.l.google.com:19302".to_string()); let picked: Option<StunProbeResult> = match family {
match stun_probe_dual(&stun_addr).await { IpFamily::V4 => res.v4,
Ok(res) => { IpFamily::V6 => res.v6,
let picked: Option<StunProbeResult> = match family { };
IpFamily::V4 => res.v4, if let Some(result) = picked {
IpFamily::V6 => res.v6, info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, stun = %stun_addr, "NAT probe: reflected address");
}; self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed);
if let Some(result) = picked { if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, "NAT probe: reflected address"); let slot = match family {
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { IpFamily::V4 => &mut cache.v4,
let slot = match family { IpFamily::V6 => &mut cache.v6,
IpFamily::V4 => &mut cache.v4, };
IpFamily::V6 => &mut cache.v6, *slot = Some((Instant::now(), result.reflected_addr));
}; }
*slot = Some((Instant::now(), result.reflected_addr)); return Some(result.reflected_addr);
} }
Some(result.reflected_addr)
} else {
None
} }
} Err(e) => {
Err(e) => { warn!(error = %e, stun = %stun_addr, attempt = attempt + 1, "NAT probe failed, trying next server");
let attempts = attempt + 1;
if attempts <= 2 {
warn!(error = %e, attempt = attempts, "NAT probe failed");
} else {
debug!(error = %e, attempt = attempts, "NAT probe suppressed after max attempts");
} }
if attempts >= 2 {
self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed);
}
None
} }
} }
let backoff = Duration::from_secs(60 * 2u64.pow((attempt as u32).min(6)));
*self.stun_backoff_until.write().await = Some(Instant::now() + backoff);
None
} }
} }

View File

@ -6,7 +6,7 @@ use std::time::Instant;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::Mutex; use tokio::sync::{Mutex, mpsc};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
@ -14,7 +14,7 @@ use crate::crypto::{AesCbc, crc32};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
use super::codec::RpcWriter; use super::codec::WriterCommand;
use super::{ConnRegistry, MeResponse}; use super::{ConnRegistry, MeResponse};
pub(crate) async fn reader_loop( pub(crate) async fn reader_loop(
@ -24,7 +24,7 @@ pub(crate) async fn reader_loop(
reg: Arc<ConnRegistry>, reg: Arc<ConnRegistry>,
enc_leftover: BytesMut, enc_leftover: BytesMut,
mut dec: BytesMut, mut dec: BytesMut,
writer: Arc<Mutex<RpcWriter>>, tx: mpsc::Sender<WriterCommand>,
ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>, ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>,
rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>, rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
_writer_id: u64, _writer_id: u64,
@ -33,6 +33,8 @@ pub(crate) async fn reader_loop(
) -> Result<()> { ) -> Result<()> {
let mut raw = enc_leftover; let mut raw = enc_leftover;
let mut expected_seq: i32 = 0; let mut expected_seq: i32 = 0;
let mut crc_errors = 0u32;
let mut seq_mismatch = 0u32;
loop { loop {
let mut tmp = [0u8; 16_384]; let mut tmp = [0u8; 16_384];
@ -80,12 +82,20 @@ pub(crate) async fn reader_loop(
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap()); let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
if crc32(&frame[..pe]) != ec { if crc32(&frame[..pe]) != ec {
warn!("CRC mismatch in data frame"); warn!("CRC mismatch in data frame");
crc_errors += 1;
if crc_errors > 3 {
return Err(ProxyError::Proxy("Too many CRC mismatches".into()));
}
continue; continue;
} }
let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap()); let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap());
if seq_no != expected_seq { if seq_no != expected_seq {
warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch"); warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch");
seq_mismatch += 1;
if seq_mismatch > 10 {
return Err(ProxyError::Proxy("Too many seq mismatches".into()));
}
expected_seq = seq_no.wrapping_add(1); expected_seq = seq_no.wrapping_add(1);
} else { } else {
expected_seq = expected_seq.wrapping_add(1); expected_seq = expected_seq.wrapping_add(1);
@ -108,7 +118,7 @@ pub(crate) async fn reader_loop(
let routed = reg.route(cid, MeResponse::Data { flags, data }).await; let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
if !routed { if !routed {
reg.unregister(cid).await; reg.unregister(cid).await;
send_close_conn(&writer, cid).await; send_close_conn(&tx, cid).await;
} }
} else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
@ -118,7 +128,7 @@ pub(crate) async fn reader_loop(
let routed = reg.route(cid, MeResponse::Ack(cfm)).await; let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
if !routed { if !routed {
reg.unregister(cid).await; reg.unregister(cid).await;
send_close_conn(&writer, cid).await; send_close_conn(&tx, cid).await;
} }
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
@ -136,8 +146,8 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12); let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes());
if let Err(e) = writer.lock().await.send_and_flush(&pong).await { if tx.send(WriterCommand::DataAndFlush(pong)).await.is_err() {
warn!(error = %e, "PONG send failed"); warn!("PONG send failed");
break; break;
} }
} else if pt == RPC_PONG_U32 && body.len() >= 8 { } else if pt == RPC_PONG_U32 && body.len() >= 8 {
@ -171,12 +181,10 @@ pub(crate) async fn reader_loop(
} }
} }
async fn send_close_conn(writer: &Arc<Mutex<RpcWriter>>, conn_id: u64) { async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = writer.lock().await.send_and_flush(&p).await { let _ = tx.send(WriterCommand::DataAndFlush(p)).await;
debug!(conn_id, error = %e, "Failed to send RPC_CLOSE_CONN");
}
} }

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock}; use tokio::sync::{mpsc, Mutex, RwLock};
use super::codec::RpcWriter; use super::codec::WriterCommand;
use super::MeResponse; use super::MeResponse;
#[derive(Clone)] #[derive(Clone)]
@ -25,12 +25,12 @@ pub struct BoundConn {
#[derive(Clone)] #[derive(Clone)]
pub struct ConnWriter { pub struct ConnWriter {
pub writer_id: u64, pub writer_id: u64,
pub writer: Arc<Mutex<RpcWriter>>, pub tx: mpsc::Sender<WriterCommand>,
} }
struct RegistryInner { struct RegistryInner {
map: HashMap<u64, mpsc::Sender<MeResponse>>, map: HashMap<u64, mpsc::Sender<MeResponse>>,
writers: HashMap<u64, Arc<Mutex<RpcWriter>>>, writers: HashMap<u64, mpsc::Sender<WriterCommand>>,
writer_for_conn: HashMap<u64, u64>, writer_for_conn: HashMap<u64, u64>,
conns_for_writer: HashMap<u64, HashSet<u64>>, conns_for_writer: HashMap<u64, HashSet<u64>>,
meta: HashMap<u64, ConnMeta>, meta: HashMap<u64, ConnMeta>,
@ -96,13 +96,13 @@ impl ConnRegistry {
&self, &self,
conn_id: u64, conn_id: u64,
writer_id: u64, writer_id: u64,
writer: Arc<Mutex<RpcWriter>>, tx: mpsc::Sender<WriterCommand>,
meta: ConnMeta, meta: ConnMeta,
) { ) {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.meta.entry(conn_id).or_insert(meta); inner.meta.entry(conn_id).or_insert(meta);
inner.writer_for_conn.insert(conn_id, writer_id); inner.writer_for_conn.insert(conn_id, writer_id);
inner.writers.entry(writer_id).or_insert_with(|| writer.clone()); inner.writers.entry(writer_id).or_insert_with(|| tx.clone());
inner inner
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
@ -114,7 +114,7 @@ impl ConnRegistry {
let inner = self.inner.read().await; let inner = self.inner.read().await;
let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?; let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?;
let writer = inner.writers.get(&writer_id).cloned()?; let writer = inner.writers.get(&writer_id).cloned()?;
Some(ConnWriter { writer_id, writer }) Some(ConnWriter { writer_id, tx: writer })
} }
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {

View File

@ -31,8 +31,17 @@ pub async fn me_rotation_task(pool: Arc<MePool>, rng: Arc<SecureRandom>, interva
info!(addr = %w.addr, writer_id = w.id, "Rotating ME connection"); info!(addr = %w.addr, writer_id = w.id, "Rotating ME connection");
match pool.connect_one(w.addr, rng.as_ref()).await { match pool.connect_one(w.addr, rng.as_ref()).await {
Ok(()) => { Ok(()) => {
// Mark old writer for graceful drain; removal happens when sessions finish. tokio::time::sleep(Duration::from_secs(2)).await;
pool.mark_writer_draining(w.id).await; let ws = pool.writers.read().await;
let new_alive = ws.iter().any(|nw|
nw.id != w.id && nw.addr == w.addr && !nw.degraded.load(Ordering::Relaxed) && !nw.draining.load(Ordering::Relaxed)
);
drop(ws);
if new_alive {
pool.mark_writer_draining(w.id).await;
} else {
warn!(addr = %w.addr, writer_id = w.id, "New writer died, keeping old");
}
} }
Err(e) => { Err(e) => {
warn!(addr = %w.addr, writer_id = w.id, error = %e, "ME rotation connect failed"); warn!(addr = %w.addr, writer_id = w.id, error = %e, "ME rotation connect failed");

View File

@ -10,6 +10,7 @@ use crate::network::IpFamily;
use crate::protocol::constants::RPC_CLOSE_EXT_U32; use crate::protocol::constants::RPC_CLOSE_EXT_U32;
use super::MePool; use super::MePool;
use super::codec::WriterCommand;
use super::wire::build_proxy_req_payload; use super::wire::build_proxy_req_payload;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use super::registry::ConnMeta; use super::registry::ConnMeta;
@ -43,18 +44,15 @@ impl MePool {
loop { loop {
if let Some(current) = self.registry.get_writer(conn_id).await { if let Some(current) = self.registry.get_writer(conn_id).await {
let send_res = { let send_res = {
if let Ok(mut guard) = current.writer.try_lock() { current
let r = guard.send(&payload).await; .tx
drop(guard); .send(WriterCommand::Data(payload.clone()))
r .await
} else {
current.writer.lock().await.send(&payload).await
}
}; };
match send_res { match send_res {
Ok(()) => return Ok(()), Ok(()) => return Ok(()),
Err(e) => { Err(_) => {
warn!(error = %e, writer_id = current.writer_id, "ME write failed"); warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await; self.remove_writer_and_close_clients(current.writer_id).await;
continue; continue;
} }
@ -64,7 +62,26 @@ impl MePool {
let mut writers_snapshot = { let mut writers_snapshot = {
let ws = self.writers.read().await; let ws = self.writers.read().await;
if ws.is_empty() { if ws.is_empty() {
return Err(ProxyError::Proxy("All ME connections dead".into())); drop(ws);
for family in self.family_order() {
let map = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
};
for (_dc, addrs) in map.iter() {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
self.writer_available.notify_waiters();
break;
}
}
}
}
if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() {
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
}
continue;
} }
ws.clone() ws.clone()
}; };
@ -96,9 +113,10 @@ impl MePool {
writers_snapshot = ws2.clone(); writers_snapshot = ws2.clone();
drop(ws2); drop(ws2);
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await; candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
break; if !candidate_indices.is_empty() {
break;
}
} }
drop(map_guard);
} }
if candidate_indices.is_empty() { if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into())); return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
@ -120,22 +138,15 @@ impl MePool {
if w.draining.load(Ordering::Relaxed) { if w.draining.load(Ordering::Relaxed) {
continue; continue;
} }
if let Ok(mut guard) = w.writer.try_lock() { if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() {
let send_res = guard.send(&payload).await; self.registry
drop(guard); .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
match send_res { .await;
Ok(()) => { return Ok(());
self.registry } else {
.bind_writer(conn_id, w.id, w.writer.clone(), meta.clone()) warn!(writer_id = w.id, "ME writer channel closed");
.await; self.remove_writer_and_close_clients(w.id).await;
return Ok(()); continue;
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed");
self.remove_writer_and_close_clients(w.id).await;
continue;
}
}
} }
} }
@ -143,15 +154,15 @@ impl MePool {
if w.draining.load(Ordering::Relaxed) { if w.draining.load(Ordering::Relaxed) {
continue; continue;
} }
match w.writer.lock().await.send(&payload).await { match w.tx.send(WriterCommand::Data(payload.clone())).await {
Ok(()) => { Ok(()) => {
self.registry self.registry
.bind_writer(conn_id, w.id, w.writer.clone(), meta.clone()) .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await; .await;
return Ok(()); return Ok(());
} }
Err(e) => { Err(_) => {
warn!(error = %e, writer_id = w.id, "ME write failed (blocking)"); warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await; self.remove_writer_and_close_clients(w.id).await;
} }
} }
@ -163,8 +174,8 @@ impl MePool {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = w.writer.lock().await.send_and_flush(&p).await { if w.tx.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!(error = %e, "ME close write failed"); debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await; self.remove_writer_and_close_clients(w.writer_id).await;
} }
} else { } else {
@ -176,7 +187,7 @@ impl MePool {
} }
pub fn connection_count(&self) -> usize { pub fn connection_count(&self) -> usize {
self.writers.try_read().map(|w| w.len()).unwrap_or(0) self.conn_count.load(Ordering::Relaxed)
} }
pub(super) async fn candidate_indices_for_dc( pub(super) async fn candidate_indices_for_dc(

View File

@ -167,20 +167,44 @@ impl UpstreamManager {
} }
/// Select upstream using latency-weighted random selection. /// Select upstream using latency-weighted random selection.
async fn select_upstream(&self, dc_idx: Option<i16>) -> Option<usize> { async fn select_upstream(&self, dc_idx: Option<i16>, scope: Option<&str>) -> Option<usize> {
let upstreams = self.upstreams.read().await; let upstreams = self.upstreams.read().await;
if upstreams.is_empty() { if upstreams.is_empty() {
return None; return None;
} }
// Scope filter:
let healthy: Vec<usize> = upstreams.iter() // If scope is set: only scoped and matched items
// If scope is not set: only unscoped items
let filtered_upstreams : Vec<usize> = upstreams.iter()
.enumerate() .enumerate()
.filter(|(_, u)| u.healthy) .filter(|(_, u)| {
scope.map_or(
u.config.scopes.is_empty(),
|req_scope| {
u.config.scopes
.split(',')
.map(str::trim)
.any(|s| s == req_scope)
}
)
})
.map(|(i, _)| i) .map(|(i, _)| i)
.collect(); .collect();
// Healthy filter
let healthy: Vec<usize> = filtered_upstreams.iter()
.filter(|&&i| upstreams[i].healthy)
.copied()
.collect();
if filtered_upstreams.is_empty() {
warn!(scope = scope, "No upstreams available! Using first (direct?)");
return None;
}
if healthy.is_empty() { if healthy.is_empty() {
return Some(rand::rng().gen_range(0..upstreams.len())); warn!(scope = scope, "No healthy upstreams available! Using random.");
return Some(filtered_upstreams[rand::rng().gen_range(0..filtered_upstreams.len())]);
} }
if healthy.len() == 1 { if healthy.len() == 1 {
@ -222,15 +246,20 @@ impl UpstreamManager {
} }
/// Connect to target through a selected upstream. /// Connect to target through a selected upstream.
pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i16>) -> Result<TcpStream> { pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i16>, scope: Option<&str>) -> Result<TcpStream> {
let idx = self.select_upstream(dc_idx).await let idx = self.select_upstream(dc_idx, scope).await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let upstream = { let mut upstream = {
let guard = self.upstreams.read().await; let guard = self.upstreams.read().await;
guard[idx].config.clone() guard[idx].config.clone()
}; };
// Set scope for configuration copy
if let Some(s) = scope {
upstream.selected_scope = s.to_string();
}
let start = Instant::now(); let start = Instant::now();
match self.connect_via_upstream(&upstream, target).await { match self.connect_via_upstream(&upstream, target).await {
@ -313,8 +342,12 @@ impl UpstreamManager {
if let Some(e) = stream.take_error()? { if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e)); return Err(ProxyError::Io(e));
} }
// replace socks user_id with config.selected_scope, if set
let scope: Option<&str> = Some(config.selected_scope.as_str())
.filter(|s| !s.is_empty());
let _user_id: Option<&str> = scope.or(user_id.as_deref());
connect_socks4(&mut stream, target, user_id.as_deref()).await?; connect_socks4(&mut stream, target, _user_id).await?;
Ok(stream) Ok(stream)
}, },
UpstreamType::Socks5 { address, interface, username, password } => { UpstreamType::Socks5 { address, interface, username, password } => {
@ -341,7 +374,14 @@ impl UpstreamManager {
return Err(ProxyError::Io(e)); return Err(ProxyError::Io(e));
} }
connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?; debug!(config = ?config, "Socks5 connection");
// replace socks user:pass with config.selected_scope, if set
let scope: Option<&str> = Some(config.selected_scope.as_str())
.filter(|s| !s.is_empty());
let _username: Option<&str> = scope.or(username.as_deref());
let _password: Option<&str> = scope.or(password.as_deref());
connect_socks5(&mut stream, target, _username, _password).await?;
Ok(stream) Ok(stream)
}, },
} }