Compare commits

...

37 Commits
3.0.3 ... 3.0.4

Author SHA1 Message Date
Alexey
f7a7fb94d4 Update release.yml 2026-02-19 16:59:29 +03:00
Alexey
85fff5e30a Update Cargo.toml 2026-02-19 16:48:26 +03:00
Alexey
fc28c1ad88 Update Cargo.toml 2026-02-19 16:30:04 +03:00
Alexey
bb87a37686 Update config.toml 2026-02-19 16:19:58 +03:00
Alexey
bf2da8f5d8 Merge pull request #165 from telemt/flow
ME Healthcheck + Keepalives + Concurrency
2026-02-19 16:12:01 +03:00
Alexey
2926b9f5c8 ME Concurrency
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-19 16:02:50 +03:00
Alexey
820ed8d346 ME Keepalives
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-19 15:49:35 +03:00
Alexey
e340b716b2 Drafting ME Healthcheck
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-19 15:39:30 +03:00
Alexey
9edbbb692e Merge pull request #164 from telemt/flow
ME Pool V2 - Healthcheck + Pool rebuild
2026-02-19 14:33:23 +03:00
Alexey
356d64371a Merge branch 'flow' of https://github.com/telemt/telemt into flow 2026-02-19 14:25:45 +03:00
Alexey
4be4670668 ME Pool V2 - Agressive Healthcheck and Pool Rebuild
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-19 14:25:39 +03:00
Alexey
0768fee06a Merge pull request #162 from telemt/flow
ME Pool V2
2026-02-19 13:42:03 +03:00
Alexey
35ae455e2b ME Pool V2
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-19 13:35:56 +03:00
Alexey
433e6c9a20 Merge pull request #157 from vladon/ci/add-musl-build-targets
ci: add musl build targets for static Linux binaries
2026-02-19 13:14:07 +03:00
Alexey
34f5289fc3 Merge pull request #159 from vladon/feat/version-flag
feat: Add -V/--version flag to print version string
2026-02-19 13:13:51 +03:00
Alexey
97804d47ff Merge pull request #158 from vladon/docs/disable_colors
docs: Document disable_colors configuration parameter
2026-02-19 12:35:38 +03:00
Alexey
b68e9d642e Merge pull request #154 from ivulit/fix/stun-ipv6-enetunreach
Handle IPv6 ENETUNREACH in STUN probe gracefully
2026-02-19 12:35:22 +03:00
Vladislav Yaroslavlev
f31d9d42fe feat: Add -V/--version flag to print version string
Closes #156

- Add handling for -V and --version arguments in CLI parser
- Print version to stdout using CARGO_PKG_VERSION from Cargo.toml
- Update help text to include version option
2026-02-19 10:23:49 +03:00
Vladislav Yaroslavlev
d941873cce docs: Document disable_colors configuration parameter 2026-02-19 10:15:03 +03:00
Vladislav Yaroslavlev
b11a767741 ci: add musl build targets for static Linux binaries 2026-02-19 09:43:31 +03:00
Alexey
301f829c3c Update LICENSING.md 2026-02-19 03:00:47 +03:00
Alexey
76a02610d8 Create LICENSING.md
Drafting licensing...
2026-02-19 03:00:04 +03:00
Alexey
76bf5337e8 Update CONTRIBUTING.md 2026-02-19 02:49:38 +03:00
Alexey
e76b388a05 Create CONTRIBUTING.md 2026-02-19 02:49:08 +03:00
Alexey
f37e6cbe29 Merge pull request #155 from unuunn/feat/scoped-routing
feat: implement selective routing for "scope_*" users
2026-02-19 02:19:42 +03:00
ivulit
e54dce5366 Handle IPv6 ENETUNREACH in STUN probe gracefully
When IPv6 is unavailable on the host, treat NetworkUnreachable at
connect() as Ok(None) instead of propagating an error, so the dual
STUN probe succeeds with just the IPv4 result and no spurious WARN.
2026-02-19 00:27:19 +03:00
unuunn
c7464d53e1 feat: implement selective routing for "scope_*" users
- Users with "scope_{name}" prefix are routed to upstreams where {name}
  is present in the "scopes" property (comma-separated).
- Strict separation: Scoped upstreams are excluded from general routing, and vice versa.
- Constraint: SOCKS upstreams and DIRECT(`use_middle_proxy =
false`) mode only.

Example:
  User "scope_hello" matches an upstream with `scopes = "world,hello"`
2026-02-18 23:29:08 +03:00
Alexey
03a6493147 Merge pull request #153 from vladon/fix/release-changes-package-version
release changes package version
2026-02-18 23:23:04 +03:00
Vladislav Yaroslavlev
36ef2f722d release changes package version 2026-02-18 22:46:45 +03:00
Alexey
b9fda9e2c2 Merge pull request #151 from vladon/fix-ci2
fix(ci) 2nd try
2026-02-18 22:34:30 +03:00
Vladislav Yaroslavlev
c5b590062c fix(ci): replace deprecated actions-rs/cargo with direct cross commands
The actions-rs organization has been archived and is no longer available.
Replace the deprecated action with direct cross installation and build commands.
2026-02-18 22:10:17 +03:00
Alexey
c0357b2890 Merge pull request #149 from vladon/fix/ci-deprecated-actions-rs
fix(ci): replace deprecated actions-rs/cargo with direct cross commands
2026-02-18 22:02:16 +03:00
Vladislav Yaroslavlev
4f7f7d6880 fix(ci): replace deprecated actions-rs/cargo with direct cross commands
The actions-rs organization has been archived and is no longer available.
Replace the deprecated action with direct cross installation and build commands.
2026-02-18 21:49:42 +03:00
Alexey
efba10f839 Update README.md 2026-02-18 21:34:04 +03:00
Alexey
6ba12f35d0 Update README.md 2026-02-18 21:31:58 +03:00
Alexey
6a57c23700 Update README.md 2026-02-18 20:56:03 +03:00
Alexey
94b85afbc5 Update Cargo.toml 2026-02-18 20:25:17 +03:00
25 changed files with 839 additions and 317 deletions

View File

@@ -25,10 +25,16 @@ jobs:
include:
- target: x86_64-unknown-linux-gnu
artifact_name: telemt
asset_name: telemt-x86_64-linux
asset_name: telemt-x86_64-linux-gnu
- target: aarch64-unknown-linux-gnu
artifact_name: telemt
asset_name: telemt-aarch64-linux
asset_name: telemt-aarch64-linux-gnu
- target: x86_64-unknown-linux-musl
artifact_name: telemt
asset_name: telemt-x86_64-linux-musl
- target: aarch64-unknown-linux-musl
artifact_name: telemt
asset_name: telemt-aarch64-linux-musl
steps:
- name: Checkout repository
@@ -56,12 +62,13 @@ jobs:
restore-keys: |
${{ runner.os }}-${{ matrix.target }}-cargo-
- name: Install cross
run: cargo install cross --git https://github.com/cross-rs/cross
- name: Build Release
uses: actions-rs/cargo@ae10961054e4aa8bff448f48a500763b90d5c550 # v1.0.1
with:
use-cross: true
command: build
args: --release --target ${{ matrix.target }}
env:
RUSTFLAGS: ${{ contains(matrix.target, 'musl') && '-C target-feature=+crt-static' || '' }}
run: cross build --release --target ${{ matrix.target }}
- name: Package binary
run: |
@@ -85,13 +92,42 @@ jobs:
contents: write
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Download all artifacts
uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
path: artifacts
- name: Update version in Cargo.toml and Cargo.lock
run: |
# Extract version from tag (remove 'v' prefix if present)
VERSION="${GITHUB_REF#refs/tags/}"
VERSION="${VERSION#v}"
# Install cargo-edit for version bumping
cargo install cargo-edit
# Update Cargo.toml version
cargo set-version "$VERSION"
# Configure git
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
# Commit and push changes
#git add Cargo.toml Cargo.lock
#git commit -m "chore: bump version to $VERSION" || echo "No changes to commit"
#git push origin HEAD:main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Create Release
uses: softprops/action-gh-release@c95fe1489396fe360a41fb53f90de6ddce8c4c8a # v2.2.1
uses: softprops/action-gh-release@v2
with:
files: artifacts/**/*
generate_release_notes: true

