From 6a57c237004c44854cb4900fe508a2ee452a7c1c Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:56:03 +0300 Subject: [PATCH 01/22] Update README.md --- README.md | 78 ++++++++++++++++--------------------------------------- 1 file changed, 22 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 65bfa60..3f80a60 100644 --- a/README.md +++ b/README.md @@ -10,74 +10,40 @@ ### πŸ‡·πŸ‡Ί RU -15 фСвраля ΠΌΡ‹ ΠΎΠΏΡƒΠ±Π»ΠΈΠΊΠΎΠ²Π°Π»ΠΈ `telemt 3` с ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ Middle-End Proxy, Π° Π·Π½Π°Ρ‡ΠΈΡ‚: +18 фСвраля ΠΌΡ‹ ΠΎΠΏΡƒΠ±Π»ΠΈΠΊΠΎΠ²Π°Π»ΠΈ `telemt 3.0.3`, ΠΎΠ½ ΠΈΠΌΠ΅Π΅Ρ‚: -- с Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½Π°Π»ΡŒΠ½Ρ‹ΠΌΠΈ ΠΌΠ΅Π΄ΠΈΠ°, Π² Ρ‚ΠΎΠΌ числС с CDN/DC=203 -- с Ad-tag β€” ΠΏΠΎΠΊΠ°Π·Ρ‹Π²Π°ΠΉΡ‚Π΅ спонсорский ΠΊΠ°Π½Π°Π» ΠΈ собирайтС статистику Ρ‡Π΅Ρ€Π΅Π· ΠΎΡ„ΠΈΡ†ΠΈΠ°Π»ΡŒΠ½ΠΎΠ³ΠΎ Π±ΠΎΡ‚Π° -- с Π½ΠΎΠ²Ρ‹ΠΌ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ΠΎΠΌ ΠΊ бСзопасности ΠΈ асинхронности -- с высокоточной диагностикой ΠΊΡ€ΠΈΠΏΡ‚ΠΎΠ³Ρ€Π°Ρ„ΠΈΠΈ Ρ‡Π΅Ρ€Π΅Π· `ME_DIAG` +- ΡƒΠ»ΡƒΡ‡ΡˆΠ΅Π½Π½Ρ‹ΠΉ ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌ Middle-End Health Check +- высокоскоростноС восстановлСниС ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ Middle-End +- мСньшС Π·Π°Π΄Π΅Ρ€ΠΆΠ΅ΠΊ Π½Π° hot-path +- Π±ΠΎΠ»Π΅Π΅ ΠΊΠΎΡ€Ρ€Π΅ΠΊΡ‚Π½ΡƒΡŽ Ρ€Π°Π±ΠΎΡ‚Ρƒ Π² Dualstack, Π° ΠΈΠΌΠ΅Π½Π½ΠΎ - IPv6 Middle-End +- Π°ΠΊΠΊΡƒΡ€Π°Ρ‚Π½ΠΎΠ΅ ΠΏΠ΅Ρ€Π΅ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±Π΅Π· Π΄Ρ€ΠΈΡ„Ρ‚Π° сСссий ΠΌΠ΅ΠΆΠ΄Ρƒ Middle-End +- автоматичСская дСградация Π½Π° Direct-DC ΠΏΡ€ΠΈ массовой (>2 ME-DC-Π³Ρ€ΡƒΠΏΠΏ) нСдоступности Middle-End +- Π°Π²Ρ‚ΠΎΠ΄Π΅Ρ‚Π΅ΠΊΡ‚ IP Π·Π° NAT, ΠΏΡ€ΠΈ возмоТности - Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ Ρ…Π΅Π½Π΄ΡˆΠ΅ΠΉΠΊ с ME, ΠΏΡ€ΠΈ Π½Π΅ΡƒΠ΄Π°Ρ‡Π΅ - автодСградация +- СдинствСнный извСстный ΡΠΏΠ΅Ρ†ΠΈΠ°Π»ΡŒΠ½Ρ‹ΠΉ DC=203 ΡƒΠΆΠ΅ Π΄ΠΎΠ±Π°Π²Π»Π΅Π½ Π² ΠΊΠΎΠ΄: ΠΌΠ΅Π΄ΠΈΠ° Π·Π°Π³Ρ€ΡƒΠΆΠ°ΡŽΡ‚ΡΡ с CDN Π² Direct-DC Ρ€Π΅ΠΆΠΈΠΌΠ΅ -Для использования Π½ΡƒΠΆΠ½ΠΎ: +[Π—Π΄Π΅ΡΡŒ Π²Ρ‹ ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π½Π°ΠΉΡ‚ΠΈ Ρ€Π΅Π»ΠΈΠ·](https://github.com/telemt/telemt/releases/tag/3.0.3) -1. ВСрсия `telemt` β‰₯3.0.0 -2. Π’Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ любого ΠΈΠ· Π½Π°Π±ΠΎΡ€ΠΎΠ² условий: - - ΠΏΡƒΠ±Π»ΠΈΡ‡Π½Ρ‹ΠΉ IP для исходящих соСдинСний установлСн Π½Π° интСрфСйса инстанса с `telemt` - - Π›Π˜Π‘Πž - - Π²Ρ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚Π΅ NAT 1:1 + Π²ΠΊΠ»ΡŽΡ‡ΠΈΠ»ΠΈ STUN-ΠΏΡ€ΠΎΠ±ΠΈΠ½Π³ -3. Π’ ΠΊΠΎΠ½Ρ„ΠΈΠ³Π΅, Π² сСкции `[general]` ΡƒΠΊΠ°Π·Π°Ρ‚ΡŒ: -```toml -use_middle_proxy = true -``` - -Если условия ΠΈΠ· ΠΏΡƒΠ½ΠΊΡ‚Π° 1 Π½Π΅ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡŽΡ‚ΡΡ: -1. Π’Ρ‹ΠΊΠ»ΡŽΡ‡ΠΈΡ‚Π΅ ME-Ρ€Π΅ΠΆΠΈΠΌ: - - установитС `use_middle_proxy = false` - - Π›Π˜Π‘Πž - - Middle-End Proxy Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΊΠ»ΡŽΡ‡Π΅Π½ автоматичСски ΠΏΠΎ Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚Ρƒ, Π½ΠΎ это Π·Π°ΠΉΠΌΡ‘Ρ‚ большС Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ ΠΏΡ€ΠΈ запускС -2. Π’ ΠΊΠΎΠ½Ρ„ΠΈΠ³Π΅, Π΄ΠΎΠ±Π°Π²ΡŒΡ‚Π΅ Π² ΠΊΠΎΠ½Π΅Ρ†: -```toml -[dc_overrides] -"203" = "91.105.192.100:443" -``` - -Если Ρƒ вас Π΅ΡΡ‚ΡŒ ΠΊΠΎΠΌΠΏΠ΅Ρ‚Π΅Π½Ρ†ΠΈΠΈ Π² асинхронных сСтСвых прилоТСниях, Π°Π½Π°Π»ΠΈΠ·Π΅ Ρ‚Ρ€Π°Ρ„ΠΈΠΊΠ°, рСвСрс-ΠΈΠ½ΠΆΠΈΠ½ΠΈΡ€ΠΈΠ½Π³Π΅ ΠΈΠ»ΠΈ сСтСвых расслСдованиях β€” ΠΌΡ‹ ΠΎΡ‚ΠΊΡ€Ρ‹Ρ‚Ρ‹ ΠΊ идСям ΠΈ pull requests. +Если Ρƒ вас Π΅ΡΡ‚ΡŒ ΠΊΠΎΠΌΠΏΠ΅Ρ‚Π΅Π½Ρ†ΠΈΠΈ Π² асинхронных сСтСвых прилоТСниях, Π°Π½Π°Π»ΠΈΠ·Π΅ Ρ‚Ρ€Π°Ρ„ΠΈΠΊΠ°, рСвСрс-ΠΈΠ½ΠΆΠΈΠ½ΠΈΡ€ΠΈΠ½Π³Π΅ ΠΈΠ»ΠΈ сСтСвых расслСдованиях - ΠΌΡ‹ ΠΎΡ‚ΠΊΡ€Ρ‹Ρ‚Ρ‹ ΠΊ идСям ΠΈ pull requests! ### πŸ‡¬πŸ‡§ EN -On February 15, we released `telemt 3` with support for Middle-End Proxy, which means: +On February 18, we released `telemt 3.0.3`. This version introduces: -- functional media, including CDN/DC=203 -- Ad-tag support – promote a sponsored channel and collect statistics via Telegram bot -- new approach to security and asynchronicity -- high-precision cryptography diagnostics via `ME_DIAG` +- improved Middle-End Health Check method +- high-speed recovery of Middle-End init +- reduced latency on the hot path +- correct Dualstack support: proper handling of IPv6 Middle-End +- *clean* client reconnection without session "drift" between Middle-End +- automatic degradation to Direct-DC mode in case of large-scale (>2 ME-DC groups) Middle-End unavailability +- automatic public IP detection behind NAT; first - Middle-End handshake is performed, otherwise automatic degradation is applied +- known special DC=203 is now handled natively: media is delivered from the CDN via Direct-DC mode -To use this feature, the following requirements must be met: -1. `telemt` version β‰₯ 3.0.0 -2. One of the following conditions satisfied: - - the instance running `telemt` has a public IP address assigned to its network interface for outbound connections - - OR - - you are using 1:1 NAT and have STUN probing enabled -3. In the config file, under the `[general]` section, specify: -```toml -use_middle_proxy = true -```` +[Release is available here](https://github.com/telemt/telemt/releases/tag/3.0.3) -If the conditions from step 1 are not satisfied: -1. Disable Middle-End mode: - - set `use_middle_proxy = false` - - OR - - Middle-End Proxy will be disabled automatically after a timeout, but this will increase startup time - -2. In the config file, add the following at the end: -```toml -[dc_overrides] -"203" = "91.105.192.100:443" -``` - -If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics β€” we welcome ideas, suggestions, and pull requests. +If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics - we welcome ideas and pull requests! From 6ba12f35d02280f122393f4d762ae22878465f0b Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Feb 2026 21:31:58 +0300 Subject: [PATCH 02/22] Update README.md --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3f80a60..9bde777 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,9 @@ If you have expertise in asynchronous network applications, traffic analysis, re # Features πŸ’₯ The configuration structure has changed since version 1.1.0.0. change it in your environment! -βš“ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler) +βš“ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler) + +βš“ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy # GOTO - [Features](#features) From efba10f83926f7d6e5653b14e05afb8fe07134a0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Feb 2026 21:34:04 +0300 Subject: [PATCH 03/22] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9bde777..ff06d3a 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ If you have expertise in asynchronous network applications, traffic analysis, re βš“ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler) -βš“ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy +βš“ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy: non dramatically, but usual # GOTO - [Features](#features) From 4f7f7d6880622b61e8bd068340523ee688398d8f Mon Sep 17 00:00:00 2001 From: Vladislav Yaroslavlev Date: Wed, 18 Feb 2026 21:49:42 +0300 Subject: [PATCH 04/22] fix(ci): replace deprecated actions-rs/cargo with direct cross commands The actions-rs organization has been archived and is no longer available. Replace the deprecated action with direct cross installation and build commands. --- .github/workflows/release.yml | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 05e334a..3296c42 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -56,12 +56,11 @@ jobs: restore-keys: | ${{ runner.os }}-${{ matrix.target }}-cargo- + - name: Install cross + run: cargo install cross --git https://github.com/cross-rs/cross + - name: Build Release - uses: actions-rs/cargo@ae10961054e4aa8bff448f48a500763b90d5c550 # v1.0.1 - with: - use-cross: true - command: build - args: --release --target ${{ matrix.target }} + run: cross build --release --target ${{ matrix.target }} - name: Package binary run: | From c5b590062c06d47cd95b2e7dc54db5fbc6a58260 Mon Sep 17 00:00:00 2001 From: Vladislav Yaroslavlev Date: Wed, 18 Feb 2026 21:49:42 +0300 Subject: [PATCH 05/22] fix(ci): replace deprecated actions-rs/cargo with direct cross commands The actions-rs organization has been archived and is no longer available. Replace the deprecated action with direct cross installation and build commands. --- .github/workflows/release.yml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 05e334a..9827074 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -56,12 +56,11 @@ jobs: restore-keys: | ${{ runner.os }}-${{ matrix.target }}-cargo- + - name: Install cross + run: cargo install cross --git https://github.com/cross-rs/cross + - name: Build Release - uses: actions-rs/cargo@ae10961054e4aa8bff448f48a500763b90d5c550 # v1.0.1 - with: - use-cross: true - command: build - args: --release --target ${{ matrix.target }} + run: cross build --release --target ${{ matrix.target }} - name: Package binary run: | @@ -91,7 +90,7 @@ jobs: path: artifacts - name: Create Release - uses: softprops/action-gh-release@c95fe1489396fe360a41fb53f90de6ddce8c4c8a # v2.2.1 + uses: softprops/action-gh-release@v2 with: files: artifacts/**/* generate_release_notes: true From 36ef2f722d85d353d4673a860f9d3b31f7f65907 Mon Sep 17 00:00:00 2001 From: Vladislav Yaroslavlev Date: Wed, 18 Feb 2026 22:46:45 +0300 Subject: [PATCH 06/22] release changes package version --- .github/workflows/release.yml | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9827074..ec0e68d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -84,11 +84,40 @@ jobs: contents: write steps: + - name: Checkout repository + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + fetch-depth: 0 + token: ${{ secrets.GITHUB_TOKEN }} + - name: Download all artifacts uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8 with: path: artifacts + - name: Update version in Cargo.toml and Cargo.lock + run: | + # Extract version from tag (remove 'v' prefix if present) + VERSION="${GITHUB_REF#refs/tags/}" + VERSION="${VERSION#v}" + + # Install cargo-edit for version bumping + cargo install cargo-edit + + # Update Cargo.toml version + cargo set-version "$VERSION" + + # Configure git + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + # Commit and push changes + git add Cargo.toml Cargo.lock + git commit -m "chore: bump version to $VERSION" || echo "No changes to commit" + git push origin HEAD:main + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Create Release uses: softprops/action-gh-release@v2 with: From c7464d53e1e8c19d88d4853e81d057d825771f25 Mon Sep 17 00:00:00 2001 From: unuunn <62112339+unuunn@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:14:50 +0300 Subject: [PATCH 07/22] feat: implement selective routing for "scope_*" users - Users with "scope_{name}" prefix are routed to upstreams where {name} is present in the "scopes" property (comma-separated). - Strict separation: Scoped upstreams are excluded from general routing, and vice versa. - Constraint: SOCKS upstreams and DIRECT(`use_middle_proxy = false`) mode only. Example: User "scope_hello" matches an upstream with `scopes = "world,hello"` --- src/config/load.rs | 2 ++ src/config/types.rs | 4 +++ src/proxy/direct_relay.rs | 2 +- src/transport/upstream.rs | 60 ++++++++++++++++++++++++++++++++------- 4 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/config/load.rs b/src/config/load.rs index 512b734..4f00f77 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -208,6 +208,8 @@ impl ProxyConfig { upstream_type: UpstreamType::Direct { interface: None }, weight: 1, enabled: true, + scopes: String::new(), + selected_scope: String::new(), }); } diff --git a/src/config/types.rs b/src/config/types.rs index ef8fef7..766851b 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -403,6 +403,10 @@ pub struct UpstreamConfig { pub weight: u16, #[serde(default = "default_true")] pub enabled: bool, + #[serde(default)] + pub scopes: String, + #[serde(skip)] + pub selected_scope: String, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 537a93e..7b1ac1b 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -45,7 +45,7 @@ where ); let tg_stream = upstream_manager - .connect(dc_addr, Some(success.dc_idx)) + .connect(dc_addr, Some(success.dc_idx), user.strip_prefix("scope_").filter(|s| !s.is_empty())) .await?; debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index bf73408..8fdd437 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -167,20 +167,44 @@ impl UpstreamManager { } /// Select upstream using latency-weighted random selection. - async fn select_upstream(&self, dc_idx: Option) -> Option { + async fn select_upstream(&self, dc_idx: Option, scope: Option<&str>) -> Option { let upstreams = self.upstreams.read().await; if upstreams.is_empty() { return None; } - - let healthy: Vec = upstreams.iter() + // Scope filter: + // If scope is set: only scoped and matched items + // If scope is not set: only unscoped items + let filtered_upstreams : Vec = upstreams.iter() .enumerate() - .filter(|(_, u)| u.healthy) + .filter(|(_, u)| { + scope.map_or( + u.config.scopes.is_empty(), + |req_scope| { + u.config.scopes + .split(',') + .map(str::trim) + .any(|s| s == req_scope) + } + ) + }) .map(|(i, _)| i) .collect(); + // Healthy filter + let healthy: Vec = filtered_upstreams.iter() + .filter(|&&i| upstreams[i].healthy) + .copied() + .collect(); + + if filtered_upstreams.is_empty() { + warn!(scope = scope, "No upstreams available! Using first (direct?)"); + return None; + } + if healthy.is_empty() { - return Some(rand::rng().gen_range(0..upstreams.len())); + warn!(scope = scope, "No healthy upstreams available! Using random."); + return Some(filtered_upstreams[rand::rng().gen_range(0..filtered_upstreams.len())]); } if healthy.len() == 1 { @@ -222,15 +246,20 @@ impl UpstreamManager { } /// Connect to target through a selected upstream. - pub async fn connect(&self, target: SocketAddr, dc_idx: Option) -> Result { - let idx = self.select_upstream(dc_idx).await + pub async fn connect(&self, target: SocketAddr, dc_idx: Option, scope: Option<&str>) -> Result { + let idx = self.select_upstream(dc_idx, scope).await .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; - let upstream = { + let mut upstream = { let guard = self.upstreams.read().await; guard[idx].config.clone() }; + // Set scope for configuration copy + if let Some(s) = scope { + upstream.selected_scope = s.to_string(); + } + let start = Instant::now(); match self.connect_via_upstream(&upstream, target).await { @@ -313,8 +342,12 @@ impl UpstreamManager { if let Some(e) = stream.take_error()? { return Err(ProxyError::Io(e)); } + // replace socks user_id with config.selected_scope, if set + let scope: Option<&str> = Some(config.selected_scope.as_str()) + .filter(|s| !s.is_empty()); + let _user_id: Option<&str> = scope.or(user_id.as_deref()); - connect_socks4(&mut stream, target, user_id.as_deref()).await?; + connect_socks4(&mut stream, target, _user_id).await?; Ok(stream) }, UpstreamType::Socks5 { address, interface, username, password } => { @@ -341,7 +374,14 @@ impl UpstreamManager { return Err(ProxyError::Io(e)); } - connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?; + debug!(config = ?config, "Socks5 connection"); + // replace socks user:pass with config.selected_scope, if set + let scope: Option<&str> = Some(config.selected_scope.as_str()) + .filter(|s| !s.is_empty()); + let _username: Option<&str> = scope.or(username.as_deref()); + let _password: Option<&str> = scope.or(password.as_deref()); + + connect_socks5(&mut stream, target, _username, _password).await?; Ok(stream) }, } From e54dce53663cbe3291dfbcd0d8af97d8caa45113 Mon Sep 17 00:00:00 2001 From: ivulit Date: Wed, 18 Feb 2026 23:22:31 +0300 Subject: [PATCH 08/22] Handle IPv6 ENETUNREACH in STUN probe gracefully When IPv6 is unavailable on the host, treat NetworkUnreachable at connect() as Ok(None) instead of propagating an error, so the dual STUN probe succeeds with just the IPv4 result and no spurious WARN. --- src/network/stun.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/network/stun.rs b/src/network/stun.rs index 251454e..6a93339 100644 --- a/src/network/stun.rs +++ b/src/network/stun.rs @@ -50,10 +50,17 @@ pub async fn stun_probe_family(stun_addr: &str, family: IpFamily) -> Result {} + Err(e) if family == IpFamily::V6 && matches!( + e.kind(), + std::io::ErrorKind::NetworkUnreachable + | std::io::ErrorKind::HostUnreachable + | std::io::ErrorKind::Unsupported + | std::io::ErrorKind::NetworkDown + ) => return Ok(None), + Err(e) => return Err(ProxyError::Proxy(format!("STUN connect failed: {e}"))), + } } else { return Ok(None); } From e76b388a05a548c993e8b44f159d7a19ecc52e49 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 02:49:08 +0300 Subject: [PATCH 09/22] Create CONTRIBUTING.md --- CONTRIBUTING.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..ea9b99e --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,5 @@ +## Pull Requests - Rules +- ONLY signed and verified commits +- ONLY from your name +- DO NOT commit `codex` or `claude` code +- PREFER `flow` branch for development, not `main` From 76bf5337e824f1d2c65bb8f6d600e33845d447b1 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 02:49:38 +0300 Subject: [PATCH 10/22] Update CONTRIBUTING.md --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ea9b99e..d1f22c1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,5 +1,5 @@ ## Pull Requests - Rules - ONLY signed and verified commits - ONLY from your name -- DO NOT commit `codex` or `claude` code +- DO NOT commit with `codex` or `claude` as author/commiter - PREFER `flow` branch for development, not `main` From 76a02610d89f8b0b43f7eeab072c2015a3e24a48 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 03:00:04 +0300 Subject: [PATCH 11/22] Create LICENSING.md Drafting licensing... --- LICENSING.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 LICENSING.md diff --git a/LICENSING.md b/LICENSING.md new file mode 100644 index 0000000..9b09f4a --- /dev/null +++ b/LICENSING.md @@ -0,0 +1,17 @@ +# LICENSING +## Licenses for Versions +| Version | License | +|---------|---------------| +| 1.0 | NO LICNESE | +| 1.1 | NO LICENSE | +| 1.2 | NO LICENSE | +| 2.0 | NO LICENSE | +| 3.0 | TELEMT UL 1 | + +### License Types +- **NO LICENSE** = ***ALL RIGHT RESERVED*** +- **TELEMT UL1** - work in progress license for source code of `telemt`, which encourages: + - fair use, + - contributions, + - distribution, + - but prohibits NOT mentioning the authors From 301f829c3c2956c6117ddd1932f54a05d86d54c1 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 03:00:47 +0300 Subject: [PATCH 12/22] Update LICENSING.md --- LICENSING.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/LICENSING.md b/LICENSING.md index 9b09f4a..50d007b 100644 --- a/LICENSING.md +++ b/LICENSING.md @@ -11,7 +11,7 @@ ### License Types - **NO LICENSE** = ***ALL RIGHT RESERVED*** - **TELEMT UL1** - work in progress license for source code of `telemt`, which encourages: - - fair use, - - contributions, - - distribution, - - but prohibits NOT mentioning the authors + - fair use, + - contributions, + - distribution, + - but prohibits NOT mentioning the authors From b11a7677416430ffe2dd83dbbd08c078c39d7a91 Mon Sep 17 00:00:00 2001 From: Vladislav Yaroslavlev Date: Thu, 19 Feb 2026 09:43:31 +0300 Subject: [PATCH 13/22] ci: add musl build targets for static Linux binaries --- .github/workflows/release.yml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ec0e68d..bbf1b00 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,10 +25,16 @@ jobs: include: - target: x86_64-unknown-linux-gnu artifact_name: telemt - asset_name: telemt-x86_64-linux + asset_name: telemt-x86_64-linux-gnu - target: aarch64-unknown-linux-gnu artifact_name: telemt - asset_name: telemt-aarch64-linux + asset_name: telemt-aarch64-linux-gnu + - target: x86_64-unknown-linux-musl + artifact_name: telemt + asset_name: telemt-x86_64-linux-musl + - target: aarch64-unknown-linux-musl + artifact_name: telemt + asset_name: telemt-aarch64-linux-musl steps: - name: Checkout repository @@ -60,6 +66,8 @@ jobs: run: cargo install cross --git https://github.com/cross-rs/cross - name: Build Release + env: + RUSTFLAGS: ${{ contains(matrix.target, 'musl') && '-C target-feature=+crt-static' || '' }} run: cross build --release --target ${{ matrix.target }} - name: Package binary From d941873cceff5443c5b67c6a62233ee6678ecf3f Mon Sep 17 00:00:00 2001 From: Vladislav Yaroslavlev Date: Thu, 19 Feb 2026 10:15:03 +0300 Subject: [PATCH 14/22] docs: Document disable_colors configuration parameter --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ff06d3a..cb7cd49 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,7 @@ prefer_ipv6 = false fast_mode = true use_middle_proxy = false # ad_tag = "..." +# disable_colors = false # Disable colored output in logs (useful for files/systemd) [network] ipv4 = true @@ -215,7 +216,9 @@ ip = "::" # Users to show in the startup log (tg:// links) [general.links] -show = ["hello"] # Users to show in the startup log (tg:// links) +show = ["hello"] # Only show links for user "hello" +# show = ["alice", "bob"] # Only show links for alice and bob +# show = "*" # Show links for all users # public_host = "proxy.example.com" # Host (IP or domain) for tg:// links # public_port = 443 # Port for tg:// links (default: server.port) From f31d9d42feecb157e8a01d8eb4eff9b4d0a5a86c Mon Sep 17 00:00:00 2001 From: Vladislav Yaroslavlev Date: Thu, 19 Feb 2026 10:23:49 +0300 Subject: [PATCH 15/22] feat: Add -V/--version flag to print version string Closes #156 - Add handling for -V and --version arguments in CLI parser - Print version to stdout using CARGO_PKG_VERSION from Cargo.toml - Update help text to include version option --- src/main.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main.rs b/src/main.rs index 2660213..7663fab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -74,6 +74,7 @@ fn parse_cli() -> (String, bool, Option) { eprintln!("Options:"); eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --log-level debug|verbose|normal|silent"); + eprintln!(" --version, -V Print version information"); eprintln!(" --help, -h Show this help"); eprintln!(); eprintln!("Setup (fire-and-forget):"); @@ -92,6 +93,10 @@ fn parse_cli() -> (String, bool, Option) { eprintln!(" --no-start Don't start the service after install"); std::process::exit(0); } + "--version" | "-V" => { + println!("telemt {}", env!("CARGO_PKG_VERSION")); + std::process::exit(0); + } s if !s.starts_with('-') => { config_path = s.to_string(); } From 35ae455e2b6fd56f4312d7d968df38c9a37663a5 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:35:56 +0300 Subject: [PATCH 16/22] ME Pool V2 Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 4 + src/config/load.rs | 38 +++++- src/config/types.rs | 15 +++ src/crypto/random.rs | 5 +- src/main.rs | 21 ++- src/transport/middle_proxy/codec.rs | 7 + src/transport/middle_proxy/config_updater.rs | 35 ++++- src/transport/middle_proxy/pool.rs | 133 ++++++++++++++++--- src/transport/middle_proxy/pool_nat.rs | 86 ++++++------ src/transport/middle_proxy/reader.rs | 30 +++-- src/transport/middle_proxy/registry.rs | 12 +- src/transport/middle_proxy/rotation.rs | 13 +- src/transport/middle_proxy/send.rs | 81 ++++++----- 13 files changed, 343 insertions(+), 137 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 3f8254c..c11e738 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -74,6 +74,10 @@ pub(crate) fn default_unknown_dc_log_path() -> Option { Some("unknown-dc.txt".to_string()) } +pub(crate) fn default_pool_size() -> usize { + 2 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/load.rs b/src/config/load.rs index 4f00f77..a2fc19b 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -11,6 +11,32 @@ use crate::error::{ProxyError, Result}; use super::defaults::*; use super::types::*; +fn preprocess_includes(content: &str, base_dir: &Path, depth: u8) -> Result { + if depth > 10 { + return Err(ProxyError::Config("Include depth > 10".into())); + } + let mut output = String::with_capacity(content.len()); + for line in content.lines() { + let trimmed = line.trim(); + if let Some(rest) = trimmed.strip_prefix("include") { + let rest = rest.trim(); + if let Some(rest) = rest.strip_prefix('=') { + let path_str = rest.trim().trim_matches('"'); + let resolved = base_dir.join(path_str); + let included = std::fs::read_to_string(&resolved) + .map_err(|e| ProxyError::Config(e.to_string()))?; + let included_dir = resolved.parent().unwrap_or(base_dir); + output.push_str(&preprocess_includes(&included, included_dir, depth + 1)?); + output.push('\n'); + continue; + } + } + output.push_str(line); + output.push('\n'); + } + Ok(output) +} + fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> { if !net.ipv4 && matches!(net.ipv6, Some(false)) { return Err(ProxyError::Config( @@ -84,10 +110,12 @@ pub struct ProxyConfig { impl ProxyConfig { pub fn load>(path: P) -> Result { let content = - std::fs::read_to_string(path).map_err(|e| ProxyError::Config(e.to_string()))?; + std::fs::read_to_string(&path).map_err(|e| ProxyError::Config(e.to_string()))?; + let base_dir = path.as_ref().parent().unwrap_or(Path::new(".")); + let processed = preprocess_includes(&content, base_dir, 0)?; let mut config: ProxyConfig = - toml::from_str(&content).map_err(|e| ProxyError::Config(e.to_string()))?; + toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?; // Validate secrets. for (user, secret) in &config.access.users { @@ -151,8 +179,10 @@ impl ProxyConfig { validate_network_cfg(&mut config.network)?; - // Random fake_cert_len. - config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096); + // Random fake_cert_len only when default is in use. + if config.censorship.fake_cert_len == default_fake_cert_len() { + config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096); + } // Resolve listen_tcp: explicit value wins, otherwise auto-detect. // If unix socket is set β†’ TCP only when listen_addr_ipv4 or listeners are explicitly provided. diff --git a/src/config/types.rs b/src/config/types.rs index 766851b..1cdb1bf 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -143,6 +143,18 @@ pub struct GeneralConfig { #[serde(default)] pub middle_proxy_nat_stun: Option, + /// Optional list of STUN servers for NAT probing fallback. + #[serde(default)] + pub middle_proxy_nat_stun_servers: Vec, + + /// Desired size of active Middle-Proxy writer pool. + #[serde(default = "default_pool_size")] + pub middle_proxy_pool_size: usize, + + /// Number of warm standby ME connections kept pre-initialized. + #[serde(default)] + pub middle_proxy_warm_standby: usize, + /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). #[serde(default)] pub stun_iface_mismatch_ignore: bool, @@ -175,6 +187,9 @@ impl Default for GeneralConfig { middle_proxy_nat_ip: None, middle_proxy_nat_probe: false, middle_proxy_nat_stun: None, + middle_proxy_nat_stun_servers: Vec::new(), + middle_proxy_pool_size: default_pool_size(), + middle_proxy_warm_standby: 0, stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, diff --git a/src/crypto/random.rs b/src/crypto/random.rs index 18862ab..99aa5f3 100644 --- a/src/crypto/random.rs +++ b/src/crypto/random.rs @@ -11,6 +11,9 @@ pub struct SecureRandom { inner: Mutex, } +unsafe impl Send for SecureRandom {} +unsafe impl Sync for SecureRandom {} + struct SecureRandomInner { rng: StdRng, cipher: AesCtr, @@ -211,4 +214,4 @@ mod tests { assert_ne!(shuffled, original); } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 7663fab..33aefcb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -74,7 +74,6 @@ fn parse_cli() -> (String, bool, Option) { eprintln!("Options:"); eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --log-level debug|verbose|normal|silent"); - eprintln!(" --version, -V Print version information"); eprintln!(" --help, -h Show this help"); eprintln!(); eprintln!("Setup (fire-and-forget):"); @@ -111,18 +110,20 @@ fn parse_cli() -> (String, bool, Option) { } fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { - info!("--- Proxy Links ({}) ---", host); + info!(target: "telemt::links", "--- Proxy Links ({}) ---", host); for user_name in config.general.links.show.resolve_users(&config.access.users) { if let Some(secret) = config.access.users.get(user_name) { - info!("User: {}", user_name); + info!(target: "telemt::links", "User: {}", user_name); if config.general.modes.classic { info!( + target: "telemt::links", " Classic: tg://proxy?server={}&port={}&secret={}", host, port, secret ); } if config.general.modes.secure { info!( + target: "telemt::links", " DD: tg://proxy?server={}&port={}&secret=dd{}", host, port, secret ); @@ -130,15 +131,16 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { if config.general.modes.tls { let domain_hex = hex::encode(&config.censorship.tls_domain); info!( + target: "telemt::links", " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", host, port, secret, domain_hex ); } } else { - warn!("User '{}' in show_link not found", user_name); + warn!(target: "telemt::links", "User '{}' in show_link not found", user_name); } } - info!("------------------------"); + info!(target: "telemt::links", "------------------------"); } #[tokio::main] @@ -322,6 +324,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai config.general.middle_proxy_nat_ip, config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_stun.clone(), + config.general.middle_proxy_nat_stun_servers.clone(), probe.detected_ipv6, config.timeouts.me_one_retry, config.timeouts.me_one_timeout_ms, @@ -332,16 +335,18 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai rng.clone(), ); - match pool.init(2, &rng).await { + let pool_size = config.general.middle_proxy_pool_size.max(1); + match pool.init(pool_size, &rng).await { Ok(()) => { info!("Middle-End pool initialized successfully"); // Phase 4: Start health monitor let pool_clone = pool.clone(); let rng_clone = rng.clone(); + let min_conns = pool_size; tokio::spawn(async move { crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, 2, + pool_clone, rng_clone, min_conns, ) .await; }); @@ -745,6 +750,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai // Switch to user-configured log level after startup let runtime_filter = if has_rust_log { EnvFilter::from_default_env() + } else if matches!(effective_log_level, LogLevel::Silent) { + EnvFilter::new("warn,telemt::links=info") } else { EnvFilter::new(effective_log_level.to_filter_str()) }; diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs index cc33ae8..12efc45 100644 --- a/src/transport/middle_proxy/codec.rs +++ b/src/transport/middle_proxy/codec.rs @@ -4,6 +4,13 @@ use crate::crypto::{AesCbc, crc32}; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; +/// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes. +pub(crate) enum WriterCommand { + Data(Vec), + DataAndFlush(Vec), + Close, +} + pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec { let total_len = (4 + 4 + payload.len() + 4) as u32; let mut frame = Vec::with_capacity(total_len as usize); diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 3c36820..d2bb550 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -13,6 +13,24 @@ use super::secret::download_proxy_secret; use crate::crypto::SecureRandom; use std::time::SystemTime; +async fn retry_fetch(url: &str) -> Option { + let delays = [1u64, 5, 15]; + for (i, d) in delays.iter().enumerate() { + match fetch_proxy_config(url).await { + Ok(cfg) => return Some(cfg), + Err(e) => { + if i == delays.len() - 1 { + warn!(error = %e, url, "fetch_proxy_config failed"); + } else { + debug!(error = %e, url, "fetch_proxy_config retrying"); + tokio::time::sleep(Duration::from_secs(*d)).await; + } + } + } + } + None +} + #[derive(Debug, Clone, Default)] pub struct ProxyConfigData { pub map: HashMap>, @@ -118,7 +136,8 @@ pub async fn me_config_updater(pool: Arc, rng: Arc, interv tick.tick().await; // Update proxy config v4 - if let Ok(cfg) = fetch_proxy_config("https://core.telegram.org/getProxyConfig").await { + let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; + if let Some(cfg) = cfg_v4 { let changed = pool.update_proxy_maps(cfg.map.clone(), None).await; if let Some(dc) = cfg.default_dc { pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed); @@ -129,14 +148,20 @@ pub async fn me_config_updater(pool: Arc, rng: Arc, interv } else { debug!("ME config v4 unchanged"); } - } else { - warn!("getProxyConfig update failed"); } // Update proxy config v6 (optional) - if let Ok(cfg_v6) = fetch_proxy_config("https://core.telegram.org/getProxyConfigV6").await { - let _ = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; + let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; + if let Some(cfg_v6) = cfg_v6 { + let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; + if changed { + info!("ME config updated (v6), reconciling connections"); + pool.reconcile_connections(&rng).await; + } else { + debug!("ME config v6 unchanged"); + } } + pool.reset_stun_state(); // Update proxy-secret match download_proxy_secret().await { diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 00041fd..e860dc8 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering}; use bytes::BytesMut; use rand::Rng; use rand::seq::SliceRandom; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, mpsc, Notify}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; @@ -18,7 +18,7 @@ use crate::protocol::constants::*; use super::ConnRegistry; use super::registry::{BoundConn, ConnMeta}; -use super::codec::RpcWriter; +use super::codec::{RpcWriter, WriterCommand}; use super::reader::reader_loop; use super::MeResponse; @@ -29,7 +29,7 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; pub struct MeWriter { pub id: u64, pub addr: SocketAddr, - pub writer: Arc>, + pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, pub draining: Arc, @@ -47,9 +47,11 @@ pub struct MePool { pub(super) nat_ip_detected: Arc>>, pub(super) nat_probe: bool, pub(super) nat_stun: Option, + pub(super) nat_stun_servers: Vec, pub(super) detected_ipv6: Option, pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, + pub(super) stun_backoff_until: Arc>>, pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, pub(super) proxy_map_v4: Arc>>>, @@ -59,6 +61,8 @@ pub struct MePool { pub(super) ping_tracker: Arc>>, pub(super) rtt_stats: Arc>>, pub(super) nat_reflection_cache: Arc>, + pub(super) writer_available: Arc, + pub(super) conn_count: AtomicUsize, pool_size: usize, } @@ -75,6 +79,7 @@ impl MePool { nat_ip: Option, nat_probe: bool, nat_stun: Option, + nat_stun_servers: Vec, detected_ipv6: Option, me_one_retry: u8, me_one_timeout_ms: u64, @@ -96,9 +101,11 @@ impl MePool { nat_ip_detected: Arc::new(RwLock::new(None)), nat_probe, nat_stun, + nat_stun_servers, detected_ipv6, nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), + stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), pool_size: 2, @@ -109,6 +116,8 @@ impl MePool { ping_tracker: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), + writer_available: Arc::new(Notify::new()), + conn_count: AtomicUsize::new(0), }) } @@ -116,6 +125,11 @@ impl MePool { self.proxy_tag.is_some() } + pub fn reset_stun_state(&self) { + self.nat_probe_attempts.store(0, Ordering::Relaxed); + self.nat_probe_disabled.store(false, Ordering::Relaxed); + } + pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr { let ip = self.translate_ip_for_nat(addr.ip()); SocketAddr::new(ip, addr.port()) @@ -132,7 +146,11 @@ impl MePool { pub async fn reconcile_connections(self: &Arc, rng: &SecureRandom) { use std::collections::HashSet; let writers = self.writers.read().await; - let current: HashSet = writers.iter().map(|w| w.addr).collect(); + let current: HashSet = writers + .iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .map(|w| w.addr) + .collect(); drop(writers); for family in self.family_order() { @@ -175,12 +193,36 @@ impl MePool { let mut guard = self.proxy_map_v6.write().await; if !v6.is_empty() && *guard != v6 { *guard = v6; + changed = true; + } + } + // Ensure negative DC entries mirror positives when absent (Telegram convention). + { + let mut guard = self.proxy_map_v4.write().await; + let keys: Vec = guard.keys().cloned().collect(); + for k in keys.iter().cloned().filter(|k| *k > 0) { + if !guard.contains_key(&-k) { + if let Some(addrs) = guard.get(&k).cloned() { + guard.insert(-k, addrs); + } + } + } + } + { + let mut guard = self.proxy_map_v6.write().await; + let keys: Vec = guard.keys().cloned().collect(); + for k in keys.iter().cloned().filter(|k| *k > 0) { + if !guard.contains_key(&-k) { + if let Some(addrs) = guard.get(&k).cloned() { + guard.insert(-k, addrs); + } + } } } changed } - pub async fn update_secret(&self, new_secret: Vec) -> bool { + pub async fn update_secret(self: &Arc, new_secret: Vec) -> bool { if new_secret.len() < 32 { warn!(len = new_secret.len(), "proxy-secret update ignored (too short)"); return false; @@ -195,10 +237,14 @@ impl MePool { false } - pub async fn reconnect_all(&self) { - // Graceful: do not drop all at once. New connections will use updated secret. - // Existing writers remain until health monitor replaces them. - // No-op here to avoid total outage. + pub async fn reconnect_all(self: &Arc) { + let ws = self.writers.read().await.clone(); + for w in ws { + if let Ok(()) = self.connect_one(w.addr, self.rng.as_ref()).await { + self.mark_writer_draining(w.id).await; + tokio::time::sleep(Duration::from_secs(2)).await; + } + } } pub(super) async fn key_selector(&self) -> u32 { @@ -317,21 +363,43 @@ impl MePool { let cancel = CancellationToken::new(); let degraded = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false)); - let rpc_w = Arc::new(Mutex::new(RpcWriter { + let (tx, mut rx) = mpsc::channel::(4096); + let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, iv: hs.write_iv, seq_no: 0, - })); + }; + let cancel_wr = cancel.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + cmd = rx.recv() => { + match cmd { + Some(WriterCommand::Data(payload)) => { + if rpc_writer.send(&payload).await.is_err() { break; } + } + Some(WriterCommand::DataAndFlush(payload)) => { + if rpc_writer.send_and_flush(&payload).await.is_err() { break; } + } + Some(WriterCommand::Close) | None => break, + } + } + _ = cancel_wr.cancelled() => break, + } + } + }); let writer = MeWriter { id: writer_id, addr, - writer: rpc_w.clone(), + tx: tx.clone(), cancel: cancel.clone(), degraded: degraded.clone(), draining: draining.clone(), }; self.writers.write().await.push(writer.clone()); + self.conn_count.fetch_add(1, Ordering::Relaxed); + self.writer_available.notify_waiters(); let reg = self.registry.clone(); let writers_arc = self.writers_arc(); @@ -339,8 +407,11 @@ impl MePool { let rtt_stats = self.rtt_stats.clone(); let pool = Arc::downgrade(self); let cancel_ping = cancel.clone(); - let rpc_w_ping = rpc_w.clone(); + let tx_ping = tx.clone(); let ping_tracker_ping = ping_tracker.clone(); + let cleanup_done = Arc::new(AtomicBool::new(false)); + let cleanup_for_reader = cleanup_done.clone(); + let cleanup_for_ping = cleanup_done.clone(); tokio::spawn(async move { let cancel_reader = cancel.clone(); @@ -351,7 +422,7 @@ impl MePool { reg.clone(), BytesMut::new(), BytesMut::new(), - rpc_w.clone(), + tx.clone(), ping_tracker.clone(), rtt_stats.clone(), writer_id, @@ -360,7 +431,12 @@ impl MePool { ) .await; if let Some(pool) = pool.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; + if cleanup_for_reader + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + pool.remove_writer_and_close_clients(writer_id).await; + } } if let Err(e) = res { warn!(error = %e, "ME reader ended"); @@ -389,14 +465,20 @@ impl MePool { p.extend_from_slice(&sent_id.to_le_bytes()); { let mut tracker = ping_tracker_ping.lock().await; + tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } ping_id = ping_id.wrapping_add(1); - if let Err(e) = rpc_w_ping.lock().await.send_and_flush(&p).await { - debug!(error = %e, "Active ME ping failed, removing dead writer"); + if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() { + debug!("Active ME ping failed, removing dead writer"); cancel_ping.cancel(); if let Some(pool) = pool_ping.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; + if cleanup_for_ping + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + pool.remove_writer_and_close_clients(writer_id).await; + } } break; } @@ -430,7 +512,7 @@ impl MePool { false } - pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) { + pub(crate) async fn remove_writer_and_close_clients(self: &Arc, writer_id: u64) { let conns = self.remove_writer_only(writer_id).await; for bound in conns { let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await; @@ -444,8 +526,11 @@ impl MePool { if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { let w = ws.remove(pos); w.cancel.cancel(); + let _ = w.tx.send(WriterCommand::Close).await; + self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + self.rtt_stats.lock().await.remove(&writer_id); self.registry.writer_lost(writer_id).await } @@ -459,8 +544,14 @@ impl MePool { let pool = Arc::downgrade(self); tokio::spawn(async move { + let deadline = Instant::now() + Duration::from_secs(300); loop { if let Some(p) = pool.upgrade() { + if Instant::now() >= deadline { + warn!(writer_id, "Drain timeout, force-closing"); + let _ = p.remove_writer_and_close_clients(writer_id).await; + break; + } if p.registry.is_writer_empty(writer_id).await { let _ = p.remove_writer_only(writer_id).await; break; diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index 1b4f7c7..d3dec16 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -98,16 +98,18 @@ impl MePool { family: IpFamily, ) -> Option { const STUN_CACHE_TTL: Duration = Duration::from_secs(600); - // If STUN probing was disabled after attempts, reuse cached (even stale) or skip. - if self.nat_probe_disabled.load(std::sync::atomic::Ordering::Relaxed) { - if let Ok(cache) = self.nat_reflection_cache.try_lock() { - let slot = match family { - IpFamily::V4 => cache.v4, - IpFamily::V6 => cache.v6, - }; - return slot.map(|(_, addr)| addr); + // Backoff window + if let Some(until) = *self.stun_backoff_until.read().await { + if Instant::now() < until { + if let Ok(cache) = self.nat_reflection_cache.try_lock() { + let slot = match family { + IpFamily::V4 => cache.v4, + IpFamily::V6 => cache.v6, + }; + return slot.map(|(_, addr)| addr); + } + return None; } - return None; } if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { @@ -123,48 +125,42 @@ impl MePool { } let attempt = self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if attempt >= 2 { - self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed); - return None; - } + let servers = if !self.nat_stun_servers.is_empty() { + self.nat_stun_servers.clone() + } else if let Some(s) = &self.nat_stun { + vec![s.clone()] + } else { + vec!["stun.l.google.com:19302".to_string()] + }; - let stun_addr = self - .nat_stun - .clone() - .unwrap_or_else(|| "stun.l.google.com:19302".to_string()); - match stun_probe_dual(&stun_addr).await { - Ok(res) => { - let picked: Option = match family { - IpFamily::V4 => res.v4, - IpFamily::V6 => res.v6, - }; - if let Some(result) = picked { - info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, "NAT probe: reflected address"); - if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { - let slot = match family { - IpFamily::V4 => &mut cache.v4, - IpFamily::V6 => &mut cache.v6, - }; - *slot = Some((Instant::now(), result.reflected_addr)); + for stun_addr in servers { + match stun_probe_dual(&stun_addr).await { + Ok(res) => { + let picked: Option = match family { + IpFamily::V4 => res.v4, + IpFamily::V6 => res.v6, + }; + if let Some(result) = picked { + info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, stun = %stun_addr, "NAT probe: reflected address"); + self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed); + if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { + let slot = match family { + IpFamily::V4 => &mut cache.v4, + IpFamily::V6 => &mut cache.v6, + }; + *slot = Some((Instant::now(), result.reflected_addr)); + } + return Some(result.reflected_addr); } - Some(result.reflected_addr) - } else { - None } - } - Err(e) => { - let attempts = attempt + 1; - if attempts <= 2 { - warn!(error = %e, attempt = attempts, "NAT probe failed"); - } else { - debug!(error = %e, attempt = attempts, "NAT probe suppressed after max attempts"); + Err(e) => { + warn!(error = %e, stun = %stun_addr, attempt = attempt + 1, "NAT probe failed, trying next server"); } - if attempts >= 2 { - self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed); - } - None } } + let backoff = Duration::from_secs(60 * 2u64.pow((attempt as u32).min(6))); + *self.stun_backoff_until.write().await = Some(Instant::now() + backoff); + None } } diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index fb40fdb..c22ed68 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -6,7 +6,7 @@ use std::time::Instant; use bytes::{Bytes, BytesMut}; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; @@ -14,7 +14,7 @@ use crate::crypto::{AesCbc, crc32}; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; -use super::codec::RpcWriter; +use super::codec::WriterCommand; use super::{ConnRegistry, MeResponse}; pub(crate) async fn reader_loop( @@ -24,7 +24,7 @@ pub(crate) async fn reader_loop( reg: Arc, enc_leftover: BytesMut, mut dec: BytesMut, - writer: Arc>, + tx: mpsc::Sender, ping_tracker: Arc>>, rtt_stats: Arc>>, _writer_id: u64, @@ -33,6 +33,8 @@ pub(crate) async fn reader_loop( ) -> Result<()> { let mut raw = enc_leftover; let mut expected_seq: i32 = 0; + let mut crc_errors = 0u32; + let mut seq_mismatch = 0u32; loop { let mut tmp = [0u8; 16_384]; @@ -80,12 +82,20 @@ pub(crate) async fn reader_loop( let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap()); if crc32(&frame[..pe]) != ec { warn!("CRC mismatch in data frame"); + crc_errors += 1; + if crc_errors > 3 { + return Err(ProxyError::Proxy("Too many CRC mismatches".into())); + } continue; } let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap()); if seq_no != expected_seq { warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch"); + seq_mismatch += 1; + if seq_mismatch > 10 { + return Err(ProxyError::Proxy("Too many seq mismatches".into())); + } expected_seq = seq_no.wrapping_add(1); } else { expected_seq = expected_seq.wrapping_add(1); @@ -108,7 +118,7 @@ pub(crate) async fn reader_loop( let routed = reg.route(cid, MeResponse::Data { flags, data }).await; if !routed { reg.unregister(cid).await; - send_close_conn(&writer, cid).await; + send_close_conn(&tx, cid).await; } } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); @@ -118,7 +128,7 @@ pub(crate) async fn reader_loop( let routed = reg.route(cid, MeResponse::Ack(cfm)).await; if !routed { reg.unregister(cid).await; - send_close_conn(&writer, cid).await; + send_close_conn(&tx, cid).await; } } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); @@ -136,8 +146,8 @@ pub(crate) async fn reader_loop( let mut pong = Vec::with_capacity(12); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes()); - if let Err(e) = writer.lock().await.send_and_flush(&pong).await { - warn!(error = %e, "PONG send failed"); + if tx.send(WriterCommand::DataAndFlush(pong)).await.is_err() { + warn!("PONG send failed"); break; } } else if pt == RPC_PONG_U32 && body.len() >= 8 { @@ -171,12 +181,10 @@ pub(crate) async fn reader_loop( } } -async fn send_close_conn(writer: &Arc>, conn_id: u64) { +async fn send_close_conn(tx: &mpsc::Sender, conn_id: u64) { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); - if let Err(e) = writer.lock().await.send_and_flush(&p).await { - debug!(conn_id, error = %e, "Failed to send RPC_CLOSE_CONN"); - } + let _ = tx.send(WriterCommand::DataAndFlush(p)).await; } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 04f8baa..ab4f280 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, Mutex, RwLock}; -use super::codec::RpcWriter; +use super::codec::WriterCommand; use super::MeResponse; #[derive(Clone)] @@ -25,12 +25,12 @@ pub struct BoundConn { #[derive(Clone)] pub struct ConnWriter { pub writer_id: u64, - pub writer: Arc>, + pub tx: mpsc::Sender, } struct RegistryInner { map: HashMap>, - writers: HashMap>>, + writers: HashMap>, writer_for_conn: HashMap, conns_for_writer: HashMap>, meta: HashMap, @@ -96,13 +96,13 @@ impl ConnRegistry { &self, conn_id: u64, writer_id: u64, - writer: Arc>, + tx: mpsc::Sender, meta: ConnMeta, ) { let mut inner = self.inner.write().await; inner.meta.entry(conn_id).or_insert(meta); inner.writer_for_conn.insert(conn_id, writer_id); - inner.writers.entry(writer_id).or_insert_with(|| writer.clone()); + inner.writers.entry(writer_id).or_insert_with(|| tx.clone()); inner .conns_for_writer .entry(writer_id) @@ -114,7 +114,7 @@ impl ConnRegistry { let inner = self.inner.read().await; let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?; let writer = inner.writers.get(&writer_id).cloned()?; - Some(ConnWriter { writer_id, writer }) + Some(ConnWriter { writer_id, tx: writer }) } pub async fn writer_lost(&self, writer_id: u64) -> Vec { diff --git a/src/transport/middle_proxy/rotation.rs b/src/transport/middle_proxy/rotation.rs index 6d94f3e..e141fc4 100644 --- a/src/transport/middle_proxy/rotation.rs +++ b/src/transport/middle_proxy/rotation.rs @@ -31,8 +31,17 @@ pub async fn me_rotation_task(pool: Arc, rng: Arc, interva info!(addr = %w.addr, writer_id = w.id, "Rotating ME connection"); match pool.connect_one(w.addr, rng.as_ref()).await { Ok(()) => { - // Mark old writer for graceful drain; removal happens when sessions finish. - pool.mark_writer_draining(w.id).await; + tokio::time::sleep(Duration::from_secs(2)).await; + let ws = pool.writers.read().await; + let new_alive = ws.iter().any(|nw| + nw.id != w.id && nw.addr == w.addr && !nw.degraded.load(Ordering::Relaxed) && !nw.draining.load(Ordering::Relaxed) + ); + drop(ws); + if new_alive { + pool.mark_writer_draining(w.id).await; + } else { + warn!(addr = %w.addr, writer_id = w.id, "New writer died, keeping old"); + } } Err(e) => { warn!(addr = %w.addr, writer_id = w.id, error = %e, "ME rotation connect failed"); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 2b0c42e..627906d 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -10,6 +10,7 @@ use crate::network::IpFamily; use crate::protocol::constants::RPC_CLOSE_EXT_U32; use super::MePool; +use super::codec::WriterCommand; use super::wire::build_proxy_req_payload; use rand::seq::SliceRandom; use super::registry::ConnMeta; @@ -43,18 +44,15 @@ impl MePool { loop { if let Some(current) = self.registry.get_writer(conn_id).await { let send_res = { - if let Ok(mut guard) = current.writer.try_lock() { - let r = guard.send(&payload).await; - drop(guard); - r - } else { - current.writer.lock().await.send(&payload).await - } + current + .tx + .send(WriterCommand::Data(payload.clone())) + .await }; match send_res { Ok(()) => return Ok(()), - Err(e) => { - warn!(error = %e, writer_id = current.writer_id, "ME write failed"); + Err(_) => { + warn!(writer_id = current.writer_id, "ME writer channel closed"); self.remove_writer_and_close_clients(current.writer_id).await; continue; } @@ -64,7 +62,26 @@ impl MePool { let mut writers_snapshot = { let ws = self.writers.read().await; if ws.is_empty() { - return Err(ProxyError::Proxy("All ME connections dead".into())); + drop(ws); + for family in self.family_order() { + let map = match family { + IpFamily::V4 => self.proxy_map_v4.read().await.clone(), + IpFamily::V6 => self.proxy_map_v6.read().await.clone(), + }; + for (_dc, addrs) in map.iter() { + for (ip, port) in addrs { + let addr = SocketAddr::new(*ip, *port); + if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { + self.writer_available.notify_waiters(); + break; + } + } + } + } + if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() { + return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into())); + } + continue; } ws.clone() }; @@ -96,9 +113,10 @@ impl MePool { writers_snapshot = ws2.clone(); drop(ws2); candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await; - break; + if !candidate_indices.is_empty() { + break; + } } - drop(map_guard); } if candidate_indices.is_empty() { return Err(ProxyError::Proxy("No ME writers available for target DC".into())); @@ -120,22 +138,15 @@ impl MePool { if w.draining.load(Ordering::Relaxed) { continue; } - if let Ok(mut guard) = w.writer.try_lock() { - let send_res = guard.send(&payload).await; - drop(guard); - match send_res { - Ok(()) => { - self.registry - .bind_writer(conn_id, w.id, w.writer.clone(), meta.clone()) - .await; - return Ok(()); - } - Err(e) => { - warn!(error = %e, writer_id = w.id, "ME write failed"); - self.remove_writer_and_close_clients(w.id).await; - continue; - } - } + if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() { + self.registry + .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) + .await; + return Ok(()); + } else { + warn!(writer_id = w.id, "ME writer channel closed"); + self.remove_writer_and_close_clients(w.id).await; + continue; } } @@ -143,15 +154,15 @@ impl MePool { if w.draining.load(Ordering::Relaxed) { continue; } - match w.writer.lock().await.send(&payload).await { + match w.tx.send(WriterCommand::Data(payload.clone())).await { Ok(()) => { self.registry - .bind_writer(conn_id, w.id, w.writer.clone(), meta.clone()) + .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; return Ok(()); } - Err(e) => { - warn!(error = %e, writer_id = w.id, "ME write failed (blocking)"); + Err(_) => { + warn!(writer_id = w.id, "ME writer channel closed (blocking)"); self.remove_writer_and_close_clients(w.id).await; } } @@ -163,8 +174,8 @@ impl MePool { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); - if let Err(e) = w.writer.lock().await.send_and_flush(&p).await { - debug!(error = %e, "ME close write failed"); + if w.tx.send(WriterCommand::DataAndFlush(p)).await.is_err() { + debug!("ME close write failed"); self.remove_writer_and_close_clients(w.writer_id).await; } } else { @@ -176,7 +187,7 @@ impl MePool { } pub fn connection_count(&self) -> usize { - self.writers.try_read().map(|w| w.len()).unwrap_or(0) + self.conn_count.load(Ordering::Relaxed) } pub(super) async fn candidate_indices_for_dc( From 4be4670668d984bbc16eced25b9bb4f647c0df43 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 14:25:39 +0300 Subject: [PATCH 17/22] ME Pool V2 - Agressive Healthcheck and Pool Rebuild Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 37 +++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 7141f94..574ea73 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -11,12 +11,16 @@ use crate::network::IpFamily; use super::MePool; +const HEALTH_INTERVAL_SECS: u64 = 1; +const QUICK_RETRY_ATTEMPTS: u8 = 10; +const QUICK_RETRY_DELAY_MS: u64 = 2500; + pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); loop { - tokio::time::sleep(Duration::from_secs(30)).await; + tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( IpFamily::V4, &pool, @@ -83,8 +87,33 @@ async fn check_family( inflight_single.remove(&(dc, family)); continue; } + + // Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff. let key = (dc, family); - let delay = *backoff.get(&key).unwrap_or(&30); + for attempt in 0..QUICK_RETRY_ATTEMPTS { + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + let mut success = false; + for addr in &shuffled { + match pool.connect_one(*addr, rng.as_ref()).await { + Ok(()) => { + info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)"); + backoff.insert(key, HEALTH_INTERVAL_SECS); + last_attempt.insert(key, Instant::now()); + inflight_single.remove(&key); + success = true; + break; + } + Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"), + } + } + if success { + continue; + } + tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await; + } + + let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS); let now = Instant::now(); if let Some(last) = last_attempt.get(&key) { if now.duration_since(*last).as_secs() < delay { @@ -149,7 +178,7 @@ async fn check_family( continue; } - warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); + warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)..."); let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); let mut reconnected = false; @@ -166,7 +195,7 @@ async fn check_family( } } if !reconnected { - let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300); + let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60); backoff.insert(key, next); last_attempt.insert(key, now); } From e340b716b2f4751b223424e64f761ab52392ff16 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:39:30 +0300 Subject: [PATCH 18/22] Drafting ME Healthcheck Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 24 +++++ src/config/types.rs | 55 +++++++++++ src/main.rs | 11 +++ src/transport/middle_proxy/codec.rs | 9 ++ src/transport/middle_proxy/health.rs | 136 ++++++--------------------- src/transport/middle_proxy/pool.rs | 94 +++++++++++++++++- 6 files changed, 217 insertions(+), 112 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index c11e738..19269a2 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -78,6 +78,30 @@ pub(crate) fn default_pool_size() -> usize { 2 } +pub(crate) fn default_keepalive_interval() -> u64 { + 25 +} + +pub(crate) fn default_keepalive_jitter() -> u64 { + 5 +} + +pub(crate) fn default_warmup_step_delay_ms() -> u64 { + 500 +} + +pub(crate) fn default_warmup_step_jitter_ms() -> u64 { + 300 +} + +pub(crate) fn default_reconnect_backoff_base_ms() -> u64 { + 500 +} + +pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 { + 30_000 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/types.rs b/src/config/types.rs index 1cdb1bf..9f6467a 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -155,6 +155,50 @@ pub struct GeneralConfig { #[serde(default)] pub middle_proxy_warm_standby: usize, + /// Enable ME keepalive padding frames. + #[serde(default = "default_true")] + pub me_keepalive_enabled: bool, + + /// Keepalive interval in seconds. + #[serde(default = "default_keepalive_interval")] + pub me_keepalive_interval_secs: u64, + + /// Keepalive jitter in seconds. + #[serde(default = "default_keepalive_jitter")] + pub me_keepalive_jitter_secs: u64, + + /// Keepalive payload randomized (4 bytes); otherwise zeros. + #[serde(default = "default_true")] + pub me_keepalive_payload_random: bool, + + /// Enable staggered warmup of extra ME writers. + #[serde(default = "default_true")] + pub me_warmup_stagger_enabled: bool, + + /// Base delay between warmup connections in ms. + #[serde(default = "default_warmup_step_delay_ms")] + pub me_warmup_step_delay_ms: u64, + + /// Jitter for warmup delay in ms. + #[serde(default = "default_warmup_step_jitter_ms")] + pub me_warmup_step_jitter_ms: u64, + + /// Max concurrent reconnect attempts per DC. + #[serde(default)] + pub me_reconnect_max_concurrent_per_dc: u32, + + /// Base backoff in ms for reconnect. + #[serde(default = "default_reconnect_backoff_base_ms")] + pub me_reconnect_backoff_base_ms: u64, + + /// Cap backoff in ms for reconnect. + #[serde(default = "default_reconnect_backoff_cap_ms")] + pub me_reconnect_backoff_cap_ms: u64, + + /// Fast retry attempts before backoff. + #[serde(default)] + pub me_reconnect_fast_retry_count: u32, + /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). #[serde(default)] pub stun_iface_mismatch_ignore: bool, @@ -190,6 +234,17 @@ impl Default for GeneralConfig { middle_proxy_nat_stun_servers: Vec::new(), middle_proxy_pool_size: default_pool_size(), middle_proxy_warm_standby: 0, + me_keepalive_enabled: true, + me_keepalive_interval_secs: default_keepalive_interval(), + me_keepalive_jitter_secs: default_keepalive_jitter(), + me_keepalive_payload_random: true, + me_warmup_stagger_enabled: true, + me_warmup_step_delay_ms: default_warmup_step_delay_ms(), + me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), + me_reconnect_max_concurrent_per_dc: 1, + me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), + me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), + me_reconnect_fast_retry_count: 1, stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, diff --git a/src/main.rs b/src/main.rs index 33aefcb..e8883f2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -333,6 +333,17 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai cfg_v4.default_dc.or(cfg_v6.default_dc), decision.clone(), rng.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, ); let pool_size = config.general.middle_proxy_pool_size.max(1); diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs index 12efc45..82f0960 100644 --- a/src/transport/middle_proxy/codec.rs +++ b/src/transport/middle_proxy/codec.rs @@ -8,6 +8,7 @@ use crate::protocol::constants::*; pub(crate) enum WriterCommand { Data(Vec), DataAndFlush(Vec), + Keepalive, Close, } @@ -188,4 +189,12 @@ impl RpcWriter { self.send(payload).await?; self.writer.flush().await.map_err(ProxyError::Io) } + + pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> { + // Keepalive is a frame with fl == 4 and 4 bytes payload. + let mut frame = Vec::with_capacity(8); + frame.extend_from_slice(&4u32.to_le_bytes()); + frame.extend_from_slice(&payload); + self.send(&frame).await + } } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 574ea73..e2cdad6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -5,6 +5,7 @@ use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; use rand::seq::SliceRandom; +use rand::Rng; use crate::crypto::SecureRandom; use crate::network::IpFamily; @@ -12,13 +13,11 @@ use crate::network::IpFamily; use super::MePool; const HEALTH_INTERVAL_SECS: u64 = 1; -const QUICK_RETRY_ATTEMPTS: u8 = 10; -const QUICK_RETRY_DELAY_MS: u64 = 2500; +const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); - let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); + let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( @@ -26,8 +25,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &pool, &rng, &mut backoff, - &mut last_attempt, - &mut inflight_single, + &mut next_attempt, ) .await; check_family( @@ -35,8 +33,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &pool, &rng, &mut backoff, - &mut last_attempt, - &mut inflight_single, + &mut next_attempt, ) .await; } @@ -47,8 +44,7 @@ async fn check_family( pool: &Arc, rng: &Arc, backoff: &mut HashMap<(i32, IpFamily), u64>, - last_attempt: &mut HashMap<(i32, IpFamily), Instant>, - inflight_single: &mut HashSet<(i32, IpFamily)>, + next_attempt: &mut HashMap<(i32, IpFamily), Instant>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -84,120 +80,46 @@ async fn check_family( for (dc, dc_addrs) in entries { let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); if has_coverage { - inflight_single.remove(&(dc, family)); continue; } - // Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff. let key = (dc, family); - for attempt in 0..QUICK_RETRY_ATTEMPTS { - let mut shuffled = dc_addrs.clone(); - shuffled.shuffle(&mut rand::rng()); - let mut success = false; - for addr in &shuffled { - match pool.connect_one(*addr, rng.as_ref()).await { - Ok(()) => { - info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)"); - backoff.insert(key, HEALTH_INTERVAL_SECS); - last_attempt.insert(key, Instant::now()); - inflight_single.remove(&key); - success = true; - break; - } - Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"), - } - } - if success { - continue; - } - tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await; - } - - let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS); let now = Instant::now(); - if let Some(last) = last_attempt.get(&key) { - if now.duration_since(*last).as_secs() < delay { + if let Some(ts) = next_attempt.get(&key) { + if now < *ts { continue; } } - if dc_addrs.len() == 1 { - // Single ME address: fast retries then slower background retries. - if inflight_single.contains(&key) { - continue; - } - inflight_single.insert(key); - let addr = dc_addrs[0]; - let dc_id = dc; - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let timeout = pool.me_one_timeout; - let quick_attempts = pool.me_one_retry.max(1); - tokio::spawn(async move { - let mut success = false; - for _ in 0..quick_attempts { - let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; - match res { - Ok(Ok(())) => { - info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); - success = true; - break; - } - Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), - Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), - } - tokio::time::sleep(Duration::from_millis(1000)).await; - } - if success { - return; - } - let timeout_ms = timeout.as_millis(); - warn!( - dc = %dc_id, - ?family, - attempts = quick_attempts, - timeout_ms, - "DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...", - dc_id, - quick_attempts, - timeout_ms - ); - loop { - tokio::time::sleep(Duration::from_secs(5)).await; - let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; - match res { - Ok(Ok(())) => { - info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); - break; - } - Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), - Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), - } - } - // will drop inflight flag in outer loop when coverage detected - }); - continue; - } - warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)..."); let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); - let mut reconnected = false; + let mut success = false; for addr in shuffled { - match pool.connect_one(addr, rng.as_ref()).await { - Ok(()) => { + let res = tokio::time::timeout(pool.me_one_timeout, pool.connect_one(addr, rng.as_ref())).await; + match res { + Ok(Ok(())) => { info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage"); - backoff.insert(key, 30); - last_attempt.insert(key, now); - reconnected = true; + backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64); + let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; + let wait = pool.me_reconnect_backoff_base + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + success = true; break; } - Err(e) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + Ok(Err(e)) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"), } } - if !reconnected { - let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60); - backoff.insert(key, next); - last_attempt.insert(key, now); + if !success { + let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64)); + let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64); + backoff.insert(key, next_ms); + let jitter = next_ms / JITTER_FRAC_NUM; + let wait = Duration::from_millis(next_ms) + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect"); } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index e860dc8..5106004 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -21,9 +21,9 @@ use super::registry::{BoundConn, ConnMeta}; use super::codec::{RpcWriter, WriterCommand}; use super::reader::reader_loop; use super::MeResponse; - const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; +const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4; #[derive(Clone)] pub struct MeWriter { @@ -54,6 +54,17 @@ pub struct MePool { pub(super) stun_backoff_until: Arc>>, pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, + pub(super) me_keepalive_enabled: bool, + pub(super) me_keepalive_interval: Duration, + pub(super) me_keepalive_jitter: Duration, + pub(super) me_keepalive_payload_random: bool, + pub(super) me_warmup_stagger_enabled: bool, + pub(super) me_warmup_step_delay: Duration, + pub(super) me_warmup_step_jitter: Duration, + pub(super) me_reconnect_max_concurrent_per_dc: u32, + pub(super) me_reconnect_backoff_base: Duration, + pub(super) me_reconnect_backoff_cap: Duration, + pub(super) me_reconnect_fast_retry_count: u32, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -88,6 +99,17 @@ impl MePool { default_dc: Option, decision: NetworkDecision, rng: Arc, + me_keepalive_enabled: bool, + me_keepalive_interval_secs: u64, + me_keepalive_jitter_secs: u64, + me_keepalive_payload_random: bool, + me_warmup_stagger_enabled: bool, + me_warmup_step_delay_ms: u64, + me_warmup_step_jitter_ms: u64, + me_reconnect_max_concurrent_per_dc: u32, + me_reconnect_backoff_base_ms: u64, + me_reconnect_backoff_cap_ms: u64, + me_reconnect_fast_retry_count: u32, ) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), @@ -108,6 +130,17 @@ impl MePool { stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), + me_keepalive_enabled, + me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), + me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), + me_keepalive_payload_random, + me_warmup_stagger_enabled, + me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), + me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), + me_reconnect_max_concurrent_per_dc, + me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), + me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), + me_reconnect_fast_retry_count, pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -323,7 +356,24 @@ impl MePool { return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into())); } - // Additional connections up to pool_size total (round-robin across DCs) + // Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles. + if self.me_warmup_stagger_enabled { + let mut delay_ms = 0u64; + for (dc, addrs) in dc_addrs.iter() { + for (ip, port) in addrs { + if self.connection_count() >= pool_size { + break; + } + let addr = SocketAddr::new(*ip, *port); + let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64); + delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(e) = self.connect_one(addr, rng.as_ref()).await { + debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)"); + } + } + } + } else { for (dc, addrs) in dc_addrs.iter() { for (ip, port) in addrs { if self.connection_count() >= pool_size { @@ -338,6 +388,7 @@ impl MePool { break; } } + } if !self.decision.effective_multipath && self.connection_count() > 0 { break; @@ -364,6 +415,8 @@ impl MePool { let degraded = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false)); let (tx, mut rx) = mpsc::channel::(4096); + let tx_for_keepalive = tx.clone(); + let keepalive_random = self.me_keepalive_payload_random; let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, @@ -382,6 +435,13 @@ impl MePool { Some(WriterCommand::DataAndFlush(payload)) => { if rpc_writer.send_and_flush(&payload).await.is_err() { break; } } + Some(WriterCommand::Keepalive) => { + let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN]; + if keepalive_random { + rand::rng().fill(&mut payload); + } + if rpc_writer.send_keepalive(payload).await.is_err() { break; } + } Some(WriterCommand::Close) | None => break, } } @@ -412,9 +472,14 @@ impl MePool { let cleanup_done = Arc::new(AtomicBool::new(false)); let cleanup_for_reader = cleanup_done.clone(); let cleanup_for_ping = cleanup_done.clone(); + let keepalive_enabled = self.me_keepalive_enabled; + let keepalive_interval = self.me_keepalive_interval; + let keepalive_jitter = self.me_keepalive_jitter; + let cancel_reader_token = cancel.clone(); + let cancel_ping_token = cancel_ping.clone(); + let cancel_keepalive_token = cancel.clone(); tokio::spawn(async move { - let cancel_reader = cancel.clone(); let res = reader_loop( hs.rd, hs.read_key, @@ -427,7 +492,7 @@ impl MePool { rtt_stats.clone(), writer_id, degraded.clone(), - cancel_reader.clone(), + cancel_reader_token.clone(), ) .await; if let Some(pool) = pool.upgrade() { @@ -454,7 +519,7 @@ impl MePool { .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; tokio::select! { - _ = cancel_ping.cancelled() => { + _ = cancel_ping_token.cancelled() => { break; } _ = tokio::time::sleep(Duration::from_secs(wait)) => {} @@ -485,6 +550,25 @@ impl MePool { } }); + if keepalive_enabled { + let tx_keepalive = tx_for_keepalive; + let cancel_keepalive = cancel_keepalive_token; + tokio::spawn(async move { + // Per-writer jittered start to avoid phase sync. + let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64); + tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await; + loop { + tokio::select! { + _ = cancel_keepalive.cancelled() => break, + _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {} + } + if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() { + break; + } + } + }); + } + Ok(()) } From 820ed8d346d7cf11811f4db4e21e94d357752a31 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:49:35 +0300 Subject: [PATCH 19/22] ME Keepalives Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/main.rs | 1 + src/metrics.rs | 16 ++++++++++++++++ src/stats/mod.rs | 12 ++++++++++++ src/transport/middle_proxy/health.rs | 7 ++++++- src/transport/middle_proxy/pool.rs | 13 ++++++++++++- 5 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index e8883f2..d542b63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -333,6 +333,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai cfg_v4.default_dc.or(cfg_v6.default_dc), decision.clone(), rng.clone(), + stats.clone(), config.general.me_keepalive_enabled, config.general.me_keepalive_interval_secs, config.general.me_keepalive_jitter_secs, diff --git a/src/metrics.rs b/src/metrics.rs index 24acf30..fa6c680 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String { let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts()); + let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent"); + let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter"); + let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent()); + + let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures"); + let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter"); + let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed()); + + let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts"); + let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter"); + let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts()); + + let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes"); + let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter"); + let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success()); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 2cdcdf9..38318cc 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -19,6 +19,10 @@ pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, handshake_timeouts: AtomicU64, + me_keepalive_sent: AtomicU64, + me_keepalive_failed: AtomicU64, + me_reconnect_attempts: AtomicU64, + me_reconnect_success: AtomicU64, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -43,8 +47,16 @@ impl Stats { pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); } pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); } pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } + pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } + pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } + pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) } + pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) } pub fn increment_user_connects(&self, user: &str) { self.user_stats.entry(user.to_string()).or_default() diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index e2cdad6..79c92d6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -99,6 +99,7 @@ async fn check_family( match res { Ok(Ok(())) => { info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage"); + pool.stats.increment_me_reconnect_success(); backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64); let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; let wait = pool.me_reconnect_backoff_base @@ -107,11 +108,15 @@ async fn check_family( success = true; break; } - Ok(Err(e)) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + Ok(Err(e)) => { + pool.stats.increment_me_reconnect_attempt(); + debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed") + } Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"), } } if !success { + pool.stats.increment_me_reconnect_attempt(); let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64)); let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64); backoff.insert(key, next_ms); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 5106004..250b922 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -74,6 +74,7 @@ pub struct MePool { pub(super) nat_reflection_cache: Arc>, pub(super) writer_available: Arc, pub(super) conn_count: AtomicUsize, + pub(super) stats: Arc, pool_size: usize, } @@ -99,6 +100,7 @@ impl MePool { default_dc: Option, decision: NetworkDecision, rng: Arc, + stats: Arc, me_keepalive_enabled: bool, me_keepalive_interval_secs: u64, me_keepalive_jitter_secs: u64, @@ -130,6 +132,7 @@ impl MePool { stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), + stats, me_keepalive_enabled, me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), @@ -440,7 +443,15 @@ impl MePool { if keepalive_random { rand::rng().fill(&mut payload); } - if rpc_writer.send_keepalive(payload).await.is_err() { break; } + match rpc_writer.send_keepalive(payload).await { + Ok(()) => { + stats.increment_me_keepalive_sent(); + } + Err(_) => { + stats.increment_me_keepalive_failed(); + break; + } + } } Some(WriterCommand::Close) | None => break, } From 2926b9f5c8cf18d50ec6703d69192be1a1df67ab Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 16:02:50 +0300 Subject: [PATCH 20/22] ME Concurrency Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 14 ++++++++++++++ src/transport/middle_proxy/pool.rs | 1 + 2 files changed, 15 insertions(+) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 79c92d6..d4d4a70 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -14,10 +14,12 @@ use super::MePool; const HEALTH_INTERVAL_SECS: u64 = 1; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff +const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( @@ -26,6 +28,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &rng, &mut backoff, &mut next_attempt, + &mut inflight, ) .await; check_family( @@ -34,6 +37,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &rng, &mut backoff, &mut next_attempt, + &mut inflight, ) .await; } @@ -45,6 +49,7 @@ async fn check_family( rng: &Arc, backoff: &mut HashMap<(i32, IpFamily), u64>, next_attempt: &mut HashMap<(i32, IpFamily), Instant>, + inflight: &mut HashMap<(i32, IpFamily), usize>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -91,6 +96,12 @@ async fn check_family( } } + let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize; + if *inflight.get(&key).unwrap_or(&0) >= max_concurrent { + return; + } + *inflight.entry(key).or_insert(0) += 1; + let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); let mut success = false; @@ -126,5 +137,8 @@ async fn check_family( next_attempt.insert(key, now + wait); warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect"); } + if let Some(v) = inflight.get_mut(&key) { + *v = v.saturating_sub(1); + } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 250b922..84c526f 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -420,6 +420,7 @@ impl MePool { let (tx, mut rx) = mpsc::channel::(4096); let tx_for_keepalive = tx.clone(); let keepalive_random = self.me_keepalive_payload_random; + let stats = self.stats.clone(); let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, From bb87a376867ac959f03592af21d93d906bb3f7f3 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 16:19:58 +0300 Subject: [PATCH 21/22] Update config.toml --- config.toml | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/config.toml b/config.toml index 28dd3b6..45d6b75 100644 --- a/config.toml +++ b/config.toml @@ -5,6 +5,38 @@ prefer_ipv6 = false fast_mode = true use_middle_proxy = true #ad_tag = "00000000000000000000000000000000" +# Path to proxy-secret binary (auto-downloaded if missing). +proxy_secret_path = "proxy-secret" + +# === Middle Proxy (ME) === +# Public IP override for ME KDF when behind NAT; leave unset to auto-detect. +#middle_proxy_nat_ip = "203.0.113.10" +# Enable STUN probing to discover public IP:port for ME. +middle_proxy_nat_probe = true +# Primary STUN server (host:port); defaults to Telegram STUN when empty. +middle_proxy_nat_stun = "stun.l.google.com:19302" +# Optional fallback STUN servers list. +middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"] +# Desired number of concurrent ME writers in pool. +middle_proxy_pool_size = 16 +# Pre-initialized warm-standby ME connections kept idle. +middle_proxy_warm_standby = 8 +# Ignore STUN/interface mismatch and keep ME enabled even if IP differs. +stun_iface_mismatch_ignore = false +# Keepalive padding frames - fl==4 +me_keepalive_enabled = true +me_keepalive_interval_secs = 25 # Period between keepalives +me_keepalive_jitter_secs = 5 # Jitter added to interval +me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros) +# Stagger extra ME connections on warmup to de-phase lifecycles. +me_warmup_stagger_enabled = true +me_warmup_step_delay_ms = 500 # Base delay between extra connects +me_warmup_step_jitter_ms = 300 # Jitter for warmup delay +# Reconnect policy knobs. +me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE! +me_reconnect_backoff_base_ms = 500 # Backoff start +me_reconnect_backoff_cap_ms = 30000 # Backoff cap +me_reconnect_fast_retry_count = 11 # Quick retries before backoff [network] # Enable/disable families; ipv6 = true/false/auto(None) @@ -50,10 +82,13 @@ show = ["hello"] # Users to show in the startup log (tg:// links) # === Timeouts (in seconds) === [timeouts] -client_handshake = 15 +client_handshake = 30 tg_connect = 10 client_keepalive = 60 client_ack = 300 +# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms). +me_one_retry = 12 +me_one_timeout_ms = 1200 # === Anti-Censorship & Masking === [censorship] From fc28c1ad8851d23f520b1f234e2d070f0f69c386 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 16:30:04 +0300 Subject: [PATCH 22/22] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 72ac944..77e72ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.0.4" +version = "3.0.5" edition = "2024" [dependencies]