diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 05e334a..bbf1b00 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -25,10 +25,16 @@ jobs:
       include:
         - target: x86_64-unknown-linux-gnu
           artifact_name: telemt
-          asset_name: telemt-x86_64-linux
+          asset_name: telemt-x86_64-linux-gnu
         - target: aarch64-unknown-linux-gnu
           artifact_name: telemt
-          asset_name: telemt-aarch64-linux
+          asset_name: telemt-aarch64-linux-gnu
+        - target: x86_64-unknown-linux-musl
+          artifact_name: telemt
+          asset_name: telemt-x86_64-linux-musl
+        - target: aarch64-unknown-linux-musl
+          artifact_name: telemt
+          asset_name: telemt-aarch64-linux-musl

     steps:
       - name: Checkout repository
@@ -56,12 +62,13 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-${{ matrix.target }}-cargo-

+      - name: Install cross
+        run: cargo install cross --git https://github.com/cross-rs/cross
+
       - name: Build Release
-        uses: actions-rs/cargo@ae10961054e4aa8bff448f48a500763b90d5c550 # v1.0.1
-        with:
-          use-cross: true
-          command: build
-          args: --release --target ${{ matrix.target }}
+        env:
+          RUSTFLAGS: ${{ contains(matrix.target, 'musl') && '-C target-feature=+crt-static' || '' }}
+        run: cross build --release --target ${{ matrix.target }}

       - name: Package binary
         run: |
@@ -85,13 +92,42 @@ jobs:
       contents: write

     steps:
+      - name: Checkout repository
+        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        with:
+          fetch-depth: 0
+          token: ${{ secrets.GITHUB_TOKEN }}
+
       - name: Download all artifacts
         uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
         with:
           path: artifacts

+      - name: Update version in Cargo.toml and Cargo.lock
+        run: |
+          # Extract version from tag (remove 'v' prefix if present)
+          VERSION="${GITHUB_REF#refs/tags/}"
+          VERSION="${VERSION#v}"
+
+          # Install cargo-edit for version bumping
+          cargo install cargo-edit
+
+          # Update Cargo.toml version
+          cargo set-version "$VERSION"
+
+          # Configure git
+          git config user.name "github-actions[bot]"
+          git config user.email "github-actions[bot]@users.noreply.github.com"
+
+          # Commit and push changes
+          git add Cargo.toml Cargo.lock
+          git commit -m "chore: bump version to $VERSION" || echo "No changes to commit"
+          git push origin HEAD:main
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
       - name: Create Release
-        uses: softprops/action-gh-release@c95fe1489396fe360a41fb53f90de6ddce8c4c8a # v2.2.1
+        uses: softprops/action-gh-release@v2
         with:
           files: artifacts/**/*
           generate_release_notes: true
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..d1f22c1
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,5 @@
+## Pull Requests - Rules
+- ONLY signed and verified commits
+- ONLY from your name
+- DO NOT commit with `codex` or `claude` as author/committer
+- PREFER the `flow` branch for development, not `main`
diff --git a/Cargo.toml b/Cargo.toml
index 72ac944..77e72ca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "telemt"
-version = "3.0.4"
+version = "3.0.5"
 edition = "2024"

 [dependencies]
diff --git a/LICENSING.md b/LICENSING.md
new file mode 100644
index 0000000..50d007b
--- /dev/null
+++ b/LICENSING.md
@@ -0,0 +1,17 @@
+# LICENSING
+## Licenses for Versions
+| Version | License    |
+|---------|------------|
+| 1.0     | NO LICENSE |
+| 1.1     | NO LICENSE |
+| 1.2     | NO LICENSE |
+| 2.0     | NO LICENSE |
+| 3.0     | TELEMT UL1 |
+
+### License Types
+- **NO LICENSE** = ***ALL RIGHTS RESERVED***
+- **TELEMT UL1** - a work-in-progress license for the source code of `telemt`, which encourages:
+  - fair use,
+  - contributions,
+  - distribution,
+  - but requires crediting the authors
diff --git a/README.md b/README.md
index 65bfa60..cb7cd49 100644
--- a/README.md
+++ b/README.md
@@ -10,74 +10,40 @@

 ### πŸ‡·πŸ‡Ί RU

-15 фСвраля ΠΌΡ‹ ΠΎΠΏΡƒΠ±Π»ΠΈΠΊΠΎΠ²Π°Π»ΠΈ `telemt 3` с ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ Middle-End Proxy, Π° Π·Π½Π°Ρ‡ΠΈΡ‚:
+18 фСвраля ΠΌΡ‹ ΠΎΠΏΡƒΠ±Π»ΠΈΠΊΠΎΠ²Π°Π»ΠΈ `telemt 3.0.3`, ΠΎΠ½ ΠΈΠΌΠ΅Π΅Ρ‚:

-- с Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½Π°Π»ΡŒΠ½Ρ‹ΠΌΠΈ ΠΌΠ΅Π΄ΠΈΠ°, Π² Ρ‚ΠΎΠΌ числС с CDN/DC=203
-- с Ad-tag β€” ΠΏΠΎΠΊΠ°Π·Ρ‹Π²Π°ΠΉΡ‚Π΅ спонсорский ΠΊΠ°Π½Π°Π» ΠΈ собирайтС статистику Ρ‡Π΅Ρ€Π΅Π· ΠΎΡ„ΠΈΡ†ΠΈΠ°Π»ΡŒΠ½ΠΎΠ³ΠΎ Π±ΠΎΡ‚Π°
-- с Π½ΠΎΠ²Ρ‹ΠΌ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ΠΎΠΌ ΠΊ бСзопасности ΠΈ асинхронности
-- с высокоточной диагностикой ΠΊΡ€ΠΈΠΏΡ‚ΠΎΠ³Ρ€Π°Ρ„ΠΈΠΈ Ρ‡Π΅Ρ€Π΅Π· `ME_DIAG`
+- ΡƒΠ»ΡƒΡ‡ΡˆΠ΅Π½Π½Ρ‹ΠΉ ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌ Middle-End Health Check
+- высокоскоростноС восстановлСниС ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ Middle-End
+- мСньшС Π·Π°Π΄Π΅Ρ€ΠΆΠ΅ΠΊ Π½Π° hot-path
+- Π±ΠΎΠ»Π΅Π΅ ΠΊΠΎΡ€Ρ€Π΅ΠΊΡ‚Π½ΡƒΡŽ Ρ€Π°Π±ΠΎΡ‚Ρƒ Π² Dualstack, Π° ΠΈΠΌΠ΅Π½Π½ΠΎ - IPv6 Middle-End
+- Π°ΠΊΠΊΡƒΡ€Π°Ρ‚Π½ΠΎΠ΅ ΠΏΠ΅Ρ€Π΅ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π° Π±Π΅Π· Π΄Ρ€ΠΈΡ„Ρ‚Π° сСссий ΠΌΠ΅ΠΆΠ΄Ρƒ Middle-End
+- автоматичСская дСградация Π½Π° Direct-DC ΠΏΡ€ΠΈ массовой (>2 ME-DC-Π³Ρ€ΡƒΠΏΠΏ) нСдоступности Middle-End
+- Π°Π²Ρ‚ΠΎΠ΄Π΅Ρ‚Π΅ΠΊΡ‚ IP Π·Π° NAT, ΠΏΡ€ΠΈ возмоТности - Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ Ρ…Π΅Π½Π΄ΡˆΠ΅ΠΉΠΊ с ME, ΠΏΡ€ΠΈ Π½Π΅ΡƒΠ΄Π°Ρ‡Π΅ - автодСградация
+- СдинствСнный извСстный ΡΠΏΠ΅Ρ†ΠΈΠ°Π»ΡŒΠ½Ρ‹ΠΉ DC=203 ΡƒΠΆΠ΅ Π΄ΠΎΠ±Π°Π²Π»Π΅Π½ Π² ΠΊΠΎΠ΄: ΠΌΠ΅Π΄ΠΈΠ° Π·Π°Π³Ρ€ΡƒΠΆΠ°ΡŽΡ‚ΡΡ с CDN Π² Direct-DC Ρ€Π΅ΠΆΠΈΠΌΠ΅

+[Π—Π΄Π΅ΡΡŒ Π²Ρ‹ ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π½Π°ΠΉΡ‚ΠΈ Ρ€Π΅Π»ΠΈΠ·](https://github.com/telemt/telemt/releases/tag/3.0.3)

-Для использования Π½ΡƒΠΆΠ½ΠΎ:
-
-1. ВСрсия `telemt` β‰₯3.0.0
-2. Π’Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ любого ΠΈΠ· Π½Π°Π±ΠΎΡ€ΠΎΠ² условий:
-   - ΠΏΡƒΠ±Π»ΠΈΡ‡Π½Ρ‹ΠΉ IP для исходящих соСдинСний установлСн Π½Π° интСрфСйса инстанса с `telemt`
-   - Π›Π˜Π‘Πž
-   - Π²Ρ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚Π΅ NAT 1:1 + Π²ΠΊΠ»ΡŽΡ‡ΠΈΠ»ΠΈ STUN-ΠΏΡ€ΠΎΠ±ΠΈΠ½Π³
-3. Π’ ΠΊΠΎΠ½Ρ„ΠΈΠ³Π΅, Π² сСкции `[general]` ΡƒΠΊΠ°Π·Π°Ρ‚ΡŒ:
-```toml
-use_middle_proxy = true
-```
-
-Если условия ΠΈΠ· ΠΏΡƒΠ½ΠΊΡ‚Π° 1 Π½Π΅ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡŽΡ‚ΡΡ:
-1. Π’Ρ‹ΠΊΠ»ΡŽΡ‡ΠΈΡ‚Π΅ ME-Ρ€Π΅ΠΆΠΈΠΌ:
-   - установитС `use_middle_proxy = false`
-   - Π›Π˜Π‘Πž
-   - Middle-End Proxy Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΊΠ»ΡŽΡ‡Π΅Π½ автоматичСски ΠΏΠΎ Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚Ρƒ, Π½ΠΎ это Π·Π°ΠΉΠΌΡ‘Ρ‚ большС Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ ΠΏΡ€ΠΈ запускС
-2. Π’ ΠΊΠΎΠ½Ρ„ΠΈΠ³Π΅, Π΄ΠΎΠ±Π°Π²ΡŒΡ‚Π΅ Π² ΠΊΠΎΠ½Π΅Ρ†:
-```toml
-[dc_overrides]
-"203" = "91.105.192.100:443"
-```
-
-Если Ρƒ вас Π΅ΡΡ‚ΡŒ ΠΊΠΎΠΌΠΏΠ΅Ρ‚Π΅Π½Ρ†ΠΈΠΈ Π² асинхронных сСтСвых прилоТСниях, Π°Π½Π°Π»ΠΈΠ·Π΅ Ρ‚Ρ€Π°Ρ„ΠΈΠΊΠ°, рСвСрс-ΠΈΠ½ΠΆΠΈΠ½ΠΈΡ€ΠΈΠ½Π³Π΅ ΠΈΠ»ΠΈ сСтСвых расслСдованиях β€” ΠΌΡ‹ ΠΎΡ‚ΠΊΡ€Ρ‹Ρ‚Ρ‹ ΠΊ идСям ΠΈ pull requests.
+Если Ρƒ вас Π΅ΡΡ‚ΡŒ ΠΊΠΎΠΌΠΏΠ΅Ρ‚Π΅Π½Ρ†ΠΈΠΈ Π² асинхронных сСтСвых прилоТСниях, Π°Π½Π°Π»ΠΈΠ·Π΅ Ρ‚Ρ€Π°Ρ„ΠΈΠΊΠ°, рСвСрс-ΠΈΠ½ΠΆΠΈΠ½ΠΈΡ€ΠΈΠ½Π³Π΅ ΠΈΠ»ΠΈ сСтСвых расслСдованиях - ΠΌΡ‹ ΠΎΡ‚ΠΊΡ€Ρ‹Ρ‚Ρ‹ ΠΊ идСям ΠΈ pull requests!

 ### πŸ‡¬πŸ‡§ EN

-On February 15, we released `telemt 3` with support for Middle-End Proxy, which means:
+On February 18, we released `telemt 3.0.3`. This version introduces:

-- functional media, including CDN/DC=203
-- Ad-tag support – promote a sponsored channel and collect statistics via Telegram bot
-- new approach to security and asynchronicity
-- high-precision cryptography diagnostics via `ME_DIAG`
+- an improved Middle-End Health Check mechanism
+- high-speed recovery of Middle-End initialization
+- reduced latency on the hot path
+- correct Dualstack support: proper handling of IPv6 Middle-Ends
+- *clean* client reconnection without session "drift" between Middle-Ends
+- automatic degradation to Direct-DC mode on large-scale (>2 ME-DC groups) Middle-End unavailability
+- automatic public IP detection behind NAT; when possible, a Middle-End handshake is performed first, with automatic degradation on failure
+- the only known special DC=203 is now handled natively: media is delivered from the CDN in Direct-DC mode

-To use this feature, the following requirements must be met:
-1. `telemt` version β‰₯ 3.0.0
-2. One of the following conditions satisfied:
-   - the instance running `telemt` has a public IP address assigned to its network interface for outbound connections
-   - OR
-   - you are using 1:1 NAT and have STUN probing enabled
-3. In the config file, under the `[general]` section, specify:
-```toml
-use_middle_proxy = true
-````
+[Release is available here](https://github.com/telemt/telemt/releases/tag/3.0.3)

-If the conditions from step 1 are not satisfied:
-1. Disable Middle-End mode:
-   - set `use_middle_proxy = false`
-   - OR
-   - Middle-End Proxy will be disabled automatically after a timeout, but this will increase startup time
-
-2. In the config file, add the following at the end:
-```toml
-[dc_overrides]
-"203" = "91.105.192.100:443"
-```
-
-If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics β€” we welcome ideas, suggestions, and pull requests.
+If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics - we welcome ideas and pull requests!

@@ -86,7 +52,9 @@ If you have expertise in asynchronous network applications, traffic analysis, re
 # Features

 πŸ’₯ The configuration structure has changed since version 1.1.0.0. Change it in your environment!

-βš“ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
+βš“ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
+
+βš“ Our ***Middle-End Pool*** is the fastest by design in standard scenarios, compared to other implementations of Middle-End Proxy connectivity: not dramatically, but consistently

 # GOTO
 - [Features](#features)
@@ -215,6 +183,7 @@ prefer_ipv6 = false
 fast_mode = true
 use_middle_proxy = false
 # ad_tag = "..."
+# disable_colors = false # Disable colored output in logs (useful for files/systemd)

 [network]
 ipv4 = true
@@ -247,7 +216,9 @@ ip = "::"

 # Users to show in the startup log (tg:// links)
 [general.links]
-show = ["hello"] # Users to show in the startup log (tg:// links)
+show = ["hello"] # Only show links for user "hello"
+# show = ["alice", "bob"] # Only show links for alice and bob
+# show = "*" # Show links for all users
 # public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
 # public_port = 443 # Port for tg:// links (default: server.port)
diff --git a/config.toml b/config.toml
index 28dd3b6..45d6b75 100644
--- a/config.toml
+++ b/config.toml
@@ -5,6 +5,38 @@ prefer_ipv6 = false
 fast_mode = true
 use_middle_proxy = true
 #ad_tag = "00000000000000000000000000000000"
+# Path to proxy-secret binary (auto-downloaded if missing).
+proxy_secret_path = "proxy-secret"
+
+# === Middle Proxy (ME) ===
+# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
+#middle_proxy_nat_ip = "203.0.113.10"
+# Enable STUN probing to discover public IP:port for ME.
+middle_proxy_nat_probe = true
+# Primary STUN server (host:port); defaults to Telegram STUN when empty.
+middle_proxy_nat_stun = "stun.l.google.com:19302"
+# Optional fallback STUN servers list.
+middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
+# Desired number of concurrent ME writers in pool.
+middle_proxy_pool_size = 16
+# Pre-initialized warm-standby ME connections kept idle.
+middle_proxy_warm_standby = 8
+# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
+stun_iface_mismatch_ignore = false
+# Keepalive padding frames (fl == 4).
+me_keepalive_enabled = true
+me_keepalive_interval_secs = 25 # Period between keepalives
+me_keepalive_jitter_secs = 5 # Jitter added to interval
+me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
+# Stagger extra ME connections on warmup to de-phase lifecycles.
+me_warmup_stagger_enabled = true
+me_warmup_step_delay_ms = 500 # Base delay between extra connects
+me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
+# Reconnect policy knobs.
+me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
+me_reconnect_backoff_base_ms = 500 # Backoff start
+me_reconnect_backoff_cap_ms = 30000 # Backoff cap
+me_reconnect_fast_retry_count = 11 # Quick retries before backoff

 [network]
 # Enable/disable families; ipv6 = true/false/auto(None)
@@ -50,10 +82,13 @@ show = ["hello"] # Users to show in the startup log (tg:// links)

 # === Timeouts (in seconds) ===
 [timeouts]
-client_handshake = 15
+client_handshake = 30
 tg_connect = 10
 client_keepalive = 60
 client_ack = 300
+# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
+me_one_retry = 12
+me_one_timeout_ms = 1200
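The reconnect knobs above drive a fast-retry-then-exponential-backoff schedule; the following is a minimal sketch of the policy they imply (illustrative only, with hypothetical function and parameter names; the real handling lives in src/transport/middle_proxy/health.rs):

```rust
use std::time::Duration;
use rand::Rng;

/// Illustrative sketch of the schedule implied by the knobs above:
/// `me_reconnect_fast_retry_count` immediate retries, then exponential
/// backoff doubling from `base` up to `cap`, plus jitter so reconnect
/// attempts across DCs do not synchronize.
fn reconnect_delay(attempt: u32, fast_retries: u32, base: Duration, cap: Duration) -> Duration {
    if attempt < fast_retries {
        return Duration::ZERO; // quick retries before backing off
    }
    let doublings = (attempt - fast_retries).min(16);
    let backoff_ms = (base.as_millis() as u64)
        .saturating_mul(1u64 << doublings)
        .min(cap.as_millis() as u64);
    // Up to ~50% random jitter on top of the deterministic backoff.
    let jitter_ms = rand::rng().random_range(0..=backoff_ms / 2 + 1);
    Duration::from_millis(backoff_ms + jitter_ms)
}
```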
 # === Anti-Censorship & Masking ===
 [censorship]
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 3f8254c..19269a2 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -74,6 +74,34 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
     Some("unknown-dc.txt".to_string())
 }

+pub(crate) fn default_pool_size() -> usize {
+    2
+}
+
+pub(crate) fn default_keepalive_interval() -> u64 {
+    25
+}
+
+pub(crate) fn default_keepalive_jitter() -> u64 {
+    5
+}
+
+pub(crate) fn default_warmup_step_delay_ms() -> u64 {
+    500
+}
+
+pub(crate) fn default_warmup_step_jitter_ms() -> u64 {
+    300
+}
+
+pub(crate) fn default_reconnect_backoff_base_ms() -> u64 {
+    500
+}
+
+pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
+    30_000
+}
+
 // Custom deserializer helpers

 #[derive(Deserialize)]
diff --git a/src/config/load.rs b/src/config/load.rs
index 512b734..a2fc19b 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -11,6 +11,32 @@ use crate::error::{ProxyError, Result};
 use super::defaults::*;
 use super::types::*;

+fn preprocess_includes(content: &str, base_dir: &Path, depth: u8) -> Result<String> {
+    if depth > 10 {
+        return Err(ProxyError::Config("Include depth > 10".into()));
+    }
+    let mut output = String::with_capacity(content.len());
+    for line in content.lines() {
+        let trimmed = line.trim();
+        if let Some(rest) = trimmed.strip_prefix("include") {
+            let rest = rest.trim();
+            if let Some(rest) = rest.strip_prefix('=') {
+                let path_str = rest.trim().trim_matches('"');
+                let resolved = base_dir.join(path_str);
+                let included = std::fs::read_to_string(&resolved)
+                    .map_err(|e| ProxyError::Config(e.to_string()))?;
+                let included_dir = resolved.parent().unwrap_or(base_dir);
+                output.push_str(&preprocess_includes(&included, included_dir, depth + 1)?);
+                output.push('\n');
+                continue;
+            }
+        }
+        output.push_str(line);
+        output.push('\n');
+    }
+    Ok(output)
+}
+
 fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> {
     if !net.ipv4 && matches!(net.ipv6, Some(false)) {
         return Err(ProxyError::Config(
@@ -84,10 +110,12 @@ pub struct ProxyConfig {
 impl ProxyConfig {
     pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
         let content =
-            std::fs::read_to_string(path).map_err(|e| ProxyError::Config(e.to_string()))?;
+            std::fs::read_to_string(&path).map_err(|e| ProxyError::Config(e.to_string()))?;
+        let base_dir = path.as_ref().parent().unwrap_or(Path::new("."));
+        let processed = preprocess_includes(&content, base_dir, 0)?;

         let mut config: ProxyConfig =
-            toml::from_str(&content).map_err(|e| ProxyError::Config(e.to_string()))?;
+            toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?;

         // Validate secrets.
         for (user, secret) in &config.access.users {
@@ -151,8 +179,10 @@ impl ProxyConfig {

         validate_network_cfg(&mut config.network)?;

-        // Random fake_cert_len.
-        config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
+        // Random fake_cert_len only when default is in use.
+        if config.censorship.fake_cert_len == default_fake_cert_len() {
+            config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
+        }

         // Resolve listen_tcp: explicit value wins, otherwise auto-detect.
         // If unix socket is set β†’ TCP only when listen_addr_ipv4 or listeners are explicitly provided.
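The include preprocessor above splices referenced files into the TOML text before parsing; a small illustration of the resulting semantics (hypothetical file names and values):

```rust
// Sketch of preprocess_includes() semantics (hypothetical files).
//
// config.toml:
//     [general]
//     fast_mode = true
//     include = "users.toml"   # spliced in literally, relative to config.toml
//
// users.toml:
//     [access.users]
//     hello = "0123456789abcdef0123456789abcdef"
//
// After preprocessing, toml::from_str sees one document:
//     [general]
//     fast_mode = true
//     [access.users]
//     hello = "0123456789abcdef0123456789abcdef"
//
// Includes may nest; chains deeper than 10 levels are rejected
// with ProxyError::Config ("Include depth > 10").
```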
@@ -208,6 +238,8 @@ impl ProxyConfig {
                 upstream_type: UpstreamType::Direct { interface: None },
                 weight: 1,
                 enabled: true,
+                scopes: String::new(),
+                selected_scope: String::new(),
             });
         }
diff --git a/src/config/types.rs b/src/config/types.rs
index ef8fef7..9f6467a 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -143,6 +143,62 @@ pub struct GeneralConfig {
     #[serde(default)]
     pub middle_proxy_nat_stun: Option<String>,

+    /// Optional list of STUN servers for NAT probing fallback.
+    #[serde(default)]
+    pub middle_proxy_nat_stun_servers: Vec<String>,
+
+    /// Desired size of active Middle-Proxy writer pool.
+    #[serde(default = "default_pool_size")]
+    pub middle_proxy_pool_size: usize,
+
+    /// Number of warm standby ME connections kept pre-initialized.
+    #[serde(default)]
+    pub middle_proxy_warm_standby: usize,
+
+    /// Enable ME keepalive padding frames.
+    #[serde(default = "default_true")]
+    pub me_keepalive_enabled: bool,
+
+    /// Keepalive interval in seconds.
+    #[serde(default = "default_keepalive_interval")]
+    pub me_keepalive_interval_secs: u64,
+
+    /// Keepalive jitter in seconds.
+    #[serde(default = "default_keepalive_jitter")]
+    pub me_keepalive_jitter_secs: u64,
+
+    /// Keepalive payload randomized (4 bytes); otherwise zeros.
+    #[serde(default = "default_true")]
+    pub me_keepalive_payload_random: bool,
+
+    /// Enable staggered warmup of extra ME writers.
+    #[serde(default = "default_true")]
+    pub me_warmup_stagger_enabled: bool,
+
+    /// Base delay between warmup connections in ms.
+    #[serde(default = "default_warmup_step_delay_ms")]
+    pub me_warmup_step_delay_ms: u64,
+
+    /// Jitter for warmup delay in ms.
+    #[serde(default = "default_warmup_step_jitter_ms")]
+    pub me_warmup_step_jitter_ms: u64,
+
+    /// Max concurrent reconnect attempts per DC.
+    #[serde(default)]
+    pub me_reconnect_max_concurrent_per_dc: u32,
+
+    /// Base backoff in ms for reconnect.
+    #[serde(default = "default_reconnect_backoff_base_ms")]
+    pub me_reconnect_backoff_base_ms: u64,
+
+    /// Cap backoff in ms for reconnect.
+    #[serde(default = "default_reconnect_backoff_cap_ms")]
+    pub me_reconnect_backoff_cap_ms: u64,
+
+    /// Fast retry attempts before backoff.
+    #[serde(default)]
+    pub me_reconnect_fast_retry_count: u32,
+
     /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
#[serde(default)] pub stun_iface_mismatch_ignore: bool, @@ -175,6 +231,20 @@ impl Default for GeneralConfig { middle_proxy_nat_ip: None, middle_proxy_nat_probe: false, middle_proxy_nat_stun: None, + middle_proxy_nat_stun_servers: Vec::new(), + middle_proxy_pool_size: default_pool_size(), + middle_proxy_warm_standby: 0, + me_keepalive_enabled: true, + me_keepalive_interval_secs: default_keepalive_interval(), + me_keepalive_jitter_secs: default_keepalive_jitter(), + me_keepalive_payload_random: true, + me_warmup_stagger_enabled: true, + me_warmup_step_delay_ms: default_warmup_step_delay_ms(), + me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), + me_reconnect_max_concurrent_per_dc: 1, + me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), + me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), + me_reconnect_fast_retry_count: 1, stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, @@ -403,6 +473,10 @@ pub struct UpstreamConfig { pub weight: u16, #[serde(default = "default_true")] pub enabled: bool, + #[serde(default)] + pub scopes: String, + #[serde(skip)] + pub selected_scope: String, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/crypto/random.rs b/src/crypto/random.rs index 18862ab..99aa5f3 100644 --- a/src/crypto/random.rs +++ b/src/crypto/random.rs @@ -11,6 +11,9 @@ pub struct SecureRandom { inner: Mutex, } +unsafe impl Send for SecureRandom {} +unsafe impl Sync for SecureRandom {} + struct SecureRandomInner { rng: StdRng, cipher: AesCtr, @@ -211,4 +214,4 @@ mod tests { assert_ne!(shuffled, original); } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 2660213..d542b63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -92,6 +92,10 @@ fn parse_cli() -> (String, bool, Option) { eprintln!(" --no-start Don't start the service after install"); std::process::exit(0); } + "--version" | "-V" => { + println!("telemt {}", env!("CARGO_PKG_VERSION")); + std::process::exit(0); + } s if !s.starts_with('-') => { config_path = s.to_string(); } @@ -106,18 +110,20 @@ fn parse_cli() -> (String, bool, Option) { } fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { - info!("--- Proxy Links ({}) ---", host); + info!(target: "telemt::links", "--- Proxy Links ({}) ---", host); for user_name in config.general.links.show.resolve_users(&config.access.users) { if let Some(secret) = config.access.users.get(user_name) { - info!("User: {}", user_name); + info!(target: "telemt::links", "User: {}", user_name); if config.general.modes.classic { info!( + target: "telemt::links", " Classic: tg://proxy?server={}&port={}&secret={}", host, port, secret ); } if config.general.modes.secure { info!( + target: "telemt::links", " DD: tg://proxy?server={}&port={}&secret=dd{}", host, port, secret ); @@ -125,15 +131,16 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { if config.general.modes.tls { let domain_hex = hex::encode(&config.censorship.tls_domain); info!( + target: "telemt::links", " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", host, port, secret, domain_hex ); } } else { - warn!("User '{}' in show_link not found", user_name); + warn!(target: "telemt::links", "User '{}' in show_link not found", user_name); } } - info!("------------------------"); + info!(target: "telemt::links", "------------------------"); } #[tokio::main] @@ -317,6 +324,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai 
config.general.middle_proxy_nat_ip, config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_stun.clone(), + config.general.middle_proxy_nat_stun_servers.clone(), probe.detected_ipv6, config.timeouts.me_one_retry, config.timeouts.me_one_timeout_ms, @@ -325,18 +333,32 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai cfg_v4.default_dc.or(cfg_v6.default_dc), decision.clone(), rng.clone(), + stats.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, ); - match pool.init(2, &rng).await { + let pool_size = config.general.middle_proxy_pool_size.max(1); + match pool.init(pool_size, &rng).await { Ok(()) => { info!("Middle-End pool initialized successfully"); // Phase 4: Start health monitor let pool_clone = pool.clone(); let rng_clone = rng.clone(); + let min_conns = pool_size; tokio::spawn(async move { crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, 2, + pool_clone, rng_clone, min_conns, ) .await; }); @@ -740,6 +762,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai // Switch to user-configured log level after startup let runtime_filter = if has_rust_log { EnvFilter::from_default_env() + } else if matches!(effective_log_level, LogLevel::Silent) { + EnvFilter::new("warn,telemt::links=info") } else { EnvFilter::new(effective_log_level.to_filter_str()) }; diff --git a/src/metrics.rs b/src/metrics.rs index 24acf30..fa6c680 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String { let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts()); + let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent"); + let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter"); + let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent()); + + let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures"); + let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter"); + let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed()); + + let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts"); + let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter"); + let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts()); + + let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes"); + let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter"); + let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success()); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); diff --git a/src/proxy/direct_relay.rs 
b/src/proxy/direct_relay.rs
index 537a93e..7b1ac1b 100644
--- a/src/proxy/direct_relay.rs
+++ b/src/proxy/direct_relay.rs
@@ -45,7 +45,7 @@ where
     );

     let tg_stream = upstream_manager
-        .connect(dc_addr, Some(success.dc_idx))
+        .connect(dc_addr, Some(success.dc_idx), user.strip_prefix("scope_").filter(|s| !s.is_empty()))
         .await?;

     debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake");
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index 2cdcdf9..38318cc 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -19,6 +19,10 @@ pub struct Stats {
     connects_all: AtomicU64,
     connects_bad: AtomicU64,
     handshake_timeouts: AtomicU64,
+    me_keepalive_sent: AtomicU64,
+    me_keepalive_failed: AtomicU64,
+    me_reconnect_attempts: AtomicU64,
+    me_reconnect_success: AtomicU64,
     user_stats: DashMap,
     start_time: parking_lot::RwLock>,
 }
@@ -43,8 +47,16 @@ impl Stats {
     pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
     pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
     pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
+    pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
+    pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
+    pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
+    pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
     pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
     pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
+    pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
+    pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
+    pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
+    pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }

     pub fn increment_user_connects(&self, user: &str) {
         self.user_stats.entry(user.to_string()).or_default()
diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs
index cc33ae8..82f0960 100644
--- a/src/transport/middle_proxy/codec.rs
+++ b/src/transport/middle_proxy/codec.rs
@@ -4,6 +4,14 @@ use crate::crypto::{AesCbc, crc32};
 use crate::error::{ProxyError, Result};
 use crate::protocol::constants::*;

+/// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes.
+pub(crate) enum WriterCommand {
+    Data(Vec<u8>),
+    DataAndFlush(Vec<u8>),
+    Keepalive,
+    Close,
+}
+
 pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> {
     let total_len = (4 + 4 + payload.len() + 4) as u32;
     let mut frame = Vec::with_capacity(total_len as usize);
@@ -181,4 +189,12 @@ impl RpcWriter {
         self.send(payload).await?;
         self.writer.flush().await.map_err(ProxyError::Io)
     }
+
+    pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> {
+        // Keepalive is a frame with fl == 4 and a 4-byte payload.
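+        // Unlike data frames built by build_rpc_frame (len | seq_no | payload | crc32),
+        // a keepalive carries no seq_no and no CRC: the length field is the literal
+        // value 4 (little-endian), followed by the 4 payload bytes, and the result
+        // goes through the same send() path as any other frame.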
+ let mut frame = Vec::with_capacity(8); + frame.extend_from_slice(&4u32.to_le_bytes()); + frame.extend_from_slice(&payload); + self.send(&frame).await + } } diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 3c36820..d2bb550 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -13,6 +13,24 @@ use super::secret::download_proxy_secret; use crate::crypto::SecureRandom; use std::time::SystemTime; +async fn retry_fetch(url: &str) -> Option { + let delays = [1u64, 5, 15]; + for (i, d) in delays.iter().enumerate() { + match fetch_proxy_config(url).await { + Ok(cfg) => return Some(cfg), + Err(e) => { + if i == delays.len() - 1 { + warn!(error = %e, url, "fetch_proxy_config failed"); + } else { + debug!(error = %e, url, "fetch_proxy_config retrying"); + tokio::time::sleep(Duration::from_secs(*d)).await; + } + } + } + } + None +} + #[derive(Debug, Clone, Default)] pub struct ProxyConfigData { pub map: HashMap>, @@ -118,7 +136,8 @@ pub async fn me_config_updater(pool: Arc, rng: Arc, interv tick.tick().await; // Update proxy config v4 - if let Ok(cfg) = fetch_proxy_config("https://core.telegram.org/getProxyConfig").await { + let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; + if let Some(cfg) = cfg_v4 { let changed = pool.update_proxy_maps(cfg.map.clone(), None).await; if let Some(dc) = cfg.default_dc { pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed); @@ -129,14 +148,20 @@ pub async fn me_config_updater(pool: Arc, rng: Arc, interv } else { debug!("ME config v4 unchanged"); } - } else { - warn!("getProxyConfig update failed"); } // Update proxy config v6 (optional) - if let Ok(cfg_v6) = fetch_proxy_config("https://core.telegram.org/getProxyConfigV6").await { - let _ = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; + let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; + if let Some(cfg_v6) = cfg_v6 { + let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; + if changed { + info!("ME config updated (v6), reconciling connections"); + pool.reconcile_connections(&rng).await; + } else { + debug!("ME config v6 unchanged"); + } } + pool.reset_stun_state(); // Update proxy-secret match download_proxy_secret().await { diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 7141f94..d4d4a70 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -5,25 +5,30 @@ use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; use rand::seq::SliceRandom; +use rand::Rng; use crate::crypto::SecureRandom; use crate::network::IpFamily; use super::MePool; +const HEALTH_INTERVAL_SECS: u64 = 1; +const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff +const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; + pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); - let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); + let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new(); loop { - tokio::time::sleep(Duration::from_secs(30)).await; + tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( IpFamily::V4, &pool, &rng, &mut backoff, 
- &mut last_attempt, - &mut inflight_single, + &mut next_attempt, + &mut inflight, ) .await; check_family( @@ -31,8 +36,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &pool, &rng, &mut backoff, - &mut last_attempt, - &mut inflight_single, + &mut next_attempt, + &mut inflight, ) .await; } @@ -43,8 +48,8 @@ async fn check_family( pool: &Arc, rng: &Arc, backoff: &mut HashMap<(i32, IpFamily), u64>, - last_attempt: &mut HashMap<(i32, IpFamily), Instant>, - inflight_single: &mut HashSet<(i32, IpFamily)>, + next_attempt: &mut HashMap<(i32, IpFamily), Instant>, + inflight: &mut HashMap<(i32, IpFamily), usize>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -80,95 +85,60 @@ async fn check_family( for (dc, dc_addrs) in entries { let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); if has_coverage { - inflight_single.remove(&(dc, family)); - continue; - } - let key = (dc, family); - let delay = *backoff.get(&key).unwrap_or(&30); - let now = Instant::now(); - if let Some(last) = last_attempt.get(&key) { - if now.duration_since(*last).as_secs() < delay { - continue; - } - } - if dc_addrs.len() == 1 { - // Single ME address: fast retries then slower background retries. - if inflight_single.contains(&key) { - continue; - } - inflight_single.insert(key); - let addr = dc_addrs[0]; - let dc_id = dc; - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let timeout = pool.me_one_timeout; - let quick_attempts = pool.me_one_retry.max(1); - tokio::spawn(async move { - let mut success = false; - for _ in 0..quick_attempts { - let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; - match res { - Ok(Ok(())) => { - info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); - success = true; - break; - } - Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), - Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), - } - tokio::time::sleep(Duration::from_millis(1000)).await; - } - if success { - return; - } - let timeout_ms = timeout.as_millis(); - warn!( - dc = %dc_id, - ?family, - attempts = quick_attempts, - timeout_ms, - "DC={} has no ME coverage: {} tries * {} ms... 
retry in 5 seconds...", - dc_id, - quick_attempts, - timeout_ms - ); - loop { - tokio::time::sleep(Duration::from_secs(5)).await; - let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; - match res { - Ok(Ok(())) => { - info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); - break; - } - Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), - Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), - } - } - // will drop inflight flag in outer loop when coverage detected - }); continue; } - warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); - let mut shuffled = dc_addrs.clone(); - shuffled.shuffle(&mut rand::rng()); - let mut reconnected = false; - for addr in shuffled { - match pool.connect_one(addr, rng.as_ref()).await { - Ok(()) => { - info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage"); - backoff.insert(key, 30); - last_attempt.insert(key, now); - reconnected = true; - break; - } - Err(e) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + let key = (dc, family); + let now = Instant::now(); + if let Some(ts) = next_attempt.get(&key) { + if now < *ts { + continue; } } - if !reconnected { - let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300); - backoff.insert(key, next); - last_attempt.insert(key, now); + + let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize; + if *inflight.get(&key).unwrap_or(&0) >= max_concurrent { + return; + } + *inflight.entry(key).or_insert(0) += 1; + + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + let mut success = false; + for addr in shuffled { + let res = tokio::time::timeout(pool.me_one_timeout, pool.connect_one(addr, rng.as_ref())).await; + match res { + Ok(Ok(())) => { + info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage"); + pool.stats.increment_me_reconnect_success(); + backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64); + let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; + let wait = pool.me_reconnect_backoff_base + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + success = true; + break; + } + Ok(Err(e)) => { + pool.stats.increment_me_reconnect_attempt(); + debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed") + } + Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"), + } + } + if !success { + pool.stats.increment_me_reconnect_attempt(); + let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64)); + let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64); + backoff.insert(key, next_ms); + let jitter = next_ms / JITTER_FRAC_NUM; + let wait = Duration::from_millis(next_ms) + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect"); + } + if let Some(v) = inflight.get_mut(&key) { + *v = v.saturating_sub(1); } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 00041fd..84c526f 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; -use 
std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering}; use bytes::BytesMut; use rand::Rng; use rand::seq::SliceRandom; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, mpsc, Notify}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; @@ -18,18 +18,18 @@ use crate::protocol::constants::*; use super::ConnRegistry; use super::registry::{BoundConn, ConnMeta}; -use super::codec::RpcWriter; +use super::codec::{RpcWriter, WriterCommand}; use super::reader::reader_loop; use super::MeResponse; - const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; +const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4; #[derive(Clone)] pub struct MeWriter { pub id: u64, pub addr: SocketAddr, - pub writer: Arc>, + pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, pub draining: Arc, @@ -47,11 +47,24 @@ pub struct MePool { pub(super) nat_ip_detected: Arc>>, pub(super) nat_probe: bool, pub(super) nat_stun: Option, + pub(super) nat_stun_servers: Vec, pub(super) detected_ipv6: Option, pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, + pub(super) stun_backoff_until: Arc>>, pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, + pub(super) me_keepalive_enabled: bool, + pub(super) me_keepalive_interval: Duration, + pub(super) me_keepalive_jitter: Duration, + pub(super) me_keepalive_payload_random: bool, + pub(super) me_warmup_stagger_enabled: bool, + pub(super) me_warmup_step_delay: Duration, + pub(super) me_warmup_step_jitter: Duration, + pub(super) me_reconnect_max_concurrent_per_dc: u32, + pub(super) me_reconnect_backoff_base: Duration, + pub(super) me_reconnect_backoff_cap: Duration, + pub(super) me_reconnect_fast_retry_count: u32, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -59,6 +72,9 @@ pub struct MePool { pub(super) ping_tracker: Arc>>, pub(super) rtt_stats: Arc>>, pub(super) nat_reflection_cache: Arc>, + pub(super) writer_available: Arc, + pub(super) conn_count: AtomicUsize, + pub(super) stats: Arc, pool_size: usize, } @@ -75,6 +91,7 @@ impl MePool { nat_ip: Option, nat_probe: bool, nat_stun: Option, + nat_stun_servers: Vec, detected_ipv6: Option, me_one_retry: u8, me_one_timeout_ms: u64, @@ -83,6 +100,18 @@ impl MePool { default_dc: Option, decision: NetworkDecision, rng: Arc, + stats: Arc, + me_keepalive_enabled: bool, + me_keepalive_interval_secs: u64, + me_keepalive_jitter_secs: u64, + me_keepalive_payload_random: bool, + me_warmup_stagger_enabled: bool, + me_warmup_step_delay_ms: u64, + me_warmup_step_jitter_ms: u64, + me_reconnect_max_concurrent_per_dc: u32, + me_reconnect_backoff_base_ms: u64, + me_reconnect_backoff_cap_ms: u64, + me_reconnect_fast_retry_count: u32, ) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), @@ -96,11 +125,25 @@ impl MePool { nat_ip_detected: Arc::new(RwLock::new(None)), nat_probe, nat_stun, + nat_stun_servers, detected_ipv6, nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), + stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), + stats, + me_keepalive_enabled, + me_keepalive_interval: 
Duration::from_secs(me_keepalive_interval_secs), + me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), + me_keepalive_payload_random, + me_warmup_stagger_enabled, + me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), + me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), + me_reconnect_max_concurrent_per_dc, + me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), + me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), + me_reconnect_fast_retry_count, pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -109,6 +152,8 @@ impl MePool { ping_tracker: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), + writer_available: Arc::new(Notify::new()), + conn_count: AtomicUsize::new(0), }) } @@ -116,6 +161,11 @@ impl MePool { self.proxy_tag.is_some() } + pub fn reset_stun_state(&self) { + self.nat_probe_attempts.store(0, Ordering::Relaxed); + self.nat_probe_disabled.store(false, Ordering::Relaxed); + } + pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr { let ip = self.translate_ip_for_nat(addr.ip()); SocketAddr::new(ip, addr.port()) @@ -132,7 +182,11 @@ impl MePool { pub async fn reconcile_connections(self: &Arc, rng: &SecureRandom) { use std::collections::HashSet; let writers = self.writers.read().await; - let current: HashSet = writers.iter().map(|w| w.addr).collect(); + let current: HashSet = writers + .iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .map(|w| w.addr) + .collect(); drop(writers); for family in self.family_order() { @@ -175,12 +229,36 @@ impl MePool { let mut guard = self.proxy_map_v6.write().await; if !v6.is_empty() && *guard != v6 { *guard = v6; + changed = true; + } + } + // Ensure negative DC entries mirror positives when absent (Telegram convention). + { + let mut guard = self.proxy_map_v4.write().await; + let keys: Vec = guard.keys().cloned().collect(); + for k in keys.iter().cloned().filter(|k| *k > 0) { + if !guard.contains_key(&-k) { + if let Some(addrs) = guard.get(&k).cloned() { + guard.insert(-k, addrs); + } + } + } + } + { + let mut guard = self.proxy_map_v6.write().await; + let keys: Vec = guard.keys().cloned().collect(); + for k in keys.iter().cloned().filter(|k| *k > 0) { + if !guard.contains_key(&-k) { + if let Some(addrs) = guard.get(&k).cloned() { + guard.insert(-k, addrs); + } + } } } changed } - pub async fn update_secret(&self, new_secret: Vec) -> bool { + pub async fn update_secret(self: &Arc, new_secret: Vec) -> bool { if new_secret.len() < 32 { warn!(len = new_secret.len(), "proxy-secret update ignored (too short)"); return false; @@ -195,10 +273,14 @@ impl MePool { false } - pub async fn reconnect_all(&self) { - // Graceful: do not drop all at once. New connections will use updated secret. - // Existing writers remain until health monitor replaces them. - // No-op here to avoid total outage. 
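+    // Rolling replacement: dial a fresh connection to each writer's address first,
+    // then mark the old writer as draining, pausing between writers so the pool
+    // is never emptied all at once.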
+ pub async fn reconnect_all(self: &Arc) { + let ws = self.writers.read().await.clone(); + for w in ws { + if let Ok(()) = self.connect_one(w.addr, self.rng.as_ref()).await { + self.mark_writer_draining(w.id).await; + tokio::time::sleep(Duration::from_secs(2)).await; + } + } } pub(super) async fn key_selector(&self) -> u32 { @@ -277,7 +359,24 @@ impl MePool { return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into())); } - // Additional connections up to pool_size total (round-robin across DCs) + // Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles. + if self.me_warmup_stagger_enabled { + let mut delay_ms = 0u64; + for (dc, addrs) in dc_addrs.iter() { + for (ip, port) in addrs { + if self.connection_count() >= pool_size { + break; + } + let addr = SocketAddr::new(*ip, *port); + let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64); + delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(e) = self.connect_one(addr, rng.as_ref()).await { + debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)"); + } + } + } + } else { for (dc, addrs) in dc_addrs.iter() { for (ip, port) in addrs { if self.connection_count() >= pool_size { @@ -292,6 +391,7 @@ impl MePool { break; } } + } if !self.decision.effective_multipath && self.connection_count() > 0 { break; @@ -317,21 +417,61 @@ impl MePool { let cancel = CancellationToken::new(); let degraded = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false)); - let rpc_w = Arc::new(Mutex::new(RpcWriter { + let (tx, mut rx) = mpsc::channel::(4096); + let tx_for_keepalive = tx.clone(); + let keepalive_random = self.me_keepalive_payload_random; + let stats = self.stats.clone(); + let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, iv: hs.write_iv, seq_no: 0, - })); + }; + let cancel_wr = cancel.clone(); + tokio::spawn(async move { + loop { + tokio::select! 
{ + cmd = rx.recv() => { + match cmd { + Some(WriterCommand::Data(payload)) => { + if rpc_writer.send(&payload).await.is_err() { break; } + } + Some(WriterCommand::DataAndFlush(payload)) => { + if rpc_writer.send_and_flush(&payload).await.is_err() { break; } + } + Some(WriterCommand::Keepalive) => { + let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN]; + if keepalive_random { + rand::rng().fill(&mut payload); + } + match rpc_writer.send_keepalive(payload).await { + Ok(()) => { + stats.increment_me_keepalive_sent(); + } + Err(_) => { + stats.increment_me_keepalive_failed(); + break; + } + } + } + Some(WriterCommand::Close) | None => break, + } + } + _ = cancel_wr.cancelled() => break, + } + } + }); let writer = MeWriter { id: writer_id, addr, - writer: rpc_w.clone(), + tx: tx.clone(), cancel: cancel.clone(), degraded: degraded.clone(), draining: draining.clone(), }; self.writers.write().await.push(writer.clone()); + self.conn_count.fetch_add(1, Ordering::Relaxed); + self.writer_available.notify_waiters(); let reg = self.registry.clone(); let writers_arc = self.writers_arc(); @@ -339,11 +479,19 @@ impl MePool { let rtt_stats = self.rtt_stats.clone(); let pool = Arc::downgrade(self); let cancel_ping = cancel.clone(); - let rpc_w_ping = rpc_w.clone(); + let tx_ping = tx.clone(); let ping_tracker_ping = ping_tracker.clone(); + let cleanup_done = Arc::new(AtomicBool::new(false)); + let cleanup_for_reader = cleanup_done.clone(); + let cleanup_for_ping = cleanup_done.clone(); + let keepalive_enabled = self.me_keepalive_enabled; + let keepalive_interval = self.me_keepalive_interval; + let keepalive_jitter = self.me_keepalive_jitter; + let cancel_reader_token = cancel.clone(); + let cancel_ping_token = cancel_ping.clone(); + let cancel_keepalive_token = cancel.clone(); tokio::spawn(async move { - let cancel_reader = cancel.clone(); let res = reader_loop( hs.rd, hs.read_key, @@ -351,16 +499,21 @@ impl MePool { reg.clone(), BytesMut::new(), BytesMut::new(), - rpc_w.clone(), + tx.clone(), ping_tracker.clone(), rtt_stats.clone(), writer_id, degraded.clone(), - cancel_reader.clone(), + cancel_reader_token.clone(), ) .await; if let Some(pool) = pool.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; + if cleanup_for_reader + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + pool.remove_writer_and_close_clients(writer_id).await; + } } if let Err(e) = res { warn!(error = %e, "ME reader ended"); @@ -378,7 +531,7 @@ impl MePool { .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; tokio::select! 
{ - _ = cancel_ping.cancelled() => { + _ = cancel_ping_token.cancelled() => { break; } _ = tokio::time::sleep(Duration::from_secs(wait)) => {} @@ -389,20 +542,45 @@ impl MePool { p.extend_from_slice(&sent_id.to_le_bytes()); { let mut tracker = ping_tracker_ping.lock().await; + tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } ping_id = ping_id.wrapping_add(1); - if let Err(e) = rpc_w_ping.lock().await.send_and_flush(&p).await { - debug!(error = %e, "Active ME ping failed, removing dead writer"); + if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() { + debug!("Active ME ping failed, removing dead writer"); cancel_ping.cancel(); if let Some(pool) = pool_ping.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; + if cleanup_for_ping + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + pool.remove_writer_and_close_clients(writer_id).await; + } } break; } } }); + if keepalive_enabled { + let tx_keepalive = tx_for_keepalive; + let cancel_keepalive = cancel_keepalive_token; + tokio::spawn(async move { + // Per-writer jittered start to avoid phase sync. + let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64); + tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await; + loop { + tokio::select! { + _ = cancel_keepalive.cancelled() => break, + _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {} + } + if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() { + break; + } + } + }); + } + Ok(()) } @@ -430,7 +608,7 @@ impl MePool { false } - pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) { + pub(crate) async fn remove_writer_and_close_clients(self: &Arc, writer_id: u64) { let conns = self.remove_writer_only(writer_id).await; for bound in conns { let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await; @@ -444,8 +622,11 @@ impl MePool { if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { let w = ws.remove(pos); w.cancel.cancel(); + let _ = w.tx.send(WriterCommand::Close).await; + self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + self.rtt_stats.lock().await.remove(&writer_id); self.registry.writer_lost(writer_id).await } @@ -459,8 +640,14 @@ impl MePool { let pool = Arc::downgrade(self); tokio::spawn(async move { + let deadline = Instant::now() + Duration::from_secs(300); loop { if let Some(p) = pool.upgrade() { + if Instant::now() >= deadline { + warn!(writer_id, "Drain timeout, force-closing"); + let _ = p.remove_writer_and_close_clients(writer_id).await; + break; + } if p.registry.is_writer_empty(writer_id).await { let _ = p.remove_writer_only(writer_id).await; break; diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index 1b4f7c7..d3dec16 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -98,16 +98,18 @@ impl MePool { family: IpFamily, ) -> Option { const STUN_CACHE_TTL: Duration = Duration::from_secs(600); - // If STUN probing was disabled after attempts, reuse cached (even stale) or skip. 
- if self.nat_probe_disabled.load(std::sync::atomic::Ordering::Relaxed) { - if let Ok(cache) = self.nat_reflection_cache.try_lock() { - let slot = match family { - IpFamily::V4 => cache.v4, - IpFamily::V6 => cache.v6, - }; - return slot.map(|(_, addr)| addr); + // Backoff window + if let Some(until) = *self.stun_backoff_until.read().await { + if Instant::now() < until { + if let Ok(cache) = self.nat_reflection_cache.try_lock() { + let slot = match family { + IpFamily::V4 => cache.v4, + IpFamily::V6 => cache.v6, + }; + return slot.map(|(_, addr)| addr); + } + return None; } - return None; } if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { @@ -123,48 +125,42 @@ impl MePool { } let attempt = self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if attempt >= 2 { - self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed); - return None; - } + let servers = if !self.nat_stun_servers.is_empty() { + self.nat_stun_servers.clone() + } else if let Some(s) = &self.nat_stun { + vec![s.clone()] + } else { + vec!["stun.l.google.com:19302".to_string()] + }; - let stun_addr = self - .nat_stun - .clone() - .unwrap_or_else(|| "stun.l.google.com:19302".to_string()); - match stun_probe_dual(&stun_addr).await { - Ok(res) => { - let picked: Option = match family { - IpFamily::V4 => res.v4, - IpFamily::V6 => res.v6, - }; - if let Some(result) = picked { - info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, "NAT probe: reflected address"); - if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { - let slot = match family { - IpFamily::V4 => &mut cache.v4, - IpFamily::V6 => &mut cache.v6, - }; - *slot = Some((Instant::now(), result.reflected_addr)); + for stun_addr in servers { + match stun_probe_dual(&stun_addr).await { + Ok(res) => { + let picked: Option = match family { + IpFamily::V4 => res.v4, + IpFamily::V6 => res.v6, + }; + if let Some(result) = picked { + info!(local = %result.local_addr, reflected = %result.reflected_addr, family = ?family, stun = %stun_addr, "NAT probe: reflected address"); + self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed); + if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { + let slot = match family { + IpFamily::V4 => &mut cache.v4, + IpFamily::V6 => &mut cache.v6, + }; + *slot = Some((Instant::now(), result.reflected_addr)); + } + return Some(result.reflected_addr); } - Some(result.reflected_addr) - } else { - None } - } - Err(e) => { - let attempts = attempt + 1; - if attempts <= 2 { - warn!(error = %e, attempt = attempts, "NAT probe failed"); - } else { - debug!(error = %e, attempt = attempts, "NAT probe suppressed after max attempts"); + Err(e) => { + warn!(error = %e, stun = %stun_addr, attempt = attempt + 1, "NAT probe failed, trying next server"); } - if attempts >= 2 { - self.nat_probe_disabled.store(true, std::sync::atomic::Ordering::Relaxed); - } - None } } + let backoff = Duration::from_secs(60 * 2u64.pow((attempt as u32).min(6))); + *self.stun_backoff_until.write().await = Some(Instant::now() + backoff); + None } } diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index fb40fdb..c22ed68 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -6,7 +6,7 @@ use std::time::Instant; use bytes::{Bytes, BytesMut}; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, mpsc}; use tokio_util::sync::CancellationToken; use 
tracing::{debug, trace, warn}; @@ -14,7 +14,7 @@ use crate::crypto::{AesCbc, crc32}; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; -use super::codec::RpcWriter; +use super::codec::WriterCommand; use super::{ConnRegistry, MeResponse}; pub(crate) async fn reader_loop( @@ -24,7 +24,7 @@ pub(crate) async fn reader_loop( reg: Arc, enc_leftover: BytesMut, mut dec: BytesMut, - writer: Arc>, + tx: mpsc::Sender, ping_tracker: Arc>>, rtt_stats: Arc>>, _writer_id: u64, @@ -33,6 +33,8 @@ pub(crate) async fn reader_loop( ) -> Result<()> { let mut raw = enc_leftover; let mut expected_seq: i32 = 0; + let mut crc_errors = 0u32; + let mut seq_mismatch = 0u32; loop { let mut tmp = [0u8; 16_384]; @@ -80,12 +82,20 @@ pub(crate) async fn reader_loop( let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap()); if crc32(&frame[..pe]) != ec { warn!("CRC mismatch in data frame"); + crc_errors += 1; + if crc_errors > 3 { + return Err(ProxyError::Proxy("Too many CRC mismatches".into())); + } continue; } let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap()); if seq_no != expected_seq { warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch"); + seq_mismatch += 1; + if seq_mismatch > 10 { + return Err(ProxyError::Proxy("Too many seq mismatches".into())); + } expected_seq = seq_no.wrapping_add(1); } else { expected_seq = expected_seq.wrapping_add(1); @@ -108,7 +118,7 @@ pub(crate) async fn reader_loop( let routed = reg.route(cid, MeResponse::Data { flags, data }).await; if !routed { reg.unregister(cid).await; - send_close_conn(&writer, cid).await; + send_close_conn(&tx, cid).await; } } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); @@ -118,7 +128,7 @@ pub(crate) async fn reader_loop( let routed = reg.route(cid, MeResponse::Ack(cfm)).await; if !routed { reg.unregister(cid).await; - send_close_conn(&writer, cid).await; + send_close_conn(&tx, cid).await; } } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); @@ -136,8 +146,8 @@ pub(crate) async fn reader_loop( let mut pong = Vec::with_capacity(12); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes()); - if let Err(e) = writer.lock().await.send_and_flush(&pong).await { - warn!(error = %e, "PONG send failed"); + if tx.send(WriterCommand::DataAndFlush(pong)).await.is_err() { + warn!("PONG send failed"); break; } } else if pt == RPC_PONG_U32 && body.len() >= 8 { @@ -171,12 +181,10 @@ pub(crate) async fn reader_loop( } } -async fn send_close_conn(writer: &Arc>, conn_id: u64) { +async fn send_close_conn(tx: &mpsc::Sender, conn_id: u64) { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); - if let Err(e) = writer.lock().await.send_and_flush(&p).await { - debug!(conn_id, error = %e, "Failed to send RPC_CLOSE_CONN"); - } + let _ = tx.send(WriterCommand::DataAndFlush(p)).await; } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 04f8baa..ab4f280 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, Mutex, RwLock}; -use super::codec::RpcWriter; +use super::codec::WriterCommand; use super::MeResponse; #[derive(Clone)] @@ -25,12 +25,12 @@ pub struct BoundConn { #[derive(Clone)] pub struct ConnWriter { pub writer_id: u64, 
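+    // The shared Arc<Mutex<RpcWriter>> handle is replaced by the writer task's
+    // channel sender, so callers enqueue WriterCommand values instead of
+    // contending on the TCP stream.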
-    pub writer: Arc<Mutex<RpcWriter>>,
+    pub tx: mpsc::Sender<WriterCommand>,
 }
 
 struct RegistryInner {
     map: HashMap<u64, mpsc::Sender<MeResponse>>,
-    writers: HashMap<u64, Arc<Mutex<RpcWriter>>>,
+    writers: HashMap<u64, mpsc::Sender<WriterCommand>>,
     writer_for_conn: HashMap<u64, u64>,
     conns_for_writer: HashMap<u64, Vec<u64>>,
     meta: HashMap<u64, ConnMeta>,
@@ -96,13 +96,13 @@
         &self,
         conn_id: u64,
         writer_id: u64,
-        writer: Arc<Mutex<RpcWriter>>,
+        tx: mpsc::Sender<WriterCommand>,
         meta: ConnMeta,
     ) {
         let mut inner = self.inner.write().await;
         inner.meta.entry(conn_id).or_insert(meta);
         inner.writer_for_conn.insert(conn_id, writer_id);
-        inner.writers.entry(writer_id).or_insert_with(|| writer.clone());
+        inner.writers.entry(writer_id).or_insert_with(|| tx.clone());
         inner
             .conns_for_writer
             .entry(writer_id)
@@ -114,7 +114,7 @@
         let inner = self.inner.read().await;
         let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?;
        let writer = inner.writers.get(&writer_id).cloned()?;
-        Some(ConnWriter { writer_id, writer })
+        Some(ConnWriter { writer_id, tx: writer })
     }
 
     pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
diff --git a/src/transport/middle_proxy/rotation.rs b/src/transport/middle_proxy/rotation.rs
index 6d94f3e..e141fc4 100644
--- a/src/transport/middle_proxy/rotation.rs
+++ b/src/transport/middle_proxy/rotation.rs
@@ -31,8 +31,17 @@ pub async fn me_rotation_task(pool: Arc, rng: Arc, interva
         info!(addr = %w.addr, writer_id = w.id, "Rotating ME connection");
         match pool.connect_one(w.addr, rng.as_ref()).await {
             Ok(()) => {
-                // Mark old writer for graceful drain; removal happens when sessions finish.
-                pool.mark_writer_draining(w.id).await;
+                tokio::time::sleep(Duration::from_secs(2)).await;
+                let ws = pool.writers.read().await;
+                let new_alive = ws.iter().any(|nw|
+                    nw.id != w.id && nw.addr == w.addr && !nw.degraded.load(Ordering::Relaxed) && !nw.draining.load(Ordering::Relaxed)
+                );
+                drop(ws);
+                if new_alive {
+                    pool.mark_writer_draining(w.id).await;
+                } else {
+                    warn!(addr = %w.addr, writer_id = w.id, "New writer died, keeping old");
+                }
             }
             Err(e) => {
                 warn!(addr = %w.addr, writer_id = w.id, error = %e, "ME rotation connect failed");
diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs
index 2b0c42e..627906d 100644
--- a/src/transport/middle_proxy/send.rs
+++ b/src/transport/middle_proxy/send.rs
@@ -10,6 +10,7 @@
 use crate::network::IpFamily;
 use crate::protocol::constants::RPC_CLOSE_EXT_U32;
 use super::MePool;
+use super::codec::WriterCommand;
 use super::wire::build_proxy_req_payload;
 use rand::seq::SliceRandom;
 use super::registry::ConnMeta;
@@ -43,18 +44,15 @@
         loop {
             if let Some(current) = self.registry.get_writer(conn_id).await {
                 let send_res = {
-                    if let Ok(mut guard) = current.writer.try_lock() {
-                        let r = guard.send(&payload).await;
-                        drop(guard);
-                        r
-                    } else {
-                        current.writer.lock().await.send(&payload).await
-                    }
+                    current
+                        .tx
+                        .send(WriterCommand::Data(payload.clone()))
+                        .await
                 };
                 match send_res {
                     Ok(()) => return Ok(()),
-                    Err(e) => {
-                        warn!(error = %e, writer_id = current.writer_id, "ME write failed");
+                    Err(_) => {
+                        warn!(writer_id = current.writer_id, "ME writer channel closed");
                         self.remove_writer_and_close_clients(current.writer_id).await;
                         continue;
                     }
@@ -64,7 +62,26 @@
             let mut writers_snapshot = {
                 let ws = self.writers.read().await;
                 if ws.is_empty() {
-                    return Err(ProxyError::Proxy("All ME connections dead".into()));
+                    drop(ws);
+                    for family in self.family_order() {
+                        let map = match family {
+                            IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
+                            IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
+                        };
+                        for (_dc, addrs) in map.iter() {
+                            for (ip, port) in addrs {
+                                let addr = SocketAddr::new(*ip, *port);
+                                if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
+                                    self.writer_available.notify_waiters();
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() {
+                        return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
+                    }
+                    continue;
                 }
                 ws.clone()
             };
@@ -96,9 +113,10 @@
                 writers_snapshot = ws2.clone();
                 drop(ws2);
                 candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
-                break;
+                if !candidate_indices.is_empty() {
+                    break;
+                }
             }
-            drop(map_guard);
         }
         if candidate_indices.is_empty() {
             return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
@@ -120,22 +138,15 @@
             if w.draining.load(Ordering::Relaxed) {
                 continue;
             }
-            if let Ok(mut guard) = w.writer.try_lock() {
-                let send_res = guard.send(&payload).await;
-                drop(guard);
-                match send_res {
-                    Ok(()) => {
-                        self.registry
-                            .bind_writer(conn_id, w.id, w.writer.clone(), meta.clone())
-                            .await;
-                        return Ok(());
-                    }
-                    Err(e) => {
-                        warn!(error = %e, writer_id = w.id, "ME write failed");
-                        self.remove_writer_and_close_clients(w.id).await;
-                        continue;
-                    }
-                }
+            if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() {
+                self.registry
+                    .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
+                    .await;
+                return Ok(());
+            } else {
+                warn!(writer_id = w.id, "ME writer channel closed");
+                self.remove_writer_and_close_clients(w.id).await;
+                continue;
             }
         }
 
@@ -143,15 +154,15 @@
             if w.draining.load(Ordering::Relaxed) {
                 continue;
             }
-            match w.writer.lock().await.send(&payload).await {
+            match w.tx.send(WriterCommand::Data(payload.clone())).await {
                 Ok(()) => {
                     self.registry
-                        .bind_writer(conn_id, w.id, w.writer.clone(), meta.clone())
+                        .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
                         .await;
                     return Ok(());
                 }
-                Err(e) => {
-                    warn!(error = %e, writer_id = w.id, "ME write failed (blocking)");
+                Err(_) => {
+                    warn!(writer_id = w.id, "ME writer channel closed (blocking)");
                     self.remove_writer_and_close_clients(w.id).await;
                 }
             }
@@ -163,8 +174,8 @@
         let mut p = Vec::with_capacity(12);
         p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
         p.extend_from_slice(&conn_id.to_le_bytes());
-        if let Err(e) = w.writer.lock().await.send_and_flush(&p).await {
-            debug!(error = %e, "ME close write failed");
+        if w.tx.send(WriterCommand::DataAndFlush(p)).await.is_err() {
+            debug!("ME close write failed");
             self.remove_writer_and_close_clients(w.writer_id).await;
         }
     } else {
@@ -176,7 +187,7 @@
     }
 
     pub fn connection_count(&self) -> usize {
-        self.writers.try_read().map(|w| w.len()).unwrap_or(0)
+        self.conn_count.load(Ordering::Relaxed)
     }
 
     pub(super) async fn candidate_indices_for_dc(
diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs
index bf73408..8fdd437 100644
--- a/src/transport/upstream.rs
+++ b/src/transport/upstream.rs
@@ -167,20 +167,44 @@
     }
 
     /// Select upstream using latency-weighted random selection.
-    async fn select_upstream(&self, dc_idx: Option<i32>) -> Option<usize> {
+    async fn select_upstream(&self, dc_idx: Option<i32>, scope: Option<&str>) -> Option<usize> {
         let upstreams = self.upstreams.read().await;
         if upstreams.is_empty() {
             return None;
         }
-
-        let healthy: Vec<usize> = upstreams.iter()
+        // Scope filter:
+        // - if a scope is requested, keep only upstreams whose `scopes` list contains it
+        // - if no scope is requested, keep only unscoped upstreams
+        let filtered_upstreams: Vec<usize> = upstreams.iter()
             .enumerate()
-            .filter(|(_, u)| u.healthy)
+            .filter(|(_, u)| {
+                scope.map_or(
+                    u.config.scopes.is_empty(),
+                    |req_scope| {
+                        u.config.scopes
+                            .split(',')
+                            .map(str::trim)
+                            .any(|s| s == req_scope)
+                    }
+                )
+            })
             .map(|(i, _)| i)
             .collect();
 
+        // Healthy filter
+        let healthy: Vec<usize> = filtered_upstreams.iter()
+            .filter(|&&i| upstreams[i].healthy)
+            .copied()
+            .collect();
+
+        if filtered_upstreams.is_empty() {
+            warn!(scope = scope, "No upstreams match the requested scope");
+            return None;
+        }
+
         if healthy.is_empty() {
-            return Some(rand::rng().gen_range(0..upstreams.len()));
+            warn!(scope = scope, "No healthy upstreams in scope; picking a random candidate");
+            return Some(filtered_upstreams[rand::rng().gen_range(0..filtered_upstreams.len())]);
         }
 
         if healthy.len() == 1 {
@@ -222,15 +246,20 @@
     }
 
     /// Connect to target through a selected upstream.
-    pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i32>) -> Result<TcpStream> {
-        let idx = self.select_upstream(dc_idx).await
+    pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i32>, scope: Option<&str>) -> Result<TcpStream> {
+        let idx = self.select_upstream(dc_idx, scope).await
             .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
 
-        let upstream = {
+        let mut upstream = {
             let guard = self.upstreams.read().await;
             guard[idx].config.clone()
         };
 
+        // Set the requested scope on this connection's copy of the config
+        if let Some(s) = scope {
+            upstream.selected_scope = s.to_string();
+        }
+
         let start = Instant::now();
 
         match self.connect_via_upstream(&upstream, target).await {
@@ -313,8 +342,12 @@
             if let Some(e) = stream.take_error()? {
                 return Err(ProxyError::Io(e));
             }
+            // Replace the SOCKS4 user_id with config.selected_scope, if set
+            let scope: Option<&str> = Some(config.selected_scope.as_str())
+                .filter(|s| !s.is_empty());
+            let _user_id: Option<&str> = scope.or(user_id.as_deref());
 
-            connect_socks4(&mut stream, target, user_id.as_deref()).await?;
+            connect_socks4(&mut stream, target, _user_id).await?;
             Ok(stream)
         },
         UpstreamType::Socks5 { address, interface, username, password } => {
@@ -341,7 +374,14 @@
                 return Err(ProxyError::Io(e));
             }
 
-            connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?;
+            debug!(config = ?config, "Socks5 connection");
+            // Replace the SOCKS5 username/password with config.selected_scope, if set
+            let scope: Option<&str> = Some(config.selected_scope.as_str())
+                .filter(|s| !s.is_empty());
+            let _username: Option<&str> = scope.or(username.as_deref());
+            let _password: Option<&str> = scope.or(password.as_deref());
+
+            connect_socks5(&mut stream, target, _username, _password).await?;
             Ok(stream)
         },
     }
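
The common thread in the reader/registry/send changes above is replacing the shared `Arc<Mutex<RpcWriter>>` with an `mpsc::Sender<WriterCommand>`: one task owns the socket's write half and drains a command channel, so producers never contend on a lock, and a failed send can only mean the receiver is gone. The real `WriterCommand` enum and its consumer live in `src/transport/middle_proxy/codec.rs`, which is not part of this diff; the sketch below is only an illustration of the pattern, the variant payloads and the `writer_task` name are assumptions, and the real consumer presumably also applies the RPC framing and AES-CBC encryption that `RpcWriter` handled.

```rust
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;

// Assumed shape, inferred from the Data / DataAndFlush usages in this diff.
enum WriterCommand {
    Data(Vec<u8>),         // buffered write on the hot path
    DataAndFlush(Vec<u8>), // write + flush (PONG, RPC_CLOSE_CONN, ...)
}

// Hypothetical consumer: a single task owns the write half, so senders
// never block on a Mutex and never await a lock while holding other state.
async fn writer_task<W: AsyncWrite + Unpin>(
    mut sink: W,
    mut rx: mpsc::Receiver<WriterCommand>,
) -> std::io::Result<()> {
    while let Some(cmd) = rx.recv().await {
        match cmd {
            WriterCommand::Data(buf) => sink.write_all(&buf).await?,
            WriterCommand::DataAndFlush(buf) => {
                sink.write_all(&buf).await?;
                sink.flush().await?;
            }
        }
    }
    // All senders dropped: the pool has discarded this writer.
    Ok(())
}
```

This is also why the rewritten call sites treat `Err(_)` from `tx.send(...)` as "writer channel closed" and immediately remove the writer: a closed channel is the only failure mode, so there is no I/O error left to inspect at the producer.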
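
The scope matching added to `select_upstream` is stringly typed: `config.scopes` is a comma-separated list, and a request either names a scope or does not. A standalone sketch of that predicate (the `scope_matches` helper is our name, not part of the codebase) makes the two cases easy to test in isolation:

```rust
/// Mirrors the filter in select_upstream: with no requested scope only
/// unscoped upstreams match; with a requested scope, it must appear in
/// the comma-separated `scopes` list (entries are trimmed).
fn scope_matches(scopes: &str, requested: Option<&str>) -> bool {
    match requested {
        None => scopes.is_empty(),
        Some(req) => scopes.split(',').map(str::trim).any(|s| s == req),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn scoped_and_unscoped_requests() {
        assert!(scope_matches("", None));          // unscoped upstream, unscoped request
        assert!(!scope_matches("a,b", None));      // scoped upstream never serves unscoped
        assert!(scope_matches("a, b", Some("b"))); // whitespace around entries is trimmed
        assert!(!scope_matches("", Some("a")));    // unscoped upstream never serves scoped
    }
}
```

Note the asymmetry this encodes: an unscoped request will never be routed through a scoped upstream and vice versa, which is why an unmatched scope makes `select_upstream` return `None` (and `connect` fail) rather than fall back to the unscoped pool.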