5
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,5 @@
## Pull Requests - Rules
- ONLY signed and verified commits
- ONLY from your name
- DO NOT commit with `codex` or `claude` as author/commiter
- PREFER `flow` branch for development, not `main`

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.0.3"
version = "3.0.4"
edition = "2024"
[dependencies]

17
LICENSING.md Normal file
View File

@@ -0,0 +1,17 @@
# LICENSING
## Licenses for Versions
| Version | License |
|---------|---------------|
| 1.0 | NO LICNESE |
| 1.1 | NO LICENSE |
| 1.2 | NO LICENSE |
| 2.0 | NO LICENSE |
| 3.0 | TELEMT UL 1 |
### License Types
- **NO LICENSE** = ***ALL RIGHT RESERVED***
- **TELEMT UL1** - work in progress license for source code of `telemt`, which encourages:
- fair use,
- contributions,
- distribution,
- but prohibits NOT mentioning the authors

View File

@@ -10,74 +10,40 @@
### 🇷🇺 RU
15 февраля мы опубликовали `telemt 3` с поддержкой Middle-End Proxy, а значит:
18 февраля мы опубликовали `telemt 3.0.3`, он имеет:
- с функциональными медиа, в том числе с CDN/DC=203
- с Ad-tag — показывайте спонсорский канал и собирайте статистику через официального бота
- с новым подходом к безопасности и асинхронности
- с высокоточной диагностикой криптографии через `ME_DIAG`
- улучшенный механизм Middle-End Health Check
- высокоскоростное восстановление инициализации Middle-End
- меньше задержек на hot-path
- более корректную работу в Dualstack, а именно - IPv6 Middle-End
- аккуратное переподключение клиента без дрифта сессий между Middle-End
- автоматическая деградация на Direct-DC при массовой (>2 ME-DC-групп) недоступности Middle-End
- автодетект IP за NAT, при возможности - будет выполнен хендшейк с ME, при неудаче - автодеградация
- единственный известный специальный DC=203 уже добавлен в код: медиа загружаются с CDN в Direct-DC режиме
Для использования нужно:
[Здесь вы можете найти релиз](https://github.com/telemt/telemt/releases/tag/3.0.3)
1. Версия `telemt` ≥3.0.0
2. Выполнение любого из наборов условий:
- публичный IP для исходящих соединений установлен на интерфейса инстанса с `telemt`
- ЛИБО
- вы используете NAT 1:1 + включили STUN-пробинг
3. В конфиге, в секции `[general]` указать:
```toml
use_middle_proxy = true
```
Если условия из пункта 1 не выполняются:
1. Выключите ME-режим:
- установите `use_middle_proxy = false`
- ЛИБО
- Middle-End Proxy будет выключен автоматически по таймауту, но это займёт больше времени при запуске
2. В конфиге, добавьте в конец:
```toml
[dc_overrides]
"203" = "91.105.192.100:443"
```
Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях — мы открыты к идеям и pull requests.
Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях - мы открыты к идеям и pull requests!
</td>
<td width="50%" valign="top">
### 🇬🇧 EN
On February 15, we released `telemt 3` with support for Middle-End Proxy, which means:
On February 18, we released `telemt 3.0.3`. This version introduces:
- functional media, including CDN/DC=203
- Ad-tag support promote a sponsored channel and collect statistics via Telegram bot
- new approach to security and asynchronicity
- high-precision cryptography diagnostics via `ME_DIAG`
- improved Middle-End Health Check method
- high-speed recovery of Middle-End init
- reduced latency on the hot path
- correct Dualstack support: proper handling of IPv6 Middle-End
- *clean* client reconnection without session "drift" between Middle-End
- automatic degradation to Direct-DC mode in case of large-scale (>2 ME-DC groups) Middle-End unavailability
- automatic public IP detection behind NAT; first - Middle-End handshake is performed, otherwise automatic degradation is applied
- known special DC=203 is now handled natively: media is delivered from the CDN via Direct-DC mode
To use this feature, the following requirements must be met:
1. `telemt` version ≥ 3.0.0
2. One of the following conditions satisfied:
- the instance running `telemt` has a public IP address assigned to its network interface for outbound connections
- OR
- you are using 1:1 NAT and have STUN probing enabled
3. In the config file, under the `[general]` section, specify:
```toml
use_middle_proxy = true
````
[Release is available here](https://github.com/telemt/telemt/releases/tag/3.0.3)
If the conditions from step 1 are not satisfied:
1. Disable Middle-End mode:
- set `use_middle_proxy = false`
- OR
- Middle-End Proxy will be disabled automatically after a timeout, but this will increase startup time
2. In the config file, add the following at the end:
```toml
[dc_overrides]
"203" = "91.105.192.100:443"
```
If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics — we welcome ideas, suggestions, and pull requests.
If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics - we welcome ideas and pull requests!
</td>
</tr>
@@ -86,7 +52,9 @@ If you have expertise in asynchronous network applications, traffic analysis, re
# Features
💥 The configuration structure has changed since version 1.1.0.0. change it in your environment!
⚓ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
⚓ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
⚓ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy: non dramatically, but usual
# GOTO
- [Features](#features)
@@ -215,6 +183,7 @@ prefer_ipv6 = false
fast_mode = true
use_middle_proxy = false
# ad_tag = "..."
# disable_colors = false # Disable colored output in logs (useful for files/systemd)
[network]
ipv4 = true
@@ -247,7 +216,9 @@ ip = "::"
# Users to show in the startup log (tg:// links)
[general.links]
show = ["hello"] # Users to show in the startup log (tg:// links)
show = ["hello"] # Only show links for user "hello"
# show = ["alice", "bob"] # Only show links for alice and bob
# show = "*" # Show links for all users
# public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
# public_port = 443 # Port for tg:// links (default: server.port)

View File

@@ -5,6 +5,38 @@ prefer_ipv6 = false
fast_mode = true
use_middle_proxy = true
#ad_tag = "00000000000000000000000000000000"
# Path to proxy-secret binary (auto-downloaded if missing).
proxy_secret_path = "proxy-secret"
# === Middle Proxy (ME) ===
# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
#middle_proxy_nat_ip = "203.0.113.10"
# Enable STUN probing to discover public IP:port for ME.
middle_proxy_nat_probe = true
# Primary STUN server (host:port); defaults to Telegram STUN when empty.
middle_proxy_nat_stun = "stun.l.google.com:19302"
# Optional fallback STUN servers list.
middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
# Desired number of concurrent ME writers in pool.
middle_proxy_pool_size = 16
# Pre-initialized warm-standby ME connections kept idle.
middle_proxy_warm_standby = 8
# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
stun_iface_mismatch_ignore = false
# Keepalive padding frames - fl==4
me_keepalive_enabled = true
me_keepalive_interval_secs = 25 # Period between keepalives
me_keepalive_jitter_secs = 5 # Jitter added to interval
me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
# Stagger extra ME connections on warmup to de-phase lifecycles.
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500 # Base delay between extra connects
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
# Reconnect policy knobs.
me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
me_reconnect_backoff_base_ms = 500 # Backoff start
me_reconnect_backoff_cap_ms = 30000 # Backoff cap
me_reconnect_fast_retry_count = 11 # Quick retries before backoff
[network]
# Enable/disable families; ipv6 = true/false/auto(None)
@@ -50,10 +82,13 @@ show = ["hello"] # Users to show in the startup log (tg:// links)
# === Timeouts (in seconds) ===
[timeouts]
client_handshake = 15
client_handshake = 30
tg_connect = 10
client_keepalive = 60
client_ack = 300
# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
me_one_retry = 12
me_one_timeout_ms = 1200
# === Anti-Censorship & Masking ===
[censorship]

View File

@@ -74,6 +74,34 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
Some("unknown-dc.txt".to_string())
}
pub(crate) fn default_pool_size() -> usize {
2
}
pub(crate) fn default_keepalive_interval() -> u64 {
25
}
pub(crate) fn default_keepalive_jitter() -> u64 {
5
}
pub(crate) fn default_warmup_step_delay_ms() -> u64 {
500
}
pub(crate) fn default_warmup_step_jitter_ms() -> u64 {
300
}
pub(crate) fn default_reconnect_backoff_base_ms() -> u64 {
500
}
pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
30_000
}
// Custom deserializer helpers
#[derive(Deserialize)]

View File

@@ -11,6 +11,32 @@ use crate::error::{ProxyError, Result};
use super::defaults::*;
use super::types::*;
fn preprocess_includes(content: &str, base_dir: &Path, depth: u8) -> Result<String> {
if depth > 10 {
return Err(ProxyError::Config("Include depth > 10".into()));
}
let mut output = String::with_capacity(content.len());
for line in content.lines() {
let trimmed = line.trim();
if let Some(rest) = trimmed.strip_prefix("include") {
let rest = rest.trim();
if let Some(rest) = rest.strip_prefix('=') {
let path_str = rest.trim().trim_matches('"');
let resolved = base_dir.join(path_str);
let included = std::fs::read_to_string(&resolved)
.map_err(|e| ProxyError::Config(e.to_string()))?;
let included_dir = resolved.parent().unwrap_or(base_dir);
output.push_str(&preprocess_includes(&included, included_dir, depth + 1)?);
output.push('\n');
continue;
}
}
output.push_str(line);
output.push('\n');
}
Ok(output)
}
fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> {
if !net.ipv4 && matches!(net.ipv6, Some(false)) {
return Err(ProxyError::Config(
@@ -84,10 +110,12 @@ pub struct ProxyConfig {
impl ProxyConfig {
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
let content =
std::fs::read_to_string(path).map_err(|e| ProxyError::Config(e.to_string()))?;
std::fs::read_to_string(&path).map_err(|e| ProxyError::Config(e.to_string()))?;
let base_dir = path.as_ref().parent().unwrap_or(Path::new("."));
let processed = preprocess_includes(&content, base_dir, 0)?;
let mut config: ProxyConfig =
toml::from_str(&content).map_err(|e| ProxyError::Config(e.to_string()))?;
toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?;
// Validate secrets.
for (user, secret) in &config.access.users {
@@ -151,8 +179,10 @@ impl ProxyConfig {
validate_network_cfg(&mut config.network)?;
// Random fake_cert_len.
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
// Random fake_cert_len only when default is in use.
if config.censorship.fake_cert_len == default_fake_cert_len() {
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
}
// Resolve listen_tcp: explicit value wins, otherwise auto-detect.
// If unix socket is set → TCP only when listen_addr_ipv4 or listeners are explicitly provided.
@@ -208,6 +238,8 @@ impl ProxyConfig {
upstream_type: UpstreamType::Direct { interface: None },
weight: 1,
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
});
}

View File

@@ -143,6 +143,62 @@ pub struct GeneralConfig {
#[serde(default)]
pub middle_proxy_nat_stun: Option<String>,
/// Optional list of STUN servers for NAT probing fallback.
#[serde(default)]
pub middle_proxy_nat_stun_servers: Vec<String>,
/// Desired size of active Middle-Proxy writer pool.
#[serde(default = "default_pool_size")]
pub middle_proxy_pool_size: usize,
/// Number of warm standby ME connections kept pre-initialized.
#[serde(default)]
pub middle_proxy_warm_standby: usize,
/// Enable ME keepalive padding frames.
#[serde(default = "default_true")]
pub me_keepalive_enabled: bool,
/// Keepalive interval in seconds.
#[serde(default = "default_keepalive_interval")]
pub me_keepalive_interval_secs: u64,
/// Keepalive jitter in seconds.
#[serde(default = "default_keepalive_jitter")]
pub me_keepalive_jitter_secs: u64,
/// Keepalive payload randomized (4 bytes); otherwise zeros.
#[serde(default = "default_true")]
pub me_keepalive_payload_random: bool,
/// Enable staggered warmup of extra ME writers.
#[serde(default = "default_true")]
pub me_warmup_stagger_enabled: bool,
/// Base delay between warmup connections in ms.
#[serde(default = "default_warmup_step_delay_ms")]
pub me_warmup_step_delay_ms: u64,
/// Jitter for warmup delay in ms.
#[serde(default = "default_warmup_step_jitter_ms")]
pub me_warmup_step_jitter_ms: u64,
/// Max concurrent reconnect attempts per DC.
#[serde(default)]
pub me_reconnect_max_concurrent_per_dc: u32,
/// Base backoff in ms for reconnect.
#[serde(default = "default_reconnect_backoff_base_ms")]
pub me_reconnect_backoff_base_ms: u64,
/// Cap backoff in ms for reconnect.
#[serde(default = "default_reconnect_backoff_cap_ms")]
pub me_reconnect_backoff_cap_ms: u64,
/// Fast retry attempts before backoff.
#[serde(default)]
pub me_reconnect_fast_retry_count: u32,
/// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
#[serde(default)]
pub stun_iface_mismatch_ignore: bool,
@@ -175,6 +231,20 @@ impl Default for GeneralConfig {
middle_proxy_nat_ip: None,
middle_proxy_nat_probe: false,
middle_proxy_nat_stun: None,
middle_proxy_nat_stun_servers: Vec::new(),
middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: 0,
me_keepalive_enabled: true,
me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(),
me_keepalive_payload_random: true,
me_warmup_stagger_enabled: true,
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
me_reconnect_max_concurrent_per_dc: 1,
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: 1,
stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal,
@@ -403,6 +473,10 @@ pub struct UpstreamConfig {
pub weight: u16,
#[serde(default = "default_true")]
pub enabled: bool,
#[serde(default)]
pub scopes: String,
#[serde(skip)]
pub selected_scope: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -11,6 +11,9 @@ pub struct SecureRandom {
inner: Mutex<SecureRandomInner>,
}
unsafe impl Send for SecureRandom {}
unsafe impl Sync for SecureRandom {}
struct SecureRandomInner {
rng: StdRng,
cipher: AesCtr,
@@ -211,4 +214,4 @@ mod tests {
assert_ne!(shuffled, original);
}
}
}

View File

@@ -92,6 +92,10 @@ fn parse_cli() -> (String, bool, Option<String>) {
eprintln!(" --no-start Don't start the service after install");
std::process::exit(0);
}
"--version" | "-V" => {
println!("telemt {}", env!("CARGO_PKG_VERSION"));
std::process::exit(0);
}
s if !s.starts_with('-') => {
config_path = s.to_string();
}
@@ -106,18 +110,20 @@ fn parse_cli() -> (String, bool, Option<String>) {
}
fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
info!("--- Proxy Links ({}) ---", host);
info!(target: "telemt::links", "--- Proxy Links ({}) ---", host);
for user_name in config.general.links.show.resolve_users(&config.access.users) {
if let Some(secret) = config.access.users.get(user_name) {
info!("User: {}", user_name);
info!(target: "telemt::links", "User: {}", user_name);
if config.general.modes.classic {
info!(
target: "telemt::links",
" Classic: tg://proxy?server={}&port={}&secret={}",
host, port, secret
);
}
if config.general.modes.secure {
info!(
target: "telemt::links",
" DD: tg://proxy?server={}&port={}&secret=dd{}",
host, port, secret
);
@@ -125,15 +131,16 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
if config.general.modes.tls {
let domain_hex = hex::encode(&config.censorship.tls_domain);
info!(
target: "telemt::links",
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
host, port, secret, domain_hex
);
}
} else {
warn!("User '{}' in show_link not found", user_name);
warn!(target: "telemt::links", "User '{}' in show_link not found", user_name);
}
}
info!("------------------------");
info!(target: "telemt::links", "------------------------");
}
#[tokio::main]
@@ -317,6 +324,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
config.general.middle_proxy_nat_ip,
config.general.middle_proxy_nat_probe,
config.general.middle_proxy_nat_stun.clone(),
config.general.middle_proxy_nat_stun_servers.clone(),
probe.detected_ipv6,
config.timeouts.me_one_retry,
config.timeouts.me_one_timeout_ms,
@@ -325,18 +333,32 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
cfg_v4.default_dc.or(cfg_v6.default_dc),
decision.clone(),
rng.clone(),
stats.clone(),
config.general.me_keepalive_enabled,
config.general.me_keepalive_interval_secs,
config.general.me_keepalive_jitter_secs,
config.general.me_keepalive_payload_random,
config.general.me_warmup_stagger_enabled,
config.general.me_warmup_step_delay_ms,
config.general.me_warmup_step_jitter_ms,
config.general.me_reconnect_max_concurrent_per_dc,
config.general.me_reconnect_backoff_base_ms,
config.general.me_reconnect_backoff_cap_ms,
config.general.me_reconnect_fast_retry_count,
);
match pool.init(2, &rng).await {
let pool_size = config.general.middle_proxy_pool_size.max(1);
match pool.init(pool_size, &rng).await {
Ok(()) => {
info!("Middle-End pool initialized successfully");
// Phase 4: Start health monitor
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, 2,
pool_clone, rng_clone, min_conns,
)
.await;
});
@@ -740,6 +762,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
// Switch to user-configured log level after startup
let runtime_filter = if has_rust_log {
EnvFilter::from_default_env()
} else if matches!(effective_log_level, LogLevel::Silent) {
EnvFilter::new("warn,telemt::links=info")
} else {
EnvFilter::new(effective_log_level.to_filter_str())
};

View File

@@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String {
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter");
let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent());
let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");

View File

@@ -50,10 +50,17 @@ pub async fn stun_probe_family(stun_addr: &str, family: IpFamily) -> Result<Opti
let target_addr = resolve_stun_addr(stun_addr, family).await?;
if let Some(addr) = target_addr {
socket
.connect(addr)
.await
.map_err(|e| ProxyError::Proxy(format!("STUN connect failed: {e}")))?;
match socket.connect(addr).await {
Ok(()) => {}
Err(e) if family == IpFamily::V6 && matches!(
e.kind(),
std::io::ErrorKind::NetworkUnreachable
| std::io::ErrorKind::HostUnreachable
| std::io::ErrorKind::Unsupported
| std::io::ErrorKind::NetworkDown
) => return Ok(None),
Err(e) => return Err(ProxyError::Proxy(format!("STUN connect failed: {e}"))),
}
} else {
return Ok(None);
}

View File

@@ -45,7 +45,7 @@ where
);
let tg_stream = upstream_manager
.connect(dc_addr, Some(success.dc_idx))
.connect(dc_addr, Some(success.dc_idx), user.strip_prefix("scope_").filter(|s| !s.is_empty()))
.await?;
debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake");

View File

@@ -19,6 +19,10 @@ pub struct Stats {
connects_all: AtomicU64,
connects_bad: AtomicU64,
handshake_timeouts: AtomicU64,
me_keepalive_sent: AtomicU64,
me_keepalive_failed: AtomicU64,
me_reconnect_attempts: AtomicU64,
me_reconnect_success: AtomicU64,
user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>,
}
@@ -43,8 +47,16 @@ impl Stats {
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default()

View File

@@ -4,6 +4,14 @@ use crate::crypto::{AesCbc, crc32};
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
/// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes.
pub(crate) enum WriterCommand {
Data(Vec<u8>),
DataAndFlush(Vec<u8>),
Keepalive,
Close,
}
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> {
let total_len = (4 + 4 + payload.len() + 4) as u32;
let mut frame = Vec::with_capacity(total_len as usize);
@@ -181,4 +189,12 @@ impl RpcWriter {
self.send(payload).await?;
self.writer.flush().await.map_err(ProxyError::Io)
}
pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> {
// Keepalive is a frame with fl == 4 and 4 bytes payload.
let mut frame = Vec::with_capacity(8);
frame.extend_from_slice(&4u32.to_le_bytes());
frame.extend_from_slice(&payload);
self.send(&frame).await
}
}

View File

@@ -13,6 +13,24 @@ use super::secret::download_proxy_secret;
use crate::crypto::SecureRandom;
use std::time::SystemTime;
async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
let delays = [1u64, 5, 15];
for (i, d) in delays.iter().enumerate() {
match fetch_proxy_config(url).await {
Ok(cfg) => return Some(cfg),
Err(e) => {
if i == delays.len() - 1 {
warn!(error = %e, url, "fetch_proxy_config failed");
} else {
debug!(error = %e, url, "fetch_proxy_config retrying");
tokio::time::sleep(Duration::from_secs(*d)).await;
}
}
}
}
None
}
#[derive(Debug, Clone, Default)]
pub struct ProxyConfigData {
pub map: HashMap<i32, Vec<(IpAddr, u16)>>,
@@ -118,7 +136,8 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interv
tick.tick().await;
// Update proxy config v4
if let Ok(cfg) = fetch_proxy_config("https://core.telegram.org/getProxyConfig").await {
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg) = cfg_v4 {
let changed = pool.update_proxy_maps(cfg.map.clone(), None).await;
if let Some(dc) = cfg.default_dc {
pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed);
@@ -129,14 +148,20 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interv
} else {
debug!("ME config v4 unchanged");
}
} else {
warn!("getProxyConfig update failed");
}
// Update proxy config v6 (optional)
if let Ok(cfg_v6) = fetch_proxy_config("https://core.telegram.org/getProxyConfigV6").await {
let _ = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
if let Some(cfg_v6) = cfg_v6 {
let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
if changed {
info!("ME config updated (v6), reconciling connections");
pool.reconcile_connections(&rng).await;
} else {
debug!("ME config v6 unchanged");
}
}
pool.reset_stun_state();
// Update proxy-secret
match download_proxy_secret().await {

View File

@@ -5,25 +5,30 @@ use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
use rand::seq::SliceRandom;
use rand::Rng;
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use super::MePool;
const HEALTH_INTERVAL_SECS: u64 = 1;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new();
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
check_family(
IpFamily::V4,
&pool,
&rng,
&mut backoff,
&mut last_attempt,
&mut inflight_single,
&mut next_attempt,
&mut inflight,
)
.await;
check_family(
@@ -31,8 +36,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&pool,
&rng,
&mut backoff,
&mut last_attempt,
&mut inflight_single,
&mut next_attempt,
&mut inflight,
)
.await;
}
@@ -43,8 +48,8 @@ async fn check_family(
pool: &Arc<MePool>,
rng: &Arc<SecureRandom>,
backoff: &mut HashMap<(i32, IpFamily), u64>,
last_attempt: &mut HashMap<(i32, IpFamily), Instant>,
inflight_single: &mut HashSet<(i32, IpFamily)>,
next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
inflight: &mut HashMap<(i32, IpFamily), usize>,
) {
let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me,
@@ -80,95 +85,60 @@ async fn check_family(
for (dc, dc_addrs) in entries {
let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a));
if has_coverage {
inflight_single.remove(&(dc, family));
continue;
}
let key = (dc, family);
let delay = *backoff.get(&key).unwrap_or(&30);
let now = Instant::now();
if let Some(last) = last_attempt.get(&key) {
if now.duration_since(*last).as_secs() < delay {
continue;
}
}
if dc_addrs.len() == 1 {
// Single ME address: fast retries then slower background retries.
if inflight_single.contains(&key) {
continue;
}
inflight_single.insert(key);
let addr = dc_addrs[0];
let dc_id = dc;
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let timeout = pool.me_one_timeout;
let quick_attempts = pool.me_one_retry.max(1);
tokio::spawn(async move {
let mut success = false;
for _ in 0..quick_attempts {
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
success = true;
break;
}
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
if success {
return;
}
let timeout_ms = timeout.as_millis();
warn!(
dc = %dc_id,
?family,
attempts = quick_attempts,
timeout_ms,
"DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...",
dc_id,
quick_attempts,
timeout_ms
);
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
break;
}
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
}
}
// will drop inflight flag in outer loop when coverage detected
});
continue;
}
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting...");
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
let mut reconnected = false;
for addr in shuffled {
match pool.connect_one(addr, rng.as_ref()).await {
Ok(()) => {
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
backoff.insert(key, 30);
last_attempt.insert(key, now);
reconnected = true;
break;
}
Err(e) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"),
let key = (dc, family);
let now = Instant::now();
if let Some(ts) = next_attempt.get(&key) {
if now < *ts {
continue;
}
}
if !reconnected {
let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300);
backoff.insert(key, next);
last_attempt.insert(key, now);
let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize;
if *inflight.get(&key).unwrap_or(&0) >= max_concurrent {
return;
}
*inflight.entry(key).or_insert(0) += 1;
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
let mut success = false;
for addr in shuffled {
let res = tokio::time::timeout(pool.me_one_timeout, pool.connect_one(addr, rng.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
pool.stats.increment_me_reconnect_success();
backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64);
let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM;
let wait = pool.me_reconnect_backoff_base
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
success = true;
break;
}
Ok(Err(e)) => {
pool.stats.increment_me_reconnect_attempt();
debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed")
}
Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"),
}
}
if !success {
pool.stats.increment_me_reconnect_attempt();
let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
backoff.insert(key, next_ms);
let jitter = next_ms / JITTER_FRAC_NUM;
let wait = Duration::from_millis(next_ms)
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect");
}
if let Some(v) = inflight.get_mut(&key) {
*v = v.saturating_sub(1);
}
}
}

View File

@@ -1,14 +1,14 @@
use std::collections::HashMap;
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering};
use bytes::BytesMut;
use rand::Rng;
use rand::seq::SliceRandom;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{Mutex, RwLock, mpsc, Notify};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use std::time::Duration;
use std::time::{Duration, Instant};
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
@@ -18,18 +18,18 @@ use crate::protocol::constants::*;
use super::ConnRegistry;
use super::registry::{BoundConn, ConnMeta};
use super::codec::RpcWriter;
use super::codec::{RpcWriter, WriterCommand};
use super::reader::reader_loop;
use super::MeResponse;
const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4;
#[derive(Clone)]
pub struct MeWriter {
pub id: u64,
pub addr: SocketAddr,
pub writer: Arc<Mutex<RpcWriter>>,
pub tx: mpsc::Sender<WriterCommand>,
pub cancel: CancellationToken,
pub degraded: Arc<AtomicBool>,
pub draining: Arc<AtomicBool>,
@@ -47,11 +47,24 @@ pub struct MePool {
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
pub(super) nat_probe: bool,
pub(super) nat_stun: Option<String>,
pub(super) nat_stun_servers: Vec<String>,
pub(super) detected_ipv6: Option<Ipv6Addr>,
pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8,
pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool,
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
pub(super) me_one_retry: u8,
pub(super) me_one_timeout: Duration,
pub(super) me_keepalive_enabled: bool,
pub(super) me_keepalive_interval: Duration,
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration,
pub(super) me_reconnect_max_concurrent_per_dc: u32,
pub(super) me_reconnect_backoff_base: Duration,
pub(super) me_reconnect_backoff_cap: Duration,
pub(super) me_reconnect_fast_retry_count: u32,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32,
@@ -59,6 +72,9 @@ pub struct MePool {
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) writer_available: Arc<Notify>,
pub(super) conn_count: AtomicUsize,
pub(super) stats: Arc<crate::stats::Stats>,
pool_size: usize,
}
@@ -75,6 +91,7 @@ impl MePool {
nat_ip: Option<IpAddr>,
nat_probe: bool,
nat_stun: Option<String>,
nat_stun_servers: Vec<String>,
detected_ipv6: Option<Ipv6Addr>,
me_one_retry: u8,
me_one_timeout_ms: u64,
@@ -83,6 +100,18 @@ impl MePool {
default_dc: Option<i32>,
decision: NetworkDecision,
rng: Arc<SecureRandom>,
stats: Arc<crate::stats::Stats>,
me_keepalive_enabled: bool,
me_keepalive_interval_secs: u64,
me_keepalive_jitter_secs: u64,
me_keepalive_payload_random: bool,
me_warmup_stagger_enabled: bool,
me_warmup_step_delay_ms: u64,
me_warmup_step_jitter_ms: u64,
me_reconnect_max_concurrent_per_dc: u32,
me_reconnect_backoff_base_ms: u64,
me_reconnect_backoff_cap_ms: u64,
me_reconnect_fast_retry_count: u32,
) -> Arc<Self> {
Arc::new(Self {
registry: Arc::new(ConnRegistry::new()),
@@ -96,11 +125,25 @@ impl MePool {
nat_ip_detected: Arc::new(RwLock::new(None)),
nat_probe,
nat_stun,
nat_stun_servers,
detected_ipv6,
nat_probe_attempts: std::sync::atomic::AtomicU8::new(0),
nat_probe_disabled: std::sync::atomic::AtomicBool::new(false),
stun_backoff_until: Arc::new(RwLock::new(None)),
me_one_retry,
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
stats,
me_keepalive_enabled,
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
me_reconnect_max_concurrent_per_dc,
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
me_reconnect_fast_retry_count,
pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@@ -109,6 +152,8 @@ impl MePool {
ping_tracker: Arc::new(Mutex::new(HashMap::new())),
rtt_stats: Arc::new(Mutex::new(HashMap::new())),
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
writer_available: Arc::new(Notify::new()),
conn_count: AtomicUsize::new(0),
})
}
@@ -116,6 +161,11 @@ impl MePool {
self.proxy_tag.is_some()
}
pub fn reset_stun_state(&self) {
self.nat_probe_attempts.store(0, Ordering::Relaxed);
self.nat_probe_disabled.store(false, Ordering::Relaxed);
}
pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr {
let ip = self.translate_ip_for_nat(addr.ip());
SocketAddr::new(ip, addr.port())
@@ -132,7 +182,11 @@ impl MePool {
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
use std::collections::HashSet;
let writers = self.writers.read().await;
let current: HashSet<SocketAddr> = writers.iter().map(|w| w.addr).collect();
let current: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
drop(writers);
for family in self.family_order() {
@@ -175,12 +229,36 @@ impl MePool {
let mut guard = self.proxy_map_v6.write().await;
if !v6.is_empty() && *guard != v6 {
*guard = v6;
changed = true;
}
}
// Ensure negative DC entries mirror positives when absent (Telegram convention).
{
let mut guard = self.proxy_map_v4.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k) {
if let Some(addrs) = guard.get(&k).cloned() {
guard.insert(-k, addrs);
}
}
}
}
{
let mut guard = self.proxy_map_v6.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k) {
if let Some(addrs) = guard.get(&k).cloned() {
guard.insert(-k, addrs);
}
}
}
}
changed
}
pub async fn update_secret(&self, new_secret: Vec<u8>) -> bool {
pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
if new_secret.len() < 32 {
warn!(len = new_secret.len(), "proxy-secret update ignored (too short)");
return false;
@@ -195,10 +273,14 @@ impl MePool {
false
}
pub async fn reconnect_all(&self) {
// Graceful: do not drop all at once. New connections will use updated secret.
// Existing writers remain until health monitor replaces them.
// No-op here to avoid total outage.
pub async fn reconnect_all(self: &Arc<Self>) {
let ws = self.writers.read().await.clone();
for w in ws {
if let Ok(()) = self.connect_one(w.addr, self.rng.as_ref()).await {
self.mark_writer_draining(w.id).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
pub(super) async fn key_selector(&self) -> u32 {
@@ -277,7 +359,24 @@ impl MePool {
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
}
// Additional connections up to pool_size total (round-robin across DCs)
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
if self.me_warmup_stagger_enabled {
let mut delay_ms = 0u64;
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
}
}
}
} else {
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
@@ -292,6 +391,7 @@ impl MePool {
break;
}
}
}
if !self.decision.effective_multipath && self.connection_count() > 0 {
break;
@@ -317,21 +417,61 @@ impl MePool {
let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false));
let rpc_w = Arc::new(Mutex::new(RpcWriter {
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let tx_for_keepalive = tx.clone();
let keepalive_random = self.me_keepalive_payload_random;
let stats = self.stats.clone();
let mut rpc_writer = RpcWriter {
writer: hs.wr,
key: hs.write_key,
iv: hs.write_iv,
seq_no: 0,
}));
};
let cancel_wr = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
cmd = rx.recv() => {
match cmd {
Some(WriterCommand::Data(payload)) => {
if rpc_writer.send(&payload).await.is_err() { break; }
}
Some(WriterCommand::DataAndFlush(payload)) => {
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
}
Some(WriterCommand::Keepalive) => {
let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN];
if keepalive_random {
rand::rng().fill(&mut payload);
}
match rpc_writer.send_keepalive(payload).await {
Ok(()) => {
stats.increment_me_keepalive_sent();
}
Err(_) => {
stats.increment_me_keepalive_failed();
break;
}
}
}
Some(WriterCommand::Close) | None => break,
}
}
_ = cancel_wr.cancelled() => break,
}
}
});
let writer = MeWriter {
id: writer_id,
addr,
writer: rpc_w.clone(),
tx: tx.clone(),
cancel: cancel.clone(),
degraded: degraded.clone(),
draining: draining.clone(),
};
self.writers.write().await.push(writer.clone());
self.conn_count.fetch_add(1, Ordering::Relaxed);
self.writer_available.notify_waiters();
let reg = self.registry.clone();
let writers_arc = self.writers_arc();
@@ -339,11 +479,19 @@ impl MePool {
let rtt_stats = self.rtt_stats.clone();
let pool = Arc::downgrade(self);
let cancel_ping = cancel.clone();
let rpc_w_ping = rpc_w.clone();
let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
let keepalive_interval = self.me_keepalive_interval;
let keepalive_jitter = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
let cancel_keepalive_token = cancel.clone();
tokio::spawn(async move {
let cancel_reader = cancel.clone();
let res = reader_loop(
hs.rd,
hs.read_key,
@@ -351,16 +499,21 @@ impl MePool {
reg.clone(),
BytesMut::new(),
BytesMut::new(),
rpc_w.clone(),
tx.clone(),
ping_tracker.clone(),
rtt_stats.clone(),
writer_id,
degraded.clone(),
cancel_reader.clone(),
cancel_reader_token.clone(),
)
.await;
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
}
if let Err(e) = res {
warn!(error = %e, "ME reader ended");
@@ -378,7 +531,7 @@ impl MePool {
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
tokio::select! {
_ = cancel_ping.cancelled() => {
_ = cancel_ping_token.cancelled() => {
break;
}
_ = tokio::time::sleep(Duration::from_secs(wait)) => {}
@@ -389,20 +542,45 @@ impl MePool {
p.extend_from_slice(&sent_id.to_le_bytes());
{
let mut tracker = ping_tracker_ping.lock().await;
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
ping_id = ping_id.wrapping_add(1);
if let Err(e) = rpc_w_ping.lock().await.send_and_flush(&p).await {
debug!(error = %e, "Active ME ping failed, removing dead writer");
if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!("Active ME ping failed, removing dead writer");
cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
if cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
}
break;
}
}
});
if keepalive_enabled {
let tx_keepalive = tx_for_keepalive;
let cancel_keepalive = cancel_keepalive_token;
tokio::spawn(async move {
// Per-writer jittered start to avoid phase sync.
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
loop {
tokio::select! {
_ = cancel_keepalive.cancelled() => break,
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
}
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
break;
}
}
});
}
Ok(())
}
@@ -430,7 +608,7 @@ impl MePool {
false
}
pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) {
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await;
for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
@@ -444,8 +622,11 @@ impl MePool {
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos);
w.cancel.cancel();
let _ = w.tx.send(WriterCommand::Close).await;
self.conn_count.fetch_sub(1, Ordering::Relaxed);
}
}
self.rtt_stats.lock().await.remove(&writer_id);
self.registry.writer_lost(writer_id).await
}
@@ -459,8 +640,14 @@ impl MePool {
let pool = Arc::downgrade(self);
tokio::spawn(async move {
let deadline = Instant::now() + Duration::from_secs(300);
loop {
if let Some(p) = pool.upgrade() {
if Instant::now() >= deadline {
warn!(writer_id, "Drain timeout, force-closing");
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;

View File

@@ -98,16 +98,18 @@ impl MePool {
family: IpFamily,
) -> Option<std::net::SocketAddr> {
const STUN_CACHE_TTL: Duration = Duration::from_secs(600);
// If STUN probing was disabled after attempts, reuse cached (even stale) or skip.
if self.nat_probe_disabled.load(std::sync::atomic::Ordering::Relaxed) {
if let Ok(cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => cache.v4,
IpFamily::V6 => cache.v6,
};
return slot.map(|(_, addr)| addr);
// Backoff window
if let Some(until) = *self.stun_backoff_until.read().await {
if Instant::now() < until {
if let Ok(cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => cache.v4,
IpFamily::V6 => cache.v6,
};
return slot.map(|(_, addr)| addr);
}
return None;
}
return None;
}
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
@@ -123,48 +125,42 @@ impl MePool {
}
let attempt = self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if attempt >= 2 {
self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed);
return None;
}
let servers = if !self.nat_stun_servers.is_empty() {
self.nat_stun_servers.clone()
} else if let Some(s) = &self.nat_stun {
vec![s.clone()]
} else {
vec!["stun.l.google.com:19302".to_string()]
};
let stun_addr = self
.nat_stun
.clone()
.unwrap_or_else(|| "stun.l.google.com:19302".to_string());
match stun_probe_dual(&stun_addr).await {
Ok(res) => {
let picked: Option<StunProbeResult> = match family {
IpFamily::V4 => res.v4,
IpFamily::V6 => res.v6,
};
if let Some(result) = picked {
info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, "NAT probe: reflected address");
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
*slot = Some((Instant::now(), result.reflected_addr));
for stun_addr in servers {
match stun_probe_dual(&stun_addr).await {
Ok(res) => {
let picked: Option<StunProbeResult> = match family {
IpFamily::V4 => res.v4,
IpFamily::V6 => res.v6,
};
if let Some(result) = picked {
info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, stun = %stun_addr, "NAT probe: reflected address");
self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
*slot = Some((Instant::now(), result.reflected_addr));
}
return Some(result.reflected_addr);
}
Some(result.reflected_addr)
} else {
None
}
}
Err(e) => {
let attempts = attempt + 1;
if attempts <= 2 {
warn!(error = %e, attempt = attempts, "NAT probe failed");
} else {
debug!(error = %e, attempt = attempts, "NAT probe suppressed after max attempts");
Err(e) => {
warn!(error = %e, stun = %stun_addr, attempt = attempt + 1, "NAT probe failed, trying next server");
}
if attempts >= 2 {
self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed);
}
None
}
}
let backoff = Duration::from_secs(60 * 2u64.pow((attempt as u32).min(6)));
*self.stun_backoff_until.write().await = Some(Instant::now() + backoff);
None
}
}

View File

@@ -6,7 +6,7 @@ use std::time::Instant;
use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
@@ -14,7 +14,7 @@ use crate::crypto::{AesCbc, crc32};
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use super::codec::RpcWriter;
use super::codec::WriterCommand;
use super::{ConnRegistry, MeResponse};
pub(crate) async fn reader_loop(
@@ -24,7 +24,7 @@ pub(crate) async fn reader_loop(
reg: Arc<ConnRegistry>,
enc_leftover: BytesMut,
mut dec: BytesMut,
writer: Arc<Mutex<RpcWriter>>,
tx: mpsc::Sender<WriterCommand>,
ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>,
rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
_writer_id: u64,
@@ -33,6 +33,8 @@ pub(crate) async fn reader_loop(
) -> Result<()> {
let mut raw = enc_leftover;
let mut expected_seq: i32 = 0;
let mut crc_errors = 0u32;
let mut seq_mismatch = 0u32;
loop {
let mut tmp = [0u8; 16_384];
@@ -80,12 +82,20 @@ pub(crate) async fn reader_loop(
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
if crc32(&frame[..pe]) != ec {
warn!("CRC mismatch in data frame");
crc_errors += 1;
if crc_errors > 3 {
return Err(ProxyError::Proxy("Too many CRC mismatches".into()));
}
continue;
}
let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap());
if seq_no != expected_seq {
warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch");
seq_mismatch += 1;
if seq_mismatch > 10 {
return Err(ProxyError::Proxy("Too many seq mismatches".into()));
}
expected_seq = seq_no.wrapping_add(1);
} else {
expected_seq = expected_seq.wrapping_add(1);
@@ -108,7 +118,7 @@ pub(crate) async fn reader_loop(
let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
if !routed {
reg.unregister(cid).await;
send_close_conn(&writer, cid).await;
send_close_conn(&tx, cid).await;
}
} else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -118,7 +128,7 @@ pub(crate) async fn reader_loop(
let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
if !routed {
reg.unregister(cid).await;
send_close_conn(&writer, cid).await;
send_close_conn(&tx, cid).await;
}
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -136,8 +146,8 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes());
if let Err(e) = writer.lock().await.send_and_flush(&pong).await {
warn!(error = %e, "PONG send failed");
if tx.send(WriterCommand::DataAndFlush(pong)).await.is_err() {
warn!("PONG send failed");
break;
}
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
@@ -171,12 +181,10 @@ pub(crate) async fn reader_loop(
}
}
async fn send_close_conn(writer: &Arc<Mutex<RpcWriter>>, conn_id: u64) {
async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = writer.lock().await.send_and_flush(&p).await {
debug!(conn_id, error = %e, "Failed to send RPC_CLOSE_CONN");
}
let _ = tx.send(WriterCommand::DataAndFlush(p)).await;
}

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock};
use super::codec::RpcWriter;
use super::codec::WriterCommand;
use super::MeResponse;
#[derive(Clone)]
@@ -25,12 +25,12 @@ pub struct BoundConn {
#[derive(Clone)]
pub struct ConnWriter {
pub writer_id: u64,
pub writer: Arc<Mutex<RpcWriter>>,
pub tx: mpsc::Sender<WriterCommand>,
}
struct RegistryInner {
map: HashMap<u64, mpsc::Sender<MeResponse>>,
writers: HashMap<u64, Arc<Mutex<RpcWriter>>>,
writers: HashMap<u64, mpsc::Sender<WriterCommand>>,
writer_for_conn: HashMap<u64, u64>,
conns_for_writer: HashMap<u64, HashSet<u64>>,
meta: HashMap<u64, ConnMeta>,
@@ -96,13 +96,13 @@ impl ConnRegistry {
&self,
conn_id: u64,
writer_id: u64,
writer: Arc<Mutex<RpcWriter>>,
tx: mpsc::Sender<WriterCommand>,
meta: ConnMeta,
) {
let mut inner = self.inner.write().await;
inner.meta.entry(conn_id).or_insert(meta);
inner.writer_for_conn.insert(conn_id, writer_id);
inner.writers.entry(writer_id).or_insert_with(|| writer.clone());
inner.writers.entry(writer_id).or_insert_with(|| tx.clone());
inner
.conns_for_writer
.entry(writer_id)
@@ -114,7 +114,7 @@ impl ConnRegistry {
let inner = self.inner.read().await;
let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?;
let writer = inner.writers.get(&writer_id).cloned()?;
Some(ConnWriter { writer_id, writer })
Some(ConnWriter { writer_id, tx: writer })
}
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {

View File

@@ -31,8 +31,17 @@ pub async fn me_rotation_task(pool: Arc<MePool>, rng: Arc<SecureRandom>, interva
info!(addr = %w.addr, writer_id = w.id, "Rotating ME connection");
match pool.connect_one(w.addr, rng.as_ref()).await {
Ok(()) => {
// Mark old writer for graceful drain; removal happens when sessions finish.
pool.mark_writer_draining(w.id).await;
tokio::time::sleep(Duration::from_secs(2)).await;
let ws = pool.writers.read().await;
let new_alive = ws.iter().any(|nw|
nw.id != w.id && nw.addr == w.addr && !nw.degraded.load(Ordering::Relaxed) && !nw.draining.load(Ordering::Relaxed)
);
drop(ws);
if new_alive {
pool.mark_writer_draining(w.id).await;
} else {
warn!(addr = %w.addr, writer_id = w.id, "New writer died, keeping old");
}
}
Err(e) => {
warn!(addr = %w.addr, writer_id = w.id, error = %e, "ME rotation connect failed");

View File

@@ -10,6 +10,7 @@ use crate::network::IpFamily;
use crate::protocol::constants::RPC_CLOSE_EXT_U32;
use super::MePool;
use super::codec::WriterCommand;
use super::wire::build_proxy_req_payload;
use rand::seq::SliceRandom;
use super::registry::ConnMeta;
@@ -43,18 +44,15 @@ impl MePool {
loop {
if let Some(current) = self.registry.get_writer(conn_id).await {
let send_res = {
if let Ok(mut guard) = current.writer.try_lock() {
let r = guard.send(&payload).await;
drop(guard);
r
} else {
current.writer.lock().await.send(&payload).await
}
current
.tx
.send(WriterCommand::Data(payload.clone()))
.await
};
match send_res {
Ok(()) => return Ok(()),
Err(e) => {
warn!(error = %e, writer_id = current.writer_id, "ME write failed");
Err(_) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
continue;
}
@@ -64,7 +62,26 @@ impl MePool {
let mut writers_snapshot = {
let ws = self.writers.read().await;
if ws.is_empty() {
return Err(ProxyError::Proxy("All ME connections dead".into()));
drop(ws);
for family in self.family_order() {
let map = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
};
for (_dc, addrs) in map.iter() {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
self.writer_available.notify_waiters();
break;
}
}
}
}
if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() {
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
}
continue;
}
ws.clone()
};
@@ -96,9 +113,10 @@ impl MePool {
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
break;
if !candidate_indices.is_empty() {
break;
}
}
drop(map_guard);
}
if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
@@ -120,22 +138,15 @@ impl MePool {
if w.draining.load(Ordering::Relaxed) {
continue;
}
if let Ok(mut guard) = w.writer.try_lock() {
let send_res = guard.send(&payload).await;
drop(guard);
match send_res {
Ok(()) => {
self.registry
.bind_writer(conn_id, w.id, w.writer.clone(), meta.clone())
.await;
return Ok(());
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed");
self.remove_writer_and_close_clients(w.id).await;
continue;
}
}
if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() {
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
return Ok(());
} else {
warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await;
continue;
}
}
@@ -143,15 +154,15 @@ impl MePool {
if w.draining.load(Ordering::Relaxed) {
continue;
}
match w.writer.lock().await.send(&payload).await {
match w.tx.send(WriterCommand::Data(payload.clone())).await {
Ok(()) => {
self.registry
.bind_writer(conn_id, w.id, w.writer.clone(), meta.clone())
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
return Ok(());
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed (blocking)");
Err(_) => {
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await;
}
}
@@ -163,8 +174,8 @@ impl MePool {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = w.writer.lock().await.send_and_flush(&p).await {
debug!(error = %e, "ME close write failed");
if w.tx.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
}
} else {
@@ -176,7 +187,7 @@ impl MePool {
}
pub fn connection_count(&self) -> usize {
self.writers.try_read().map(|w| w.len()).unwrap_or(0)
self.conn_count.load(Ordering::Relaxed)
}
pub(super) async fn candidate_indices_for_dc(

View File

@@ -167,20 +167,44 @@ impl UpstreamManager {
}
/// Select upstream using latency-weighted random selection.
async fn select_upstream(&self, dc_idx: Option<i16>) -> Option<usize> {
async fn select_upstream(&self, dc_idx: Option<i16>, scope: Option<&str>) -> Option<usize> {
let upstreams = self.upstreams.read().await;
if upstreams.is_empty() {
return None;
}
let healthy: Vec<usize> = upstreams.iter()
// Scope filter:
// If scope is set: only scoped and matched items
// If scope is not set: only unscoped items
let filtered_upstreams : Vec<usize> = upstreams.iter()
.enumerate()
.filter(|(_, u)| u.healthy)
.filter(|(_, u)| {
scope.map_or(
u.config.scopes.is_empty(),
|req_scope| {
u.config.scopes
.split(',')
.map(str::trim)
.any(|s| s == req_scope)
}
)
})
.map(|(i, _)| i)
.collect();
// Healthy filter
let healthy: Vec<usize> = filtered_upstreams.iter()
.filter(|&&i| upstreams[i].healthy)
.copied()
.collect();
if filtered_upstreams.is_empty() {
warn!(scope = scope, "No upstreams available! Using first (direct?)");
return None;
}
if healthy.is_empty() {
return Some(rand::rng().gen_range(0..upstreams.len()));
warn!(scope = scope, "No healthy upstreams available! Using random.");
return Some(filtered_upstreams[rand::rng().gen_range(0..filtered_upstreams.len())]);
}
if healthy.len() == 1 {
@@ -222,15 +246,20 @@ impl UpstreamManager {
}
/// Connect to target through a selected upstream.
pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i16>) -> Result<TcpStream> {
let idx = self.select_upstream(dc_idx).await
pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i16>, scope: Option<&str>) -> Result<TcpStream> {
let idx = self.select_upstream(dc_idx, scope).await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let upstream = {
let mut upstream = {
let guard = self.upstreams.read().await;
guard[idx].config.clone()
};
// Set scope for configuration copy
if let Some(s) = scope {
upstream.selected_scope = s.to_string();
}
let start = Instant::now();
match self.connect_via_upstream(&upstream, target).await {
@@ -313,8 +342,12 @@ impl UpstreamManager {
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
// replace socks user_id with config.selected_scope, if set
let scope: Option<&str> = Some(config.selected_scope.as_str())
.filter(|s| !s.is_empty());
let _user_id: Option<&str> = scope.or(user_id.as_deref());
connect_socks4(&mut stream, target, user_id.as_deref()).await?;
connect_socks4(&mut stream, target, _user_id).await?;
Ok(stream)
},
UpstreamType::Socks5 { address, interface, username, password } => {
@@ -341,7 +374,14 @@ impl UpstreamManager {
return Err(ProxyError::Io(e));
}
connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?;
debug!(config = ?config, "Socks5 connection");
// replace socks user:pass with config.selected_scope, if set
let scope: Option<&str> = Some(config.selected_scope.as_str())
.filter(|s| !s.is_empty());
let _username: Option<&str> = scope.or(username.as_deref());
let _password: Option<&str> = scope.or(password.as_deref());
connect_socks5(&mut stream, target, _username, _password).await?;
Ok(stream)
},
}