diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 116c1d4..87a8e30 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,11 +3,12 @@ name: Release on: push: tags: - - '[0-9]+.[0-9]+.[0-9]+' # Matches tags like 3.0.0, 3.1.2, etc. - workflow_dispatch: # Manual trigger from GitHub Actions UI + - '[0-9]+.[0-9]+.[0-9]+' + workflow_dispatch: permissions: contents: read + packages: write env: CARGO_TERM_COLOR: always @@ -37,11 +38,9 @@ jobs: asset_name: telemt-aarch64-linux-musl steps: - - name: Checkout repository - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: actions/checkout@v4 - - name: Install stable Rust toolchain - uses: dtolnay/rust-toolchain@888c2e1ea69ab0d4330cbf0af1ecc7b68f368cc1 # v1 + - uses: dtolnay/rust-toolchain@v1 with: toolchain: stable targets: ${{ matrix.target }} @@ -51,8 +50,7 @@ jobs: sudo apt-get update sudo apt-get install -y gcc-aarch64-linux-gnu - - name: Cache cargo registry & build artifacts - uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2 + - uses: actions/cache@v4 with: path: | ~/.cargo/registry @@ -76,8 +74,7 @@ jobs: tar -czvf ${{ matrix.asset_name }}.tar.gz ${{ matrix.artifact_name }} sha256sum ${{ matrix.asset_name }}.tar.gz > ${{ matrix.asset_name }}.sha256 - - name: Upload artifact - uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0 + - uses: actions/upload-artifact@v4 with: name: ${{ matrix.asset_name }} path: | @@ -85,30 +82,37 @@ jobs: target/${{ matrix.target }}/release/${{ matrix.asset_name }}.sha256 build-docker-image: + needs: build runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: - - name: Checkout - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 + - uses: docker/setup-qemu-action@v3 + - uses: docker/setup-buildx-action@v3 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 - - - name: Login to GitHub Container Registry - uses: docker/login-action@v2 + - name: Login to GHCR + uses: docker/login-action@v3 with: registry: ghcr.io - username: ${{ github.repository_owner }} - password: ${{ secrets.TOKEN_GH_DEPLOY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract version + id: vars + run: echo "VERSION=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT - name: Build and push uses: docker/build-push-action@v6 with: context: . push: true - tags: ${{ github.ref }} + tags: | + ghcr.io/${{ github.repository }}:${{ steps.vars.outputs.VERSION }} + ghcr.io/${{ github.repository }}:latest release: name: Create Release @@ -118,40 +122,14 @@ jobs: contents: write steps: - - name: Checkout repository - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: actions/checkout@v4 with: fetch-depth: 0 - token: ${{ secrets.GITHUB_TOKEN }} - - name: Download all artifacts - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8 + - uses: actions/download-artifact@v4 with: path: artifacts - - name: Update version in Cargo.toml and Cargo.lock - run: | - # Extract version from tag (remove 'v' prefix if present) - VERSION="${GITHUB_REF#refs/tags/}" - VERSION="${VERSION#v}" - - # Install cargo-edit for version bumping - cargo install cargo-edit - - # Update Cargo.toml version - cargo set-version "$VERSION" - - # Configure git - git config user.name "github-actions[bot]" - git config user.email "github-actions[bot]@users.noreply.github.com" - - # Commit and push changes - #git add Cargo.toml Cargo.lock - #git commit -m "chore: bump version to $VERSION" || echo "No changes to commit" - #git push origin HEAD:main - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Create Release uses: softprops/action-gh-release@v2 with: diff --git a/.gitignore b/.gitignore index ad67955..6b5f1d5 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,7 @@ target # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +*.rs +target +Cargo.lock +src diff --git a/Cargo.lock b/Cargo.lock index fb45f19..251f0b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2087,7 +2087,7 @@ dependencies = [ [[package]] name = "telemt" -version = "3.0.10" +version = "3.0.13" dependencies = [ "aes", "anyhow", diff --git a/README.md b/README.md index a88b8df..8d0c41a 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ - Улучшение обработки ошибок в edge-case транспортных сценариях Релиз: -[3.0.9](https://github.com/telemt/telemt/releases/tag/3.0.9) +[3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12) --- @@ -69,7 +69,7 @@ Additionally, we implemented a set of robustness enhancements designed to: - Improve error handling in edge-case transport scenarios Release: -[3.0.9](https://github.com/telemt/telemt/releases/tag/3.0.9) +[3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12) --- diff --git a/config.toml b/config.toml index be27ca6..375cd7f 100644 --- a/config.toml +++ b/config.toml @@ -38,10 +38,21 @@ me_warmup_stagger_enabled = true me_warmup_step_delay_ms = 500 # Base delay between extra connects me_warmup_step_jitter_ms = 300 # Jitter for warmup delay # Reconnect policy knobs. -me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE! +me_reconnect_max_concurrent_per_dc = 4 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE! me_reconnect_backoff_base_ms = 500 # Backoff start me_reconnect_backoff_cap_ms = 30000 # Backoff cap me_reconnect_fast_retry_count = 11 # Quick retries before backoff +update_every = 7200 # Resolve the active updater interval for ME infrastructure refresh tasks. +crypto_pending_buffer = 262144 # Max pending ciphertext buffer per client writer (bytes). Controls FakeTLS backpressure vs throughput. +max_client_frame = 16777216 # Maximum allowed client MTProto frame size (bytes). +desync_all_full = false # Emit full crypto-desync forensic logs for every event. When false, full forensic details are emitted once per key window. +me_reinit_drain_timeout_secs = 300 # Drain timeout in seconds for stale ME writers after endpoint map changes. Set to 0 to keep stale writers draining indefinitely (no force-close). +auto_degradation_enabled = true # Enable auto-degradation from ME to Direct-DC. +degradation_min_unavailable_dc_groups = 2 # Minimum unavailable ME DC groups before degrading. +hardswap = true # Enable C-like hard-swap for ME pool generations. When true, Telemt prewarms a new generation and switches once full coverage is reached. +me_pool_drain_ttl_secs = 90 # Drain-TTL in seconds for stale ME writers after endpoint map changes. During TTL, stale writers may be used only as fallback for new bindings. +me_pool_min_fresh_ratio = 0.8 # Minimum desired-DC coverage ratio required before draining stale writers. Range: 0.0..=1.0. +me_reinit_drain_timeout_secs = 120 # Drain timeout in seconds for stale ME writers after endpoint map changes. Set to 0 to keep stale writers draining indefinitely (no force-close). [general.modes] classic = false diff --git a/proxy-secret b/proxy-secret new file mode 100644 index 0000000..ef77163 --- /dev/null +++ b/proxy-secret @@ -0,0 +1 @@ +ʖxHl~,D0d]UJUAM'!FnRZD>ϳF>yZfa*ߜڋ o8zM:dq>\3w}n\TĐy'VIil&] \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index 3525a22..a1182a7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -196,7 +196,10 @@ use_middle_proxy = false log_level = "normal" desync_all_full = false update_every = 43200 -me_reinit_drain_timeout_secs = 300 +hardswap = false +me_pool_drain_ttl_secs = 90 +me_pool_min_fresh_ratio = 0.8 +me_reinit_drain_timeout_secs = 120 [network] ipv4 = true diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 01cdcb0..a0443fc 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -1,4 +1,3 @@ -use std::net::IpAddr; use std::collections::HashMap; use ipnetwork::IpNetwork; use serde::Deserialize; @@ -83,7 +82,7 @@ pub(crate) fn default_unknown_dc_log_path() -> Option { } pub(crate) fn default_pool_size() -> usize { - 2 + 8 } pub(crate) fn default_keepalive_interval() -> u64 { @@ -144,10 +143,18 @@ pub(crate) fn default_alpn_enforce() -> bool { pub(crate) fn default_stun_servers() -> Vec { vec![ + "stun.l.google.com:5349".to_string(), + "stun1.l.google.com:3478".to_string(), + "stun.gmx.net:3478".to_string(), "stun.l.google.com:19302".to_string(), + "stun.1und1.de:3478".to_string(), "stun1.l.google.com:19302".to_string(), "stun2.l.google.com:19302".to_string(), + "stun3.l.google.com:19302".to_string(), + "stun4.l.google.com:19302".to_string(), + "stun.services.mozilla.com:3478".to_string(), "stun.stunprotocol.org:3478".to_string(), + "stun.nextcloud.com:3478".to_string(), "stun.voip.eutelia.it:3478".to_string(), ] } @@ -164,19 +171,31 @@ pub(crate) fn default_cache_public_ip_path() -> String { } pub(crate) fn default_proxy_secret_reload_secs() -> u64 { - 12 * 60 * 60 + 1 * 60 * 60 } pub(crate) fn default_proxy_config_reload_secs() -> u64 { - 12 * 60 * 60 + 1 * 60 * 60 } pub(crate) fn default_update_every_secs() -> u64 { - 2 * 60 * 60 + 1 * 30 * 60 } pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 { - 300 + 120 +} + +pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 { + 90 +} + +pub(crate) fn default_me_pool_min_fresh_ratio() -> f32 { + 0.8 +} + +pub(crate) fn default_hardswap() -> bool { + true } pub(crate) fn default_ntp_check() -> bool { diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 5c7263f..7f121f6 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -12,6 +12,9 @@ //! | `general` | `me_keepalive_*` | Passed on next connection | //! | `general` | `desync_all_full` | Applied immediately | //! | `general` | `update_every` | Applied to ME updater immediately | +//! | `general` | `hardswap` | Applied on next ME map update | +//! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update | +//! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update | //! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update | //! | `access` | All user/quota fields | Effective immediately | //! @@ -39,6 +42,9 @@ pub struct HotFields { pub middle_proxy_pool_size: usize, pub desync_all_full: bool, pub update_every_secs: u64, + pub hardswap: bool, + pub me_pool_drain_ttl_secs: u64, + pub me_pool_min_fresh_ratio: f32, pub me_reinit_drain_timeout_secs: u64, pub me_keepalive_enabled: bool, pub me_keepalive_interval_secs: u64, @@ -55,6 +61,9 @@ impl HotFields { middle_proxy_pool_size: cfg.general.middle_proxy_pool_size, desync_all_full: cfg.general.desync_all_full, update_every_secs: cfg.general.effective_update_every_secs(), + hardswap: cfg.general.hardswap, + me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, + me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_keepalive_enabled: cfg.general.me_keepalive_enabled, me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs, @@ -198,6 +207,27 @@ fn log_changes( ); } + if old_hot.hardswap != new_hot.hardswap { + info!( + "config reload: hardswap: {} → {}", + old_hot.hardswap, new_hot.hardswap, + ); + } + + if old_hot.me_pool_drain_ttl_secs != new_hot.me_pool_drain_ttl_secs { + info!( + "config reload: me_pool_drain_ttl_secs: {}s → {}s", + old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs, + ); + } + + if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON { + info!( + "config reload: me_pool_min_fresh_ratio: {:.3} → {:.3}", + old_hot.me_pool_min_fresh_ratio, new_hot.me_pool_min_fresh_ratio, + ); + } + if old_hot.me_reinit_drain_timeout_secs != new_hot.me_reinit_drain_timeout_secs { info!( "config reload: me_reinit_drain_timeout_secs: {}s → {}s", diff --git a/src/config/load.rs b/src/config/load.rs index fa61539..dce5fbc 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use std::collections::HashMap; use std::net::IpAddr; use std::path::Path; @@ -145,6 +147,24 @@ impl ProxyConfig { } } + if !(0.0..=1.0).contains(&config.general.me_pool_min_fresh_ratio) { + return Err(ProxyError::Config( + "general.me_pool_min_fresh_ratio must be within [0.0, 1.0]".to_string(), + )); + } + + if config.general.effective_me_pool_force_close_secs() > 0 + && config.general.effective_me_pool_force_close_secs() + < config.general.me_pool_drain_ttl_secs + { + warn!( + me_pool_drain_ttl_secs = config.general.me_pool_drain_ttl_secs, + me_reinit_drain_timeout_secs = config.general.effective_me_pool_force_close_secs(), + "force-close timeout is lower than drain TTL; bumping force-close timeout to TTL" + ); + config.general.me_reinit_drain_timeout_secs = config.general.me_pool_drain_ttl_secs; + } + // Validate secrets. for (user, secret) in &config.access.users { if !secret.chars().all(|c| c.is_ascii_hexdigit()) || secret.len() != 32 { @@ -439,4 +459,45 @@ mod tests { assert!(err.contains("general.update_every must be > 0")); let _ = std::fs::remove_file(path); } + + #[test] + fn me_pool_min_fresh_ratio_out_of_range_is_rejected() { + let toml = r#" + [general] + me_pool_min_fresh_ratio = 1.5 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_pool_min_ratio_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_pool_min_fresh_ratio must be within [0.0, 1.0]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn force_close_bumped_when_below_drain_ttl() { + let toml = r#" + [general] + me_pool_drain_ttl_secs = 90 + me_reinit_drain_timeout_secs = 30 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_force_close_bump_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 90); + let _ = std::fs::remove_file(path); + } } diff --git a/src/config/types.rs b/src/config/types.rs index eb16885..8d573df 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -206,6 +206,11 @@ pub struct GeneralConfig { #[serde(default = "default_desync_all_full")] pub desync_all_full: bool, + /// Enable C-like hard-swap for ME pool generations. + /// When true, Telemt prewarms a new generation and switches once full coverage is reached. + #[serde(default = "default_hardswap")] + pub hardswap: bool, + /// Enable staggered warmup of extra ME writers. #[serde(default = "default_true")] pub me_warmup_stagger_enabled: bool, @@ -262,6 +267,16 @@ pub struct GeneralConfig { #[serde(default)] pub update_every: Option, + /// Drain-TTL in seconds for stale ME writers after endpoint map changes. + /// During TTL, stale writers may be used only as fallback for new bindings. + #[serde(default = "default_me_pool_drain_ttl_secs")] + pub me_pool_drain_ttl_secs: u64, + + /// Minimum desired-DC coverage ratio required before draining stale writers. + /// Range: 0.0..=1.0. + #[serde(default = "default_me_pool_min_fresh_ratio")] + pub me_pool_min_fresh_ratio: f32, + /// Drain timeout in seconds for stale ME writers after endpoint map changes. /// Set to 0 to keep stale writers draining indefinitely (no force-close). #[serde(default = "default_me_reinit_drain_timeout_secs")] @@ -308,7 +323,7 @@ impl Default for GeneralConfig { middle_proxy_nat_stun: None, middle_proxy_nat_stun_servers: Vec::new(), middle_proxy_pool_size: default_pool_size(), - middle_proxy_warm_standby: 8, + middle_proxy_warm_standby: 16, me_keepalive_enabled: true, me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_jitter_secs: default_keepalive_jitter(), @@ -316,7 +331,7 @@ impl Default for GeneralConfig { me_warmup_stagger_enabled: true, me_warmup_step_delay_ms: default_warmup_step_delay_ms(), me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), - me_reconnect_max_concurrent_per_dc: 4, + me_reconnect_max_concurrent_per_dc: 8, me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), me_reconnect_fast_retry_count: 8, @@ -328,8 +343,11 @@ impl Default for GeneralConfig { crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), desync_all_full: default_desync_all_full(), + hardswap: default_hardswap(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), update_every: Some(default_update_every_secs()), + me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), + me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(), me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(), proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), @@ -348,6 +366,12 @@ impl GeneralConfig { self.update_every .unwrap_or_else(|| self.proxy_secret_auto_reload_secs.min(self.proxy_config_auto_reload_secs)) } + + /// Resolve force-close timeout for stale writers. + /// `me_reinit_drain_timeout_secs` remains backward-compatible alias. + pub fn effective_me_pool_force_close_secs(&self) -> u64 { + self.me_reinit_drain_timeout_secs + } } /// `[general.links]` — proxy link generation settings. diff --git a/src/crypto/aes.rs b/src/crypto/aes.rs index 9e123bf..674e4cb 100644 --- a/src/crypto/aes.rs +++ b/src/crypto/aes.rs @@ -11,6 +11,8 @@ //! `HandshakeSuccess`, `ObfuscationParams`) are responsible for //! zeroizing their own copies. +#![allow(dead_code)] + use aes::Aes256; use ctr::{Ctr128BE, cipher::{KeyIvInit, StreamCipher}}; use zeroize::Zeroize; diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 266a3cb..9108f34 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -6,7 +6,6 @@ pub mod random; pub use aes::{AesCtr, AesCbc}; pub use hash::{ - build_middleproxy_prekey, crc32, crc32c, derive_middleproxy_keys, md5, sha1, sha256, - sha256_hmac, + build_middleproxy_prekey, crc32, crc32c, derive_middleproxy_keys, sha256, sha256_hmac, }; pub use random::SecureRandom; diff --git a/src/crypto/random.rs b/src/crypto/random.rs index f3432e0..0dd5f1a 100644 --- a/src/crypto/random.rs +++ b/src/crypto/random.rs @@ -1,5 +1,8 @@ //! Pseudorandom +#![allow(deprecated)] +#![allow(dead_code)] + use rand::{Rng, RngCore, SeedableRng}; use rand::rngs::StdRng; use parking_lot::Mutex; diff --git a/src/error.rs b/src/error.rs index f934672..eaebd88 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,7 @@ //! Error Types +#![allow(dead_code)] + use std::fmt; use std::net::SocketAddr; use thiserror::Error; diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index 17cebc7..32fcbe3 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -1,5 +1,7 @@ // src/ip_tracker.rs -// Модуль для отслеживания и ограничения уникальных IP-адресов пользователей +// IP address tracking and limiting for users + +#![allow(dead_code)] use std::collections::{HashMap, HashSet}; use std::net::IpAddr; diff --git a/src/main.rs b/src/main.rs index 3a6ad1a..0d1eccc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ //! telemt — Telegram MTProto Proxy +#![allow(unused_assignments)] + use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -73,36 +75,27 @@ fn parse_cli() -> (String, bool, Option) { log_level = Some(s.trim_start_matches("--log-level=").to_string()); } "--help" | "-h" => { - eprintln!("telemt - Telegram MTProto Proxy v{}", env!("CARGO_PKG_VERSION")); + eprintln!("Usage: telemt [config.toml] [OPTIONS]"); eprintln!(); - eprintln!("USAGE:"); - eprintln!(" telemt [CONFIG] [OPTIONS]"); - eprintln!(" telemt --init [INIT_OPTIONS]"); + eprintln!("Options:"); + eprintln!(" --silent, -s Suppress info logs"); + eprintln!(" --log-level debug|verbose|normal|silent"); + eprintln!(" --help, -h Show this help"); eprintln!(); - eprintln!("ARGS:"); - eprintln!(" Path to config file (default: config.toml)"); - eprintln!(); - eprintln!("OPTIONS:"); - eprintln!(" -s, --silent Suppress info logs (equivalent to --log-level silent)"); - eprintln!(" --log-level Set log level [possible values: debug, verbose, normal, silent]"); - eprintln!(" -h, --help Show this help message"); - eprintln!(" -V, --version Print version number"); - eprintln!(); - eprintln!("INIT OPTIONS (fire-and-forget setup):"); - eprintln!(" --init Generate config, install systemd service, and start"); + eprintln!("Setup (fire-and-forget):"); + eprintln!( + " --init Generate config, install systemd service, start" + ); eprintln!(" --port Listen port (default: 443)"); - eprintln!(" --domain TLS domain for masking (default: www.google.com)"); - eprintln!(" --secret 32-char hex secret (auto-generated if omitted)"); - eprintln!(" --user Username for proxy access (default: user)"); + eprintln!( + " --domain TLS domain for masking (default: www.google.com)" + ); + eprintln!( + " --secret 32-char hex secret (auto-generated if omitted)" + ); + eprintln!(" --user Username (default: user)"); eprintln!(" --config-dir Config directory (default: /etc/telemt)"); - eprintln!(" --no-start Create config and service but don't start"); - eprintln!(); - eprintln!("EXAMPLES:"); - eprintln!(" telemt # Run with default config"); - eprintln!(" telemt /etc/telemt/config.toml # Run with specific config"); - eprintln!(" telemt --log-level debug # Run with debug logging"); - eprintln!(" telemt --init # Quick setup with defaults"); - eprintln!(" telemt --init --port 8443 --user admin # Custom setup"); + eprintln!(" --no-start Don't start the service after install"); std::process::exit(0); } "--version" | "-V" => { @@ -371,6 +364,10 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai config.general.me_reconnect_backoff_base_ms, config.general.me_reconnect_backoff_cap_ms, config.general.me_reconnect_fast_retry_count, + config.general.hardswap, + config.general.me_pool_drain_ttl_secs, + config.general.effective_me_pool_force_close_secs(), + config.general.me_pool_min_fresh_ratio, ); let pool_size = config.general.middle_proxy_pool_size.max(1); @@ -422,6 +419,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai if me_pool.is_some() { info!("Transport: Middle-End Proxy - all DC-over-RPC"); } else { + let _ = use_middle_proxy; use_middle_proxy = false; // Make runtime config reflect direct-only mode for handlers. config.general.use_middle_proxy = false; diff --git a/src/metrics.rs b/src/metrics.rs index 326d333..53ddd5d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -2,7 +2,7 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; -use http_body_util::{Full, BodyExt}; +use http_body_util::Full; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; @@ -175,6 +175,30 @@ fn render_metrics(stats: &Stats) -> String { stats.get_desync_frames_bucket_gt_10() ); + let _ = writeln!(out, "# HELP telemt_pool_swap_total Successful ME pool swaps"); + let _ = writeln!(out, "# TYPE telemt_pool_swap_total counter"); + let _ = writeln!(out, "telemt_pool_swap_total {}", stats.get_pool_swap_total()); + + let _ = writeln!(out, "# HELP telemt_pool_drain_active Active draining ME writers"); + let _ = writeln!(out, "# TYPE telemt_pool_drain_active gauge"); + let _ = writeln!(out, "telemt_pool_drain_active {}", stats.get_pool_drain_active()); + + let _ = writeln!(out, "# HELP telemt_pool_force_close_total Forced close events for draining writers"); + let _ = writeln!(out, "# TYPE telemt_pool_force_close_total counter"); + let _ = writeln!( + out, + "telemt_pool_force_close_total {}", + stats.get_pool_force_close_total() + ); + + let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds"); + let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter"); + let _ = writeln!( + out, + "telemt_pool_stale_pick_total {}", + stats.get_pool_stale_pick_total() + ); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); @@ -205,6 +229,7 @@ fn render_metrics(stats: &Stats) -> String { #[cfg(test)] mod tests { use super::*; + use http_body_util::BodyExt; #[test] fn test_render_metrics_format() { diff --git a/src/network/probe.rs b/src/network/probe.rs index d290ac1..eda69b8 100644 --- a/src/network/probe.rs +++ b/src/network/probe.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use tracing::{info, warn}; diff --git a/src/network/stun.rs b/src/network/stun.rs index 6a93339..c47aa49 100644 --- a/src/network/stun.rs +++ b/src/network/stun.rs @@ -1,3 +1,6 @@ +#![allow(unreachable_code)] +#![allow(dead_code)] + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use tokio::net::{lookup_host, UdpSocket}; diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index c930a1b..e6ddbaf 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -1,6 +1,8 @@ //! Protocol constants and datacenter addresses -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +#![allow(dead_code)] + +use std::net::{IpAddr, Ipv4Addr}; use crate::crypto::SecureRandom; use std::sync::LazyLock; diff --git a/src/protocol/frame.rs b/src/protocol/frame.rs index f4517b4..a332be0 100644 --- a/src/protocol/frame.rs +++ b/src/protocol/frame.rs @@ -1,5 +1,7 @@ //! MTProto frame types and metadata +#![allow(dead_code)] + use std::collections::HashMap; /// Extra metadata associated with a frame diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 4081f1c..5518df2 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -5,7 +5,11 @@ pub mod frame; pub mod obfuscation; pub mod tls; +#[allow(unused_imports)] pub use constants::*; +#[allow(unused_imports)] pub use frame::*; +#[allow(unused_imports)] pub use obfuscation::*; +#[allow(unused_imports)] pub use tls::*; \ No newline at end of file diff --git a/src/protocol/obfuscation.rs b/src/protocol/obfuscation.rs index 4d2197d..d9d1c0a 100644 --- a/src/protocol/obfuscation.rs +++ b/src/protocol/obfuscation.rs @@ -1,8 +1,9 @@ //! MTProto Obfuscation +#![allow(dead_code)] + use zeroize::Zeroize; use crate::crypto::{sha256, AesCtr}; -use crate::error::Result; use super::constants::*; /// Obfuscation parameters from handshake diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index 93d111f..091092a 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -4,8 +4,11 @@ //! for domain fronting. The handshake looks like valid TLS 1.3 but //! actually carries MTProto authentication data. +#![allow(dead_code)] + use crate::crypto::{sha256_hmac, SecureRandom}; -use crate::error::{ProxyError, Result}; +#[cfg(test)] +use crate::error::ProxyError; use super::constants::*; use std::time::{SystemTime, UNIX_EPOCH}; use num_bigint::BigUint; @@ -613,7 +616,7 @@ pub fn parse_tls_record_header(header: &[u8; 5]) -> Option<(u8, u16)> { /// /// This is useful for testing that our ServerHello is well-formed. #[cfg(test)] -fn validate_server_hello_structure(data: &[u8]) -> Result<()> { +fn validate_server_hello_structure(data: &[u8]) -> Result<(), ProxyError> { if data.len() < 5 { return Err(ProxyError::InvalidTlsRecord { record_type: 0, diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 14b45da..051ce9e 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -271,7 +271,7 @@ impl RunningClientHandler { self.peer = normalize_ip(self.peer); let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); + let _ip_tracker = self.ip_tracker.clone(); debug!(peer = %peer, "New connection"); if let Err(e) = configure_client_socket( @@ -331,7 +331,7 @@ impl RunningClientHandler { let is_tls = tls::is_tls_handshake(&first_bytes[..3]); let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); + let _ip_tracker = self.ip_tracker.clone(); debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); @@ -344,7 +344,7 @@ impl RunningClientHandler { async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result { let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); + let _ip_tracker = self.ip_tracker.clone(); let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; @@ -440,7 +440,7 @@ impl RunningClientHandler { async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result { let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); + let _ip_tracker = self.ip_tracker.clone(); if !self.config.general.modes.classic && !self.config.general.modes.secure { debug!(peer = %peer, "Non-TLS modes disabled"); diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 750d839..5c63636 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -1,5 +1,7 @@ //! MTProto Handshake +#![allow(dead_code)] + use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index d55e5a2..a6a11e1 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -184,6 +184,7 @@ where let user = success.user.clone(); let peer = success.peer; let proto_tag = success.proto_tag; + let pool_generation = me_pool.current_generation(); info!( user = %user, @@ -191,6 +192,7 @@ where dc = success.dc_idx, proto = ?proto_tag, mode = "middle_proxy", + pool_generation, "Routing via Middle-End" ); @@ -220,6 +222,7 @@ where peer_hash = format_args!("0x{:016x}", forensics.peer_hash), desync_all_full = forensics.desync_all_full, proto_flags = format_args!("0x{:08x}", proto_flags), + pool_generation, "ME relay started" ); diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index d6243aa..bedae1a 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -8,6 +8,9 @@ pub mod middle_relay; pub mod relay; pub use client::ClientHandler; +#[allow(unused_imports)] pub use handshake::*; +#[allow(unused_imports)] pub use masking::*; +#[allow(unused_imports)] pub use relay::*; diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 4c16d25..31e9d4f 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,7 +1,8 @@ //! Statistics and replay protection +#![allow(dead_code)] + use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; use std::time::{Instant, Duration}; use dashmap::DashMap; use parking_lot::Mutex; @@ -38,6 +39,10 @@ pub struct Stats { desync_frames_bucket_1_2: AtomicU64, desync_frames_bucket_3_10: AtomicU64, desync_frames_bucket_gt_10: AtomicU64, + pool_swap_total: AtomicU64, + pool_drain_active: AtomicU64, + pool_force_close_total: AtomicU64, + pool_stale_pick_total: AtomicU64, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -108,6 +113,35 @@ impl Stats { } } } + pub fn increment_pool_swap_total(&self) { + self.pool_swap_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_pool_drain_active(&self) { + self.pool_drain_active.fetch_add(1, Ordering::Relaxed); + } + pub fn decrement_pool_drain_active(&self) { + let mut current = self.pool_drain_active.load(Ordering::Relaxed); + loop { + if current == 0 { + break; + } + match self.pool_drain_active.compare_exchange_weak( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(actual) => current = actual, + } + } + } + pub fn increment_pool_force_close_total(&self) { + self.pool_force_close_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_pool_stale_pick_total(&self) { + self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); + } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } @@ -149,6 +183,18 @@ impl Stats { pub fn get_desync_frames_bucket_gt_10(&self) -> u64 { self.desync_frames_bucket_gt_10.load(Ordering::Relaxed) } + pub fn get_pool_swap_total(&self) -> u64 { + self.pool_swap_total.load(Ordering::Relaxed) + } + pub fn get_pool_drain_active(&self) -> u64 { + self.pool_drain_active.load(Ordering::Relaxed) + } + pub fn get_pool_force_close_total(&self) -> u64 { + self.pool_force_close_total.load(Ordering::Relaxed) + } + pub fn get_pool_stale_pick_total(&self) -> u64 { + self.pool_stale_pick_total.load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { self.user_stats.entry(user.to_string()).or_default() @@ -451,6 +497,7 @@ impl ReplayStats { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; #[test] fn test_stats_shared_counters() { diff --git a/src/stream/buffer_pool.rs b/src/stream/buffer_pool.rs index 0de5532..9c46922 100644 --- a/src/stream/buffer_pool.rs +++ b/src/stream/buffer_pool.rs @@ -3,6 +3,8 @@ //! This module provides a thread-safe pool of BytesMut buffers //! that can be reused across connections to reduce allocation pressure. +#![allow(dead_code)] + use bytes::BytesMut; use crossbeam_queue::ArrayQueue; use std::ops::{Deref, DerefMut}; diff --git a/src/stream/crypto_stream.rs b/src/stream/crypto_stream.rs index ebb6f43..67d8c95 100644 --- a/src/stream/crypto_stream.rs +++ b/src/stream/crypto_stream.rs @@ -18,6 +18,8 @@ //! is either written to upstream or stored in our pending buffer //! - when upstream is pending -> ciphertext is buffered/bounded and backpressure is applied //! + +#![allow(dead_code)] //! ======================= //! Writer state machine //! ======================= @@ -55,7 +57,7 @@ use std::io::{self, ErrorKind, Result}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace}; use crate::crypto::AesCtr; use super::state::{StreamState, YieldBuffer}; diff --git a/src/stream/frame.rs b/src/stream/frame.rs index b97d4cf..5c93ea7 100644 --- a/src/stream/frame.rs +++ b/src/stream/frame.rs @@ -3,6 +3,8 @@ //! This module defines the common types and traits used by all //! frame encoding/decoding implementations. +#![allow(dead_code)] + use bytes::{Bytes, BytesMut}; use std::io::Result; use std::sync::Arc; diff --git a/src/stream/frame_codec.rs b/src/stream/frame_codec.rs index 6b90892..3de8257 100644 --- a/src/stream/frame_codec.rs +++ b/src/stream/frame_codec.rs @@ -3,6 +3,8 @@ //! This module provides Encoder/Decoder implementations compatible //! with tokio-util's Framed wrapper for easy async frame I/O. +#![allow(dead_code)] + use bytes::{Bytes, BytesMut, BufMut}; use std::io::{self, Error, ErrorKind}; use std::sync::Arc; diff --git a/src/stream/frame_stream.rs b/src/stream/frame_stream.rs index 1726a06..b66c2cd 100644 --- a/src/stream/frame_stream.rs +++ b/src/stream/frame_stream.rs @@ -1,6 +1,8 @@ //! MTProto frame stream wrappers -use bytes::{Bytes, BytesMut}; +#![allow(dead_code)] + +use bytes::Bytes; use std::io::{Error, ErrorKind, Result}; use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; use crate::protocol::constants::*; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index ea30e5e..a1cff9a 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -12,28 +12,34 @@ pub mod frame_codec; pub mod frame_stream; // Re-export state machine types +#[allow(unused_imports)] pub use state::{ StreamState, Transition, PollResult, ReadBuffer, WriteBuffer, HeaderBuffer, YieldBuffer, }; // Re-export buffer pool +#[allow(unused_imports)] pub use buffer_pool::{BufferPool, PooledBuffer, PoolStats}; // Re-export stream implementations +#[allow(unused_imports)] pub use crypto_stream::{CryptoReader, CryptoWriter, PassthroughStream}; pub use tls_stream::{FakeTlsReader, FakeTlsWriter}; // Re-export frame types +#[allow(unused_imports)] pub use frame::{Frame, FrameMeta, FrameCodec as FrameCodecTrait, create_codec}; -// Re-export tokio-util compatible codecs +// Re-export tokio-util compatible codecs +#[allow(unused_imports)] pub use frame_codec::{ FrameCodec, AbridgedCodec, IntermediateCodec, SecureCodec, }; // Legacy re-exports for compatibility +#[allow(unused_imports)] pub use frame_stream::{ AbridgedFrameReader, AbridgedFrameWriter, IntermediateFrameReader, IntermediateFrameWriter, diff --git a/src/stream/state.rs b/src/stream/state.rs index c4f52e6..7c57792 100644 --- a/src/stream/state.rs +++ b/src/stream/state.rs @@ -3,6 +3,8 @@ //! This module provides core types and traits for implementing //! stateful async streams with proper partial read/write handling. +#![allow(dead_code)] + use bytes::{Bytes, BytesMut}; use std::io; diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs index edf970d..fa165db 100644 --- a/src/stream/tls_stream.rs +++ b/src/stream/tls_stream.rs @@ -18,6 +18,8 @@ //! - Explicit state machines for all async operations //! - Never lose data on partial reads //! - Atomic TLS record formation for writes + +#![allow(dead_code)] //! - Proper handling of all TLS record types //! //! Important nuance (Telegram FakeTLS): diff --git a/src/stream/traits.rs b/src/stream/traits.rs index 6419824..f6d7c4f 100644 --- a/src/stream/traits.rs +++ b/src/stream/traits.rs @@ -1,5 +1,7 @@ //! Stream traits and common types +#![allow(dead_code)] + use bytes::Bytes; use std::io::Result; use std::pin::Pin; diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs index 15a97af..a425a35 100644 --- a/src/tls_front/cache.rs +++ b/src/tls_front/cache.rs @@ -19,6 +19,7 @@ pub struct TlsFrontCache { disk_path: PathBuf, } +#[allow(dead_code)] impl TlsFrontCache { pub fn new(domains: &[String], default_len: usize, disk_path: impl AsRef) -> Self { let default_template = ParsedServerHello { @@ -173,7 +174,7 @@ impl TlsFrontCache { tokio::spawn(async move { loop { for domain in &domains { - fetcher(domain.clone()).await; + let _ = fetcher(domain.clone()).await; } sleep(interval).await; } diff --git a/src/tls_front/mod.rs b/src/tls_front/mod.rs index 89f3988..311920d 100644 --- a/src/tls_front/mod.rs +++ b/src/tls_front/mod.rs @@ -4,4 +4,5 @@ pub mod fetcher; pub mod emulator; pub use cache::TlsFrontCache; +#[allow(unused_imports)] pub use types::{CachedTlsData, TlsFetchResult}; diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 479a880..96d5f91 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -131,6 +131,13 @@ pub async fn fetch_proxy_config(url: &str) -> Result { } async fn run_update_cycle(pool: &Arc, rng: &Arc, cfg: &ProxyConfig) { + pool.update_runtime_reinit_policy( + cfg.general.hardswap, + cfg.general.me_pool_drain_ttl_secs, + cfg.general.effective_me_pool_force_close_secs(), + cfg.general.me_pool_min_fresh_ratio, + ); + let mut maps_changed = false; // Update proxy config v4 @@ -162,12 +169,7 @@ async fn run_update_cycle(pool: &Arc, rng: &Arc, cfg: &Pro } if maps_changed { - let drain_timeout = if cfg.general.me_reinit_drain_timeout_secs == 0 { - None - } else { - Some(Duration::from_secs(cfg.general.me_reinit_drain_timeout_secs)) - }; - pool.zero_downtime_reinit_after_map_change(rng.as_ref(), drain_timeout) + pool.zero_downtime_reinit_after_map_change(rng.as_ref()) .await; } @@ -224,6 +226,12 @@ pub async fn me_config_updater( break; } let cfg = config_rx.borrow().clone(); + pool.update_runtime_reinit_policy( + cfg.general.hardswap, + cfg.general.me_pool_drain_ttl_secs, + cfg.general.effective_me_pool_force_close_secs(), + cfg.general.me_pool_min_fresh_ratio, + ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs { continue; diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index d4d4a70..e73e5f1 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -14,6 +14,7 @@ use super::MePool; const HEALTH_INTERVAL_SECS: u64 = 1; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff +#[allow(dead_code)] const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { @@ -68,6 +69,7 @@ async fn check_family( .read() .await .iter() + .filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed)) .map(|w| w.addr) .collect(); diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 1027221..f9f8c85 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -17,8 +17,10 @@ mod wire; use bytes::Bytes; pub use health::me_health_monitor; +#[allow(unused_imports)] pub use ping::{run_me_ping, format_sample_line, MePingReport, MePingSample, MePingFamily}; pub use pool::MePool; +#[allow(unused_imports)] pub use pool_nat::{stun_probe, detect_public_ip}; pub use registry::ConnRegistry; pub use secret::fetch_proxy_secret; diff --git a/src/transport/middle_proxy/ping.rs b/src/transport/middle_proxy/ping.rs index 36ef4e7..a1dd1e6 100644 --- a/src/transport/middle_proxy/ping.rs +++ b/src/transport/middle_proxy/ping.rs @@ -24,6 +24,7 @@ pub struct MePingSample { } #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct MePingReport { pub dc: i32, pub family: MePingFamily, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index bd7c9cc..2047e80 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1,14 +1,14 @@ use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use bytes::BytesMut; use rand::Rng; use rand::seq::SliceRandom; use tokio::sync::{Mutex, RwLock, mpsc, Notify}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; @@ -27,12 +27,16 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; pub struct MeWriter { pub id: u64, pub addr: SocketAddr, + pub generation: u64, pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, pub draining: Arc, + pub draining_started_at_epoch_secs: Arc, + pub allow_drain_fallback: Arc, } +#[allow(dead_code)] pub struct MePool { pub(super) registry: Arc, pub(super) writers: Arc>>, @@ -73,6 +77,11 @@ pub struct MePool { pub(super) writer_available: Arc, pub(super) conn_count: AtomicUsize, pub(super) stats: Arc, + pub(super) generation: AtomicU64, + pub(super) hardswap: AtomicBool, + pub(super) me_pool_drain_ttl_secs: AtomicU64, + pub(super) me_pool_force_close_secs: AtomicU64, + pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pool_size: usize, } @@ -83,6 +92,22 @@ pub struct NatReflectionCache { } impl MePool { + fn ratio_to_permille(ratio: f32) -> u32 { + let clamped = ratio.clamp(0.0, 1.0); + (clamped * 1000.0).round() as u32 + } + + fn permille_to_ratio(permille: u32) -> f32 { + (permille.min(1000) as f32) / 1000.0 + } + + fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + } + pub fn new( proxy_tag: Option>, proxy_secret: Vec, @@ -110,6 +135,10 @@ impl MePool { me_reconnect_backoff_base_ms: u64, me_reconnect_backoff_cap_ms: u64, me_reconnect_fast_retry_count: u32, + hardswap: bool, + me_pool_drain_ttl_secs: u64, + me_pool_force_close_secs: u64, + me_pool_min_fresh_ratio: f32, ) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), @@ -152,6 +181,11 @@ impl MePool { nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), writer_available: Arc::new(Notify::new()), conn_count: AtomicUsize::new(0), + generation: AtomicU64::new(1), + hardswap: AtomicBool::new(hardswap), + me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), + me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), + me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(me_pool_min_fresh_ratio)), }) } @@ -159,6 +193,25 @@ impl MePool { self.proxy_tag.is_some() } + pub fn current_generation(&self) -> u64 { + self.generation.load(Ordering::Relaxed) + } + + pub fn update_runtime_reinit_policy( + &self, + hardswap: bool, + drain_ttl_secs: u64, + force_close_secs: u64, + min_fresh_ratio: f32, + ) { + self.hardswap.store(hardswap, Ordering::Relaxed); + self.me_pool_drain_ttl_secs.store(drain_ttl_secs, Ordering::Relaxed); + self.me_pool_force_close_secs + .store(force_close_secs, Ordering::Relaxed); + self.me_pool_min_fresh_ratio_permille + .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); + } + pub fn reset_stun_state(&self) { self.nat_probe_attempts.store(0, Ordering::Relaxed); self.nat_probe_disabled.store(false, Ordering::Relaxed); @@ -177,6 +230,42 @@ impl MePool { self.writers.clone() } + fn force_close_timeout(&self) -> Option { + let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed); + if secs == 0 { + None + } else { + Some(Duration::from_secs(secs)) + } + } + + fn coverage_ratio( + desired_by_dc: &HashMap>, + active_writer_addrs: &HashSet, + ) -> (f32, Vec) { + if desired_by_dc.is_empty() { + return (1.0, Vec::new()); + } + + let mut missing_dc = Vec::::new(); + let mut covered = 0usize; + for (dc, endpoints) in desired_by_dc { + if endpoints.is_empty() { + continue; + } + if endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) { + covered += 1; + } else { + missing_dc.push(*dc); + } + } + + missing_dc.sort_unstable(); + let total = desired_by_dc.len().max(1); + let ratio = (covered as f32) / (total as f32); + (ratio, missing_dc) + } + pub async fn reconcile_connections(self: &Arc, rng: &SecureRandom) { let writers = self.writers.read().await; let current: HashSet = writers @@ -235,39 +324,104 @@ impl MePool { out } + async fn warmup_generation_for_all_dcs( + self: &Arc, + rng: &SecureRandom, + generation: u64, + desired_by_dc: &HashMap>, + ) { + for endpoints in desired_by_dc.values() { + if endpoints.is_empty() { + continue; + } + + let has_fresh = { + let ws = self.writers.read().await; + ws.iter().any(|w| { + !w.draining.load(Ordering::Relaxed) + && w.generation == generation + && endpoints.contains(&w.addr) + }) + }; + + if has_fresh { + continue; + } + + let mut shuffled: Vec = endpoints.iter().copied().collect(); + shuffled.shuffle(&mut rand::rng()); + for addr in shuffled { + if self.connect_one(addr, rng).await.is_ok() { + break; + } + } + } + } + pub async fn zero_downtime_reinit_after_map_change( self: &Arc, rng: &SecureRandom, - drain_timeout: Option, ) { - // Stage 1: prewarm writers for new endpoint maps before draining old ones. - self.reconcile_connections(rng).await; - let desired_by_dc = self.desired_dc_endpoints().await; if desired_by_dc.is_empty() { warn!("ME endpoint map is empty after update; skipping stale writer drain"); return; } + let previous_generation = self.current_generation(); + let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1; + let hardswap = self.hardswap.load(Ordering::Relaxed); + + if hardswap { + self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc) + .await; + } else { + self.reconcile_connections(rng).await; + } + let writers = self.writers.read().await; let active_writer_addrs: HashSet = writers .iter() .filter(|w| !w.draining.load(Ordering::Relaxed)) .map(|w| w.addr) .collect(); - - let mut missing_dc = Vec::::new(); - for (dc, endpoints) in &desired_by_dc { - if endpoints.is_empty() { - continue; - } - if !endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) { - missing_dc.push(*dc); - } + let min_ratio = Self::permille_to_ratio( + self.me_pool_min_fresh_ratio_permille + .load(Ordering::Relaxed), + ); + let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs); + if !hardswap && coverage_ratio < min_ratio { + warn!( + previous_generation, + generation, + coverage_ratio = format_args!("{coverage_ratio:.3}"), + min_ratio = format_args!("{min_ratio:.3}"), + missing_dc = ?missing_dc, + "ME reinit coverage below threshold; keeping stale writers" + ); + return; } - if !missing_dc.is_empty() { - missing_dc.sort_unstable(); + if hardswap { + let fresh_writer_addrs: HashSet = writers + .iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .filter(|w| w.generation == generation) + .map(|w| w.addr) + .collect(); + let (fresh_ratio, fresh_missing_dc) = + Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs); + if !fresh_missing_dc.is_empty() { + warn!( + previous_generation, + generation, + fresh_ratio = format_args!("{fresh_ratio:.3}"), + missing_dc = ?fresh_missing_dc, + "ME hardswap pending: fresh generation coverage incomplete" + ); + return; + } + } else if !missing_dc.is_empty() { warn!( missing_dc = ?missing_dc, // Keep stale writers alive when fresh coverage is incomplete. @@ -284,7 +438,13 @@ impl MePool { let stale_writer_ids: Vec = writers .iter() .filter(|w| !w.draining.load(Ordering::Relaxed)) - .filter(|w| !desired_addrs.contains(&w.addr)) + .filter(|w| { + if hardswap { + w.generation < generation + } else { + !desired_addrs.contains(&w.addr) + } + }) .map(|w| w.id) .collect(); drop(writers); @@ -294,14 +454,21 @@ impl MePool { return; } + let drain_timeout = self.force_close_timeout(); let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0); info!( stale_writers = stale_writer_ids.len(), + previous_generation, + generation, + hardswap, + coverage_ratio = format_args!("{coverage_ratio:.3}"), + min_ratio = format_args!("{min_ratio:.3}"), drain_timeout_secs, "ME map update covered; draining stale writers" ); + self.stats.increment_pool_swap_total(); for writer_id in stale_writer_ids { - self.mark_writer_draining_with_timeout(writer_id, drain_timeout) + self.mark_writer_draining_with_timeout(writer_id, drain_timeout, !hardswap) .await; } } @@ -507,9 +674,12 @@ impl MePool { let hs = self.handshake_only(stream, addr, rng).await?; let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed); + let generation = self.current_generation(); let cancel = CancellationToken::new(); let degraded = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false)); + let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0)); + let allow_drain_fallback = Arc::new(AtomicBool::new(false)); let (tx, mut rx) = mpsc::channel::(4096); let mut rpc_writer = RpcWriter { writer: hs.wr, @@ -540,10 +710,13 @@ impl MePool { let writer = MeWriter { id: writer_id, addr, + generation, tx: tx.clone(), cancel: cancel.clone(), degraded: degraded.clone(), draining: draining.clone(), + draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(), + allow_drain_fallback: allow_drain_fallback.clone(), }; self.writers.write().await.push(writer.clone()); self.conn_count.fetch_add(1, Ordering::Relaxed); @@ -715,6 +888,9 @@ impl MePool { let mut ws = self.writers.write().await; if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { let w = ws.remove(pos); + if w.draining.load(Ordering::Relaxed) { + self.stats.decrement_pool_drain_active(); + } w.cancel.cancel(); close_tx = Some(w.tx.clone()); self.conn_count.fetch_sub(1, Ordering::Relaxed); @@ -731,11 +907,20 @@ impl MePool { self: &Arc, writer_id: u64, timeout: Option, + allow_drain_fallback: bool, ) { let timeout = timeout.filter(|d| !d.is_zero()); let found = { let mut ws = self.writers.write().await; if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) { + let already_draining = w.draining.swap(true, Ordering::Relaxed); + w.allow_drain_fallback + .store(allow_drain_fallback, Ordering::Relaxed); + w.draining_started_at_epoch_secs + .store(Self::now_epoch_secs(), Ordering::Relaxed); + if !already_draining { + self.stats.increment_pool_drain_active(); + } w.draining.store(true, Ordering::Relaxed); true } else { @@ -748,7 +933,12 @@ impl MePool { } let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0); - debug!(writer_id, timeout_secs, "ME writer marked draining"); + debug!( + writer_id, + timeout_secs, + allow_drain_fallback, + "ME writer marked draining" + ); let pool = Arc::downgrade(self); tokio::spawn(async move { @@ -758,6 +948,7 @@ impl MePool { if let Some(deadline_at) = deadline { if Instant::now() >= deadline_at { warn!(writer_id, "Drain timeout, force-closing"); + p.stats.increment_pool_force_close_total(); let _ = p.remove_writer_and_close_clients(writer_id).await; break; } @@ -775,12 +966,34 @@ impl MePool { } pub(crate) async fn mark_writer_draining(self: &Arc, writer_id: u64) { - self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300))) + self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)), false) .await; } + pub(super) fn writer_accepts_new_binding(&self, writer: &MeWriter) -> bool { + if !writer.draining.load(Ordering::Relaxed) { + return true; + } + if !writer.allow_drain_fallback.load(Ordering::Relaxed) { + return false; + } + + let ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed); + if ttl_secs == 0 { + return true; + } + + let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed); + if started == 0 { + return false; + } + + Self::now_epoch_secs().saturating_sub(started) <= ttl_secs + } + } +#[allow(dead_code)] fn hex_dump(data: &[u8]) -> String { const MAX: usize = 64; let mut out = String::with_capacity(data.len() * 2 + 3); diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index d3dec16..4d9e2a1 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -1,7 +1,7 @@ use std::net::{IpAddr, Ipv4Addr}; use std::time::Duration; -use tracing::{info, warn, debug}; +use tracing::{info, warn}; use crate::error::{ProxyError, Result}; use crate::network::probe::is_bogon; @@ -9,11 +9,14 @@ use crate::network::stun::{stun_probe_dual, IpFamily, StunProbeResult}; use super::MePool; use std::time::Instant; + +#[allow(dead_code)] pub async fn stun_probe(stun_addr: Option) -> Result { let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string()); stun_probe_dual(&stun_addr).await } +#[allow(dead_code)] pub async fn detect_public_ip() -> Option { fetch_public_ipv4_with_retry().await.ok().flatten().map(IpAddr::V4) } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 6a9250d..2122ed8 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -21,6 +21,7 @@ pub enum RouteResult { } #[derive(Clone)] +#[allow(dead_code)] pub struct ConnMeta { pub target_dc: i16, pub client_addr: SocketAddr, @@ -29,6 +30,7 @@ pub struct ConnMeta { } #[derive(Clone)] +#[allow(dead_code)] pub struct BoundConn { pub conn_id: u64, pub meta: ConnMeta, @@ -167,6 +169,7 @@ impl ConnRegistry { out } + #[allow(dead_code)] pub async fn get_meta(&self, conn_id: u64) -> Option { let inner = self.inner.read().await; inner.meta.get(&conn_id).cloned() diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index a9e224d..9641143 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use tracing::{debug, info, warn}; use std::time::SystemTime; use httpdate; diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 2ebafea..56bd17a 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -134,8 +134,8 @@ impl MePool { candidate_indices.sort_by_key(|idx| { let w = &writers_snapshot[*idx]; let degraded = w.degraded.load(Ordering::Relaxed); - let draining = w.draining.load(Ordering::Relaxed); - (draining as usize, degraded as usize) + let stale = (w.generation < self.current_generation()) as usize; + (stale, degraded as usize) }); let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); @@ -143,13 +143,23 @@ impl MePool { for offset in 0..candidate_indices.len() { let idx = candidate_indices[(start + offset) % candidate_indices.len()]; let w = &writers_snapshot[idx]; - if w.draining.load(Ordering::Relaxed) { + if !self.writer_accepts_new_binding(w) { continue; } if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() { self.registry .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; + if w.generation < self.current_generation() { + self.stats.increment_pool_stale_pick_total(); + debug!( + conn_id, + writer_id = w.id, + writer_generation = w.generation, + current_generation = self.current_generation(), + "Selected stale ME writer for fallback bind" + ); + } return Ok(()); } else { warn!(writer_id = w.id, "ME writer channel closed"); @@ -159,7 +169,7 @@ impl MePool { } let w = writers_snapshot[candidate_indices[start]].clone(); - if w.draining.load(Ordering::Relaxed) { + if !self.writer_accepts_new_binding(&w) { continue; } match w.tx.send(WriterCommand::Data(payload.clone())).await { @@ -167,6 +177,9 @@ impl MePool { self.registry .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; + if w.generation < self.current_generation() { + self.stats.increment_pool_stale_pick_total(); + } return Ok(()); } Err(_) => { @@ -245,13 +258,13 @@ impl MePool { if preferred.is_empty() { return (0..writers.len()) - .filter(|i| !writers[*i].draining.load(Ordering::Relaxed)) + .filter(|i| self.writer_accepts_new_binding(&writers[*i])) .collect(); } let mut out = Vec::new(); for (idx, w) in writers.iter().enumerate() { - if w.draining.load(Ordering::Relaxed) { + if !self.writer_accepts_new_binding(w) { continue; } if preferred.iter().any(|p| *p == w.addr) { @@ -260,7 +273,7 @@ impl MePool { } if out.is_empty() { return (0..writers.len()) - .filter(|i| !writers[*i].draining.load(Ordering::Relaxed)) + .filter(|i| self.writer_accepts_new_binding(&writers[*i])) .collect(); } out diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 51cffa4..ead0565 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -6,9 +6,13 @@ pub mod socket; pub mod socks; pub mod upstream; +#[allow(unused_imports)] pub use pool::ConnectionPool; +#[allow(unused_imports)] pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol}; pub use socket::*; +#[allow(unused_imports)] pub use socks::*; +#[allow(unused_imports)] pub use upstream::{DcPingResult, StartupPingResult, UpstreamManager}; pub mod middle_proxy; diff --git a/src/transport/pool.rs b/src/transport/pool.rs index 8d83321..fa9dcad 100644 --- a/src/transport/pool.rs +++ b/src/transport/pool.rs @@ -1,5 +1,7 @@ //! Connection Pool +#![allow(dead_code)] + use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -8,7 +10,7 @@ use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio::time::timeout; use parking_lot::RwLock; -use tracing::{debug, warn}; +use tracing::debug; use crate::error::{ProxyError, Result}; use super::socket::configure_tcp_socket; diff --git a/src/transport/proxy_protocol.rs b/src/transport/proxy_protocol.rs index 56746c4..770be7e 100644 --- a/src/transport/proxy_protocol.rs +++ b/src/transport/proxy_protocol.rs @@ -28,6 +28,7 @@ mod address_family { /// Information extracted from PROXY protocol header #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct ProxyProtocolInfo { /// Source (client) address pub src_addr: SocketAddr, @@ -37,6 +38,7 @@ pub struct ProxyProtocolInfo { pub version: u8, } +#[allow(dead_code)] impl ProxyProtocolInfo { /// Create info with just source address pub fn new(src_addr: SocketAddr) -> Self { @@ -231,12 +233,14 @@ async fn parse_v2( } /// Builder for PROXY protocol v1 header +#[allow(dead_code)] pub struct ProxyProtocolV1Builder { family: &'static str, src_addr: Option, dst_addr: Option, } +#[allow(dead_code)] impl ProxyProtocolV1Builder { pub fn new() -> Self { Self { @@ -284,11 +288,13 @@ impl Default for ProxyProtocolV1Builder { } /// Builder for PROXY protocol v2 header +#[allow(dead_code)] pub struct ProxyProtocolV2Builder { src: Option, dst: Option, } +#[allow(dead_code)] impl ProxyProtocolV2Builder { pub fn new() -> Self { Self { src: None, dst: None } diff --git a/src/transport/socket.rs b/src/transport/socket.rs index b41cfd1..0a20c3c 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -10,6 +10,7 @@ use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol}; use tracing::debug; /// Configure TCP socket with recommended settings for proxy use +#[allow(dead_code)] pub fn configure_tcp_socket( stream: &TcpStream, keepalive: bool, @@ -82,6 +83,7 @@ pub fn configure_client_socket( } /// Set socket to send RST on close (for masking) +#[allow(dead_code)] pub fn set_linger_zero(stream: &TcpStream) -> Result<()> { let socket = socket2::SockRef::from(stream); socket.set_linger(Some(Duration::ZERO))?; @@ -89,6 +91,7 @@ pub fn set_linger_zero(stream: &TcpStream) -> Result<()> { } /// Create a new TCP socket for outgoing connections +#[allow(dead_code)] pub fn create_outgoing_socket(addr: SocketAddr) -> Result { create_outgoing_socket_bound(addr, None) } @@ -120,6 +123,7 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option) /// Get local address of a socket +#[allow(dead_code)] pub fn get_local_addr(stream: &TcpStream) -> Option { stream.local_addr().ok() } @@ -157,11 +161,13 @@ pub fn resolve_interface_ip(_name: &str, _want_ipv6: bool) -> Option { } /// Get peer address of a socket +#[allow(dead_code)] pub fn get_peer_addr(stream: &TcpStream) -> Option { stream.peer_addr().ok() } /// Check if address is IPv6 +#[allow(dead_code)] pub fn is_ipv6(addr: &SocketAddr) -> bool { addr.is_ipv6() } diff --git a/src/transport/socks.rs b/src/transport/socks.rs index 35268cb..188d369 100644 --- a/src/transport/socks.rs +++ b/src/transport/socks.rs @@ -1,7 +1,7 @@ //! SOCKS4/5 Client Implementation use std::net::{IpAddr, SocketAddr}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use crate::error::{ProxyError, Result}; diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 6dcc36f..887fa99 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -1,7 +1,9 @@ //! Upstream Management with per-DC latency-weighted selection -//! +//! //! IPv6/IPv4 connectivity checks with configurable preference. +#![allow(deprecated)] + use std::collections::HashMap; use std::net::{SocketAddr, IpAddr}; use std::sync::Arc; @@ -549,7 +551,7 @@ impl UpstreamManager { /// Tests BOTH IPv6 and IPv4, returns separate results for each. pub async fn ping_all_dcs( &self, - prefer_ipv6: bool, + _prefer_ipv6: bool, dc_overrides: &HashMap>, ipv4_enabled: bool, ipv6_enabled: bool, @@ -907,6 +909,7 @@ impl UpstreamManager { } /// Get the preferred IP for a DC (for use by other components) + #[allow(dead_code)] pub async fn get_dc_ip_preference(&self, dc_idx: i16) -> Option { let guard = self.upstreams.read().await; if guard.is_empty() { @@ -918,6 +921,7 @@ impl UpstreamManager { } /// Get preferred DC address based on config preference + #[allow(dead_code)] pub async fn get_dc_addr(&self, dc_idx: i16, prefer_ipv6: bool) -> Option { let arr_idx = UpstreamState::dc_array_idx(dc_idx)?; diff --git a/src/util/ip.rs b/src/util/ip.rs index 9bde513..f3e774f 100644 --- a/src/util/ip.rs +++ b/src/util/ip.rs @@ -1,22 +1,24 @@ //! IP Addr Detect -use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, UdpSocket}; use std::time::Duration; use tracing::{debug, warn}; /// Detected IP addresses #[derive(Debug, Clone, Default)] +#[allow(dead_code)] pub struct IpInfo { pub ipv4: Option, pub ipv6: Option, } +#[allow(dead_code)] impl IpInfo { /// Check if any IP is detected pub fn has_any(&self) -> bool { self.ipv4.is_some() || self.ipv6.is_some() } - + /// Get preferred IP (IPv6 if available and preferred) pub fn preferred(&self, prefer_ipv6: bool) -> Option { if prefer_ipv6 { @@ -28,12 +30,14 @@ impl IpInfo { } /// URLs for IP detection +#[allow(dead_code)] const IPV4_URLS: &[&str] = &[ "http://v4.ident.me/", "http://ipv4.icanhazip.com/", "http://api.ipify.org/", ]; +#[allow(dead_code)] const IPV6_URLS: &[&str] = &[ "http://v6.ident.me/", "http://ipv6.icanhazip.com/", @@ -42,12 +46,14 @@ const IPV6_URLS: &[&str] = &[ /// Detect local IP address by connecting to a public DNS /// This does not actually send any packets +#[allow(dead_code)] fn get_local_ip(target: &str) -> Option { let socket = UdpSocket::bind("0.0.0.0:0").ok()?; socket.connect(target).ok()?; socket.local_addr().ok().map(|addr| addr.ip()) } +#[allow(dead_code)] fn get_local_ipv6(target: &str) -> Option { let socket = UdpSocket::bind("[::]:0").ok()?; socket.connect(target).ok()?; @@ -55,6 +61,7 @@ fn get_local_ipv6(target: &str) -> Option { } /// Detect public IP addresses +#[allow(dead_code)] pub async fn detect_ip() -> IpInfo { let mut info = IpInfo::default(); @@ -119,6 +126,7 @@ pub async fn detect_ip() -> IpInfo { info } +#[allow(dead_code)] fn is_private_ip(ip: IpAddr) -> bool { match ip { IpAddr::V4(ipv4) => { @@ -131,19 +139,21 @@ fn is_private_ip(ip: IpAddr) -> bool { } /// Fetch IP from URL +#[allow(dead_code)] async fn fetch_ip(url: &str) -> Option { let client = reqwest::Client::builder() .timeout(Duration::from_secs(5)) .build() .ok()?; - + let response = client.get(url).send().await.ok()?; let text = response.text().await.ok()?; - + text.trim().parse().ok() } /// Synchronous IP detection (for startup) +#[allow(dead_code)] pub fn detect_ip_sync() -> IpInfo { tokio::runtime::Handle::current().block_on(detect_ip()) } diff --git a/src/util/mod.rs b/src/util/mod.rs index 5d293d2..8851f51 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -3,5 +3,7 @@ pub mod ip; pub mod time; +#[allow(unused_imports)] pub use ip::*; +#[allow(unused_imports)] pub use time::*; \ No newline at end of file diff --git a/src/util/time.rs b/src/util/time.rs index 7db1633..310b015 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -4,11 +4,14 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use tracing::{debug, warn, error}; +#[allow(dead_code)] const TIME_SYNC_URL: &str = "https://core.telegram.org/getProxySecret"; +#[allow(dead_code)] const MAX_TIME_SKEW_SECS: i64 = 30; /// Time sync result #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct TimeSyncResult { pub server_time: DateTime, pub local_time: DateTime, @@ -17,6 +20,7 @@ pub struct TimeSyncResult { } /// Check time synchronization with Telegram servers +#[allow(dead_code)] pub async fn check_time_sync() -> Option { let client = reqwest::Client::builder() .timeout(Duration::from_secs(10)) @@ -60,6 +64,7 @@ pub async fn check_time_sync() -> Option { } /// Background time sync task +#[allow(dead_code)] pub async fn time_sync_task(check_interval: Duration) -> ! { loop { if let Some(result) = check_time_sync().await {