Merge remote-tracking branch 'upstream/main'

This commit is contained in:
ivulit 2026-02-24 09:01:56 +03:00
commit 8b4bac61dc
No known key found for this signature in database
59 changed files with 691 additions and 151 deletions

View File

@ -3,11 +3,12 @@ name: Release
on: on:
push: push:
tags: tags:
- '[0-9]+.[0-9]+.[0-9]+' # Matches tags like 3.0.0, 3.1.2, etc. - '[0-9]+.[0-9]+.[0-9]+'
workflow_dispatch: # Manual trigger from GitHub Actions UI workflow_dispatch:
permissions: permissions:
contents: read contents: read
packages: write
env: env:
CARGO_TERM_COLOR: always CARGO_TERM_COLOR: always
@ -37,11 +38,9 @@ jobs:
asset_name: telemt-aarch64-linux-musl asset_name: telemt-aarch64-linux-musl
steps: steps:
- name: Checkout repository - uses: actions/checkout@v4
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Install stable Rust toolchain - uses: dtolnay/rust-toolchain@v1
uses: dtolnay/rust-toolchain@888c2e1ea69ab0d4330cbf0af1ecc7b68f368cc1 # v1
with: with:
toolchain: stable toolchain: stable
targets: ${{ matrix.target }} targets: ${{ matrix.target }}
@ -51,8 +50,7 @@ jobs:
sudo apt-get update sudo apt-get update
sudo apt-get install -y gcc-aarch64-linux-gnu sudo apt-get install -y gcc-aarch64-linux-gnu
- name: Cache cargo registry & build artifacts - uses: actions/cache@v4
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
with: with:
path: | path: |
~/.cargo/registry ~/.cargo/registry
@ -76,8 +74,7 @@ jobs:
tar -czvf ${{ matrix.asset_name }}.tar.gz ${{ matrix.artifact_name }} tar -czvf ${{ matrix.asset_name }}.tar.gz ${{ matrix.artifact_name }}
sha256sum ${{ matrix.asset_name }}.tar.gz > ${{ matrix.asset_name }}.sha256 sha256sum ${{ matrix.asset_name }}.tar.gz > ${{ matrix.asset_name }}.sha256
- name: Upload artifact - uses: actions/upload-artifact@v4
uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0
with: with:
name: ${{ matrix.asset_name }} name: ${{ matrix.asset_name }}
path: | path: |
@ -85,30 +82,37 @@ jobs:
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.sha256 target/${{ matrix.target }}/release/${{ matrix.asset_name }}.sha256
build-docker-image: build-docker-image:
needs: build
runs-on: ubuntu-latest runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps: steps:
- name: Checkout - uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Set up QEMU - uses: docker/setup-qemu-action@v3
uses: docker/setup-qemu-action@v3 - uses: docker/setup-buildx-action@v3
- name: Set up Docker Buildx - name: Login to GHCR
uses: docker/setup-buildx-action@v2 uses: docker/login-action@v3
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.actor }}
password: ${{ secrets.TOKEN_GH_DEPLOY }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract version
id: vars
run: echo "VERSION=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
- name: Build and push - name: Build and push
uses: docker/build-push-action@v6 uses: docker/build-push-action@v6
with: with:
context: . context: .
push: true push: true
tags: ${{ github.ref }} tags: |
ghcr.io/${{ github.repository }}:${{ steps.vars.outputs.VERSION }}
ghcr.io/${{ github.repository }}:latest
release: release:
name: Create Release name: Create Release
@ -118,40 +122,14 @@ jobs:
contents: write contents: write
steps: steps:
- name: Checkout repository - uses: actions/checkout@v4
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with: with:
fetch-depth: 0 fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Download all artifacts - uses: actions/download-artifact@v4
uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with: with:
path: artifacts path: artifacts
- name: Update version in Cargo.toml and Cargo.lock
run: |
# Extract version from tag (remove 'v' prefix if present)
VERSION="${GITHUB_REF#refs/tags/}"
VERSION="${VERSION#v}"
# Install cargo-edit for version bumping
cargo install cargo-edit
# Update Cargo.toml version
cargo set-version "$VERSION"
# Configure git
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
# Commit and push changes
#git add Cargo.toml Cargo.lock
#git commit -m "chore: bump version to $VERSION" || echo "No changes to commit"
#git push origin HEAD:main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Create Release - name: Create Release
uses: softprops/action-gh-release@v2 uses: softprops/action-gh-release@v2
with: with:

4
.gitignore vendored
View File

@ -19,3 +19,7 @@ target
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
*.rs
target
Cargo.lock
src

2
Cargo.lock generated
View File

@ -2087,7 +2087,7 @@ dependencies = [
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.0.10" version = "3.0.13"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@ -31,7 +31,7 @@
- Улучшение обработки ошибок в edge-case транспортных сценариях - Улучшение обработки ошибок в edge-case транспортных сценариях
Релиз: Релиз:
[3.0.9](https://github.com/telemt/telemt/releases/tag/3.0.9) [3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12)
--- ---
@ -69,7 +69,7 @@ Additionally, we implemented a set of robustness enhancements designed to:
- Improve error handling in edge-case transport scenarios - Improve error handling in edge-case transport scenarios
Release: Release:
[3.0.9](https://github.com/telemt/telemt/releases/tag/3.0.9) [3.0.12](https://github.com/telemt/telemt/releases/tag/3.0.12)
--- ---

View File

@ -38,10 +38,21 @@ me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500 # Base delay between extra connects me_warmup_step_delay_ms = 500 # Base delay between extra connects
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
# Reconnect policy knobs. # Reconnect policy knobs.
me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE! me_reconnect_max_concurrent_per_dc = 4 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
me_reconnect_backoff_base_ms = 500 # Backoff start me_reconnect_backoff_base_ms = 500 # Backoff start
me_reconnect_backoff_cap_ms = 30000 # Backoff cap me_reconnect_backoff_cap_ms = 30000 # Backoff cap
me_reconnect_fast_retry_count = 11 # Quick retries before backoff me_reconnect_fast_retry_count = 11 # Quick retries before backoff
update_every = 7200 # Resolve the active updater interval for ME infrastructure refresh tasks.
crypto_pending_buffer = 262144 # Max pending ciphertext buffer per client writer (bytes). Controls FakeTLS backpressure vs throughput.
max_client_frame = 16777216 # Maximum allowed client MTProto frame size (bytes).
desync_all_full = false # Emit full crypto-desync forensic logs for every event. When false, full forensic details are emitted once per key window.
me_reinit_drain_timeout_secs = 300 # Drain timeout in seconds for stale ME writers after endpoint map changes. Set to 0 to keep stale writers draining indefinitely (no force-close).
auto_degradation_enabled = true # Enable auto-degradation from ME to Direct-DC.
degradation_min_unavailable_dc_groups = 2 # Minimum unavailable ME DC groups before degrading.
hardswap = true # Enable C-like hard-swap for ME pool generations. When true, Telemt prewarms a new generation and switches once full coverage is reached.
me_pool_drain_ttl_secs = 90 # Drain-TTL in seconds for stale ME writers after endpoint map changes. During TTL, stale writers may be used only as fallback for new bindings.
me_pool_min_fresh_ratio = 0.8 # Minimum desired-DC coverage ratio required before draining stale writers. Range: 0.0..=1.0.
me_reinit_drain_timeout_secs = 120 # Drain timeout in seconds for stale ME writers after endpoint map changes. Set to 0 to keep stale writers draining indefinitely (no force-close).
[general.modes] [general.modes]
classic = false classic = false

1
proxy-secret Normal file
View File

@ -0,0 +1 @@
ΔωϊΚxζ»H­l~,εΐ<CEB5>D0d]UJέλUA<55>M¦'!ΠFκ«nR«©ZD>Ο³F>y Zfa*ί<>®Ϊι¨ ¦<>o°¦<C2B0>8zψM<CF88>ο:dq>\3wφα£Τ}™υΰΕnμθπ\TΔ<54>°yγο<CEBF>θς°£'V<>IΕςi<>&]²

View File

@ -196,7 +196,10 @@ use_middle_proxy = false
log_level = "normal" log_level = "normal"
desync_all_full = false desync_all_full = false
update_every = 43200 update_every = 43200
me_reinit_drain_timeout_secs = 300 hardswap = false
me_pool_drain_ttl_secs = 90
me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120
[network] [network]
ipv4 = true ipv4 = true

View File

@ -1,4 +1,3 @@
use std::net::IpAddr;
use std::collections::HashMap; use std::collections::HashMap;
use ipnetwork::IpNetwork; use ipnetwork::IpNetwork;
use serde::Deserialize; use serde::Deserialize;
@ -83,7 +82,7 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
} }
pub(crate) fn default_pool_size() -> usize { pub(crate) fn default_pool_size() -> usize {
2 8
} }
pub(crate) fn default_keepalive_interval() -> u64 { pub(crate) fn default_keepalive_interval() -> u64 {
@ -144,10 +143,18 @@ pub(crate) fn default_alpn_enforce() -> bool {
pub(crate) fn default_stun_servers() -> Vec<String> { pub(crate) fn default_stun_servers() -> Vec<String> {
vec![ vec![
"stun.l.google.com:5349".to_string(),
"stun1.l.google.com:3478".to_string(),
"stun.gmx.net:3478".to_string(),
"stun.l.google.com:19302".to_string(), "stun.l.google.com:19302".to_string(),
"stun.1und1.de:3478".to_string(),
"stun1.l.google.com:19302".to_string(), "stun1.l.google.com:19302".to_string(),
"stun2.l.google.com:19302".to_string(), "stun2.l.google.com:19302".to_string(),
"stun3.l.google.com:19302".to_string(),
"stun4.l.google.com:19302".to_string(),
"stun.services.mozilla.com:3478".to_string(),
"stun.stunprotocol.org:3478".to_string(), "stun.stunprotocol.org:3478".to_string(),
"stun.nextcloud.com:3478".to_string(),
"stun.voip.eutelia.it:3478".to_string(), "stun.voip.eutelia.it:3478".to_string(),
] ]
} }
@ -164,19 +171,31 @@ pub(crate) fn default_cache_public_ip_path() -> String {
} }
pub(crate) fn default_proxy_secret_reload_secs() -> u64 { pub(crate) fn default_proxy_secret_reload_secs() -> u64 {
12 * 60 * 60 1 * 60 * 60
} }
pub(crate) fn default_proxy_config_reload_secs() -> u64 { pub(crate) fn default_proxy_config_reload_secs() -> u64 {
12 * 60 * 60 1 * 60 * 60
} }
pub(crate) fn default_update_every_secs() -> u64 { pub(crate) fn default_update_every_secs() -> u64 {
2 * 60 * 60 1 * 30 * 60
} }
pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 { pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
300 120
}
pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
90
}
pub(crate) fn default_me_pool_min_fresh_ratio() -> f32 {
0.8
}
pub(crate) fn default_hardswap() -> bool {
true
} }
pub(crate) fn default_ntp_check() -> bool { pub(crate) fn default_ntp_check() -> bool {

View File

@ -12,6 +12,9 @@
//! | `general` | `me_keepalive_*` | Passed on next connection | //! | `general` | `me_keepalive_*` | Passed on next connection |
//! | `general` | `desync_all_full` | Applied immediately | //! | `general` | `desync_all_full` | Applied immediately |
//! | `general` | `update_every` | Applied to ME updater immediately | //! | `general` | `update_every` | Applied to ME updater immediately |
//! | `general` | `hardswap` | Applied on next ME map update |
//! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update |
//! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update |
//! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update | //! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update |
//! | `access` | All user/quota fields | Effective immediately | //! | `access` | All user/quota fields | Effective immediately |
//! //!
@ -39,6 +42,9 @@ pub struct HotFields {
pub middle_proxy_pool_size: usize, pub middle_proxy_pool_size: usize,
pub desync_all_full: bool, pub desync_all_full: bool,
pub update_every_secs: u64, pub update_every_secs: u64,
pub hardswap: bool,
pub me_pool_drain_ttl_secs: u64,
pub me_pool_min_fresh_ratio: f32,
pub me_reinit_drain_timeout_secs: u64, pub me_reinit_drain_timeout_secs: u64,
pub me_keepalive_enabled: bool, pub me_keepalive_enabled: bool,
pub me_keepalive_interval_secs: u64, pub me_keepalive_interval_secs: u64,
@ -55,6 +61,9 @@ impl HotFields {
middle_proxy_pool_size: cfg.general.middle_proxy_pool_size, middle_proxy_pool_size: cfg.general.middle_proxy_pool_size,
desync_all_full: cfg.general.desync_all_full, desync_all_full: cfg.general.desync_all_full,
update_every_secs: cfg.general.effective_update_every_secs(), update_every_secs: cfg.general.effective_update_every_secs(),
hardswap: cfg.general.hardswap,
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio,
me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
me_keepalive_enabled: cfg.general.me_keepalive_enabled, me_keepalive_enabled: cfg.general.me_keepalive_enabled,
me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs, me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
@ -198,6 +207,27 @@ fn log_changes(
); );
} }
if old_hot.hardswap != new_hot.hardswap {
info!(
"config reload: hardswap: {} → {}",
old_hot.hardswap, new_hot.hardswap,
);
}
if old_hot.me_pool_drain_ttl_secs != new_hot.me_pool_drain_ttl_secs {
info!(
"config reload: me_pool_drain_ttl_secs: {}s → {}s",
old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs,
);
}
if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON {
info!(
"config reload: me_pool_min_fresh_ratio: {:.3} → {:.3}",
old_hot.me_pool_min_fresh_ratio, new_hot.me_pool_min_fresh_ratio,
);
}
if old_hot.me_reinit_drain_timeout_secs != new_hot.me_reinit_drain_timeout_secs { if old_hot.me_reinit_drain_timeout_secs != new_hot.me_reinit_drain_timeout_secs {
info!( info!(
"config reload: me_reinit_drain_timeout_secs: {}s → {}s", "config reload: me_reinit_drain_timeout_secs: {}s → {}s",

View File

@ -1,3 +1,5 @@
#![allow(deprecated)]
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::path::Path; use std::path::Path;
@ -145,6 +147,24 @@ impl ProxyConfig {
} }
} }
if !(0.0..=1.0).contains(&config.general.me_pool_min_fresh_ratio) {
return Err(ProxyError::Config(
"general.me_pool_min_fresh_ratio must be within [0.0, 1.0]".to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs
{
warn!(
me_pool_drain_ttl_secs = config.general.me_pool_drain_ttl_secs,
me_reinit_drain_timeout_secs = config.general.effective_me_pool_force_close_secs(),
"force-close timeout is lower than drain TTL; bumping force-close timeout to TTL"
);
config.general.me_reinit_drain_timeout_secs = config.general.me_pool_drain_ttl_secs;
}
// Validate secrets. // Validate secrets.
for (user, secret) in &config.access.users { for (user, secret) in &config.access.users {
if !secret.chars().all(|c| c.is_ascii_hexdigit()) || secret.len() != 32 { if !secret.chars().all(|c| c.is_ascii_hexdigit()) || secret.len() != 32 {
@ -439,4 +459,45 @@ mod tests {
assert!(err.contains("general.update_every must be > 0")); assert!(err.contains("general.update_every must be > 0"));
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn me_pool_min_fresh_ratio_out_of_range_is_rejected() {
let toml = r#"
[general]
me_pool_min_fresh_ratio = 1.5
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_pool_min_ratio_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_pool_min_fresh_ratio must be within [0.0, 1.0]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_bumped_when_below_drain_ttl() {
let toml = r#"
[general]
me_pool_drain_ttl_secs = 90
me_reinit_drain_timeout_secs = 30
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_force_close_bump_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 90);
let _ = std::fs::remove_file(path);
}
} }

View File

@ -206,6 +206,11 @@ pub struct GeneralConfig {
#[serde(default = "default_desync_all_full")] #[serde(default = "default_desync_all_full")]
pub desync_all_full: bool, pub desync_all_full: bool,
/// Enable C-like hard-swap for ME pool generations.
/// When true, Telemt prewarms a new generation and switches once full coverage is reached.
#[serde(default = "default_hardswap")]
pub hardswap: bool,
/// Enable staggered warmup of extra ME writers. /// Enable staggered warmup of extra ME writers.
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub me_warmup_stagger_enabled: bool, pub me_warmup_stagger_enabled: bool,
@ -262,6 +267,16 @@ pub struct GeneralConfig {
#[serde(default)] #[serde(default)]
pub update_every: Option<u64>, pub update_every: Option<u64>,
/// Drain-TTL in seconds for stale ME writers after endpoint map changes.
/// During TTL, stale writers may be used only as fallback for new bindings.
#[serde(default = "default_me_pool_drain_ttl_secs")]
pub me_pool_drain_ttl_secs: u64,
/// Minimum desired-DC coverage ratio required before draining stale writers.
/// Range: 0.0..=1.0.
#[serde(default = "default_me_pool_min_fresh_ratio")]
pub me_pool_min_fresh_ratio: f32,
/// Drain timeout in seconds for stale ME writers after endpoint map changes. /// Drain timeout in seconds for stale ME writers after endpoint map changes.
/// Set to 0 to keep stale writers draining indefinitely (no force-close). /// Set to 0 to keep stale writers draining indefinitely (no force-close).
#[serde(default = "default_me_reinit_drain_timeout_secs")] #[serde(default = "default_me_reinit_drain_timeout_secs")]
@ -308,7 +323,7 @@ impl Default for GeneralConfig {
middle_proxy_nat_stun: None, middle_proxy_nat_stun: None,
middle_proxy_nat_stun_servers: Vec::new(), middle_proxy_nat_stun_servers: Vec::new(),
middle_proxy_pool_size: default_pool_size(), middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: 8, middle_proxy_warm_standby: 16,
me_keepalive_enabled: true, me_keepalive_enabled: true,
me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_jitter_secs: default_keepalive_jitter(),
@ -316,7 +331,7 @@ impl Default for GeneralConfig {
me_warmup_stagger_enabled: true, me_warmup_stagger_enabled: true,
me_warmup_step_delay_ms: default_warmup_step_delay_ms(), me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
me_reconnect_max_concurrent_per_dc: 4, me_reconnect_max_concurrent_per_dc: 8,
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: 8, me_reconnect_fast_retry_count: 8,
@ -328,8 +343,11 @@ impl Default for GeneralConfig {
crypto_pending_buffer: default_crypto_pending_buffer(), crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(), max_client_frame: default_max_client_frame(),
desync_all_full: default_desync_all_full(), desync_all_full: default_desync_all_full(),
hardswap: default_hardswap(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
update_every: Some(default_update_every_secs()), update_every: Some(default_update_every_secs()),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),
me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(), me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(),
proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(),
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
@ -348,6 +366,12 @@ impl GeneralConfig {
self.update_every self.update_every
.unwrap_or_else(|| self.proxy_secret_auto_reload_secs.min(self.proxy_config_auto_reload_secs)) .unwrap_or_else(|| self.proxy_secret_auto_reload_secs.min(self.proxy_config_auto_reload_secs))
} }
/// Resolve force-close timeout for stale writers.
/// `me_reinit_drain_timeout_secs` remains backward-compatible alias.
pub fn effective_me_pool_force_close_secs(&self) -> u64 {
self.me_reinit_drain_timeout_secs
}
} }
/// `[general.links]` — proxy link generation settings. /// `[general.links]` — proxy link generation settings.

View File

@ -11,6 +11,8 @@
//! `HandshakeSuccess`, `ObfuscationParams`) are responsible for //! `HandshakeSuccess`, `ObfuscationParams`) are responsible for
//! zeroizing their own copies. //! zeroizing their own copies.
#![allow(dead_code)]
use aes::Aes256; use aes::Aes256;
use ctr::{Ctr128BE, cipher::{KeyIvInit, StreamCipher}}; use ctr::{Ctr128BE, cipher::{KeyIvInit, StreamCipher}};
use zeroize::Zeroize; use zeroize::Zeroize;

View File

@ -6,7 +6,6 @@ pub mod random;
pub use aes::{AesCtr, AesCbc}; pub use aes::{AesCtr, AesCbc};
pub use hash::{ pub use hash::{
build_middleproxy_prekey, crc32, crc32c, derive_middleproxy_keys, md5, sha1, sha256, build_middleproxy_prekey, crc32, crc32c, derive_middleproxy_keys, sha256, sha256_hmac,
sha256_hmac,
}; };
pub use random::SecureRandom; pub use random::SecureRandom;

View File

@ -1,5 +1,8 @@
//! Pseudorandom //! Pseudorandom
#![allow(deprecated)]
#![allow(dead_code)]
use rand::{Rng, RngCore, SeedableRng}; use rand::{Rng, RngCore, SeedableRng};
use rand::rngs::StdRng; use rand::rngs::StdRng;
use parking_lot::Mutex; use parking_lot::Mutex;

View File

@ -1,5 +1,7 @@
//! Error Types //! Error Types
#![allow(dead_code)]
use std::fmt; use std::fmt;
use std::net::SocketAddr; use std::net::SocketAddr;
use thiserror::Error; use thiserror::Error;

View File

@ -1,5 +1,7 @@
// src/ip_tracker.rs // src/ip_tracker.rs
// Модуль для отслеживания и ограничения уникальных IP-адресов пользователей // IP address tracking and limiting for users
#![allow(dead_code)]
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::net::IpAddr; use std::net::IpAddr;

View File

@ -1,5 +1,7 @@
//! telemt — Telegram MTProto Proxy //! telemt — Telegram MTProto Proxy
#![allow(unused_assignments)]
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -73,36 +75,27 @@ fn parse_cli() -> (String, bool, Option<String>) {
log_level = Some(s.trim_start_matches("--log-level=").to_string()); log_level = Some(s.trim_start_matches("--log-level=").to_string());
} }
"--help" | "-h" => { "--help" | "-h" => {
eprintln!("telemt - Telegram MTProto Proxy v{}", env!("CARGO_PKG_VERSION")); eprintln!("Usage: telemt [config.toml] [OPTIONS]");
eprintln!(); eprintln!();
eprintln!("USAGE:"); eprintln!("Options:");
eprintln!(" telemt [CONFIG] [OPTIONS]"); eprintln!(" --silent, -s Suppress info logs");
eprintln!(" telemt --init [INIT_OPTIONS]"); eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent");
eprintln!(" --help, -h Show this help");
eprintln!(); eprintln!();
eprintln!("ARGS:"); eprintln!("Setup (fire-and-forget):");
eprintln!(" <CONFIG> Path to config file (default: config.toml)"); eprintln!(
eprintln!(); " --init Generate config, install systemd service, start"
eprintln!("OPTIONS:"); );
eprintln!(" -s, --silent Suppress info logs (equivalent to --log-level silent)");
eprintln!(" --log-level <LEVEL> Set log level [possible values: debug, verbose, normal, silent]");
eprintln!(" -h, --help Show this help message");
eprintln!(" -V, --version Print version number");
eprintln!();
eprintln!("INIT OPTIONS (fire-and-forget setup):");
eprintln!(" --init Generate config, install systemd service, and start");
eprintln!(" --port <PORT> Listen port (default: 443)"); eprintln!(" --port <PORT> Listen port (default: 443)");
eprintln!(" --domain <DOMAIN> TLS domain for masking (default: www.google.com)"); eprintln!(
eprintln!(" --secret <HEX> 32-char hex secret (auto-generated if omitted)"); " --domain <DOMAIN> TLS domain for masking (default: www.google.com)"
eprintln!(" --user <NAME> Username for proxy access (default: user)"); );
eprintln!(
" --secret <HEX> 32-char hex secret (auto-generated if omitted)"
);
eprintln!(" --user <NAME> Username (default: user)");
eprintln!(" --config-dir <DIR> Config directory (default: /etc/telemt)"); eprintln!(" --config-dir <DIR> Config directory (default: /etc/telemt)");
eprintln!(" --no-start Create config and service but don't start"); eprintln!(" --no-start Don't start the service after install");
eprintln!();
eprintln!("EXAMPLES:");
eprintln!(" telemt # Run with default config");
eprintln!(" telemt /etc/telemt/config.toml # Run with specific config");
eprintln!(" telemt --log-level debug # Run with debug logging");
eprintln!(" telemt --init # Quick setup with defaults");
eprintln!(" telemt --init --port 8443 --user admin # Custom setup");
std::process::exit(0); std::process::exit(0);
} }
"--version" | "-V" => { "--version" | "-V" => {
@ -371,6 +364,10 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
config.general.me_reconnect_backoff_base_ms, config.general.me_reconnect_backoff_base_ms,
config.general.me_reconnect_backoff_cap_ms, config.general.me_reconnect_backoff_cap_ms,
config.general.me_reconnect_fast_retry_count, config.general.me_reconnect_fast_retry_count,
config.general.hardswap,
config.general.me_pool_drain_ttl_secs,
config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio,
); );
let pool_size = config.general.middle_proxy_pool_size.max(1); let pool_size = config.general.middle_proxy_pool_size.max(1);
@ -422,6 +419,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
if me_pool.is_some() { if me_pool.is_some() {
info!("Transport: Middle-End Proxy - all DC-over-RPC"); info!("Transport: Middle-End Proxy - all DC-over-RPC");
} else { } else {
let _ = use_middle_proxy;
use_middle_proxy = false; use_middle_proxy = false;
// Make runtime config reflect direct-only mode for handlers. // Make runtime config reflect direct-only mode for handlers.
config.general.use_middle_proxy = false; config.general.use_middle_proxy = false;

View File

@ -2,7 +2,7 @@ use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use http_body_util::{Full, BodyExt}; use http_body_util::Full;
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::server::conn::http1; use hyper::server::conn::http1;
use hyper::service::service_fn; use hyper::service::service_fn;
@ -175,6 +175,30 @@ fn render_metrics(stats: &Stats) -> String {
stats.get_desync_frames_bucket_gt_10() stats.get_desync_frames_bucket_gt_10()
); );
let _ = writeln!(out, "# HELP telemt_pool_swap_total Successful ME pool swaps");
let _ = writeln!(out, "# TYPE telemt_pool_swap_total counter");
let _ = writeln!(out, "telemt_pool_swap_total {}", stats.get_pool_swap_total());
let _ = writeln!(out, "# HELP telemt_pool_drain_active Active draining ME writers");
let _ = writeln!(out, "# TYPE telemt_pool_drain_active gauge");
let _ = writeln!(out, "telemt_pool_drain_active {}", stats.get_pool_drain_active());
let _ = writeln!(out, "# HELP telemt_pool_force_close_total Forced close events for draining writers");
let _ = writeln!(out, "# TYPE telemt_pool_force_close_total counter");
let _ = writeln!(
out,
"telemt_pool_force_close_total {}",
stats.get_pool_force_close_total()
);
let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds");
let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter");
let _ = writeln!(
out,
"telemt_pool_stale_pick_total {}",
stats.get_pool_stale_pick_total()
);
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
@ -205,6 +229,7 @@ fn render_metrics(stats: &Stats) -> String {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use http_body_util::BodyExt;
#[test] #[test]
fn test_render_metrics_format() { fn test_render_metrics_format() {

View File

@ -1,3 +1,5 @@
#![allow(dead_code)]
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use tracing::{info, warn}; use tracing::{info, warn};

View File

@ -1,3 +1,6 @@
#![allow(unreachable_code)]
#![allow(dead_code)]
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use tokio::net::{lookup_host, UdpSocket}; use tokio::net::{lookup_host, UdpSocket};

View File

@ -1,6 +1,8 @@
//! Protocol constants and datacenter addresses //! Protocol constants and datacenter addresses
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; #![allow(dead_code)]
use std::net::{IpAddr, Ipv4Addr};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use std::sync::LazyLock; use std::sync::LazyLock;

View File

@ -1,5 +1,7 @@
//! MTProto frame types and metadata //! MTProto frame types and metadata
#![allow(dead_code)]
use std::collections::HashMap; use std::collections::HashMap;
/// Extra metadata associated with a frame /// Extra metadata associated with a frame

View File

@ -5,7 +5,11 @@ pub mod frame;
pub mod obfuscation; pub mod obfuscation;
pub mod tls; pub mod tls;
#[allow(unused_imports)]
pub use constants::*; pub use constants::*;
#[allow(unused_imports)]
pub use frame::*; pub use frame::*;
#[allow(unused_imports)]
pub use obfuscation::*; pub use obfuscation::*;
#[allow(unused_imports)]
pub use tls::*; pub use tls::*;

View File

@ -1,8 +1,9 @@
//! MTProto Obfuscation //! MTProto Obfuscation
#![allow(dead_code)]
use zeroize::Zeroize; use zeroize::Zeroize;
use crate::crypto::{sha256, AesCtr}; use crate::crypto::{sha256, AesCtr};
use crate::error::Result;
use super::constants::*; use super::constants::*;
/// Obfuscation parameters from handshake /// Obfuscation parameters from handshake

View File

@ -4,8 +4,11 @@
//! for domain fronting. The handshake looks like valid TLS 1.3 but //! for domain fronting. The handshake looks like valid TLS 1.3 but
//! actually carries MTProto authentication data. //! actually carries MTProto authentication data.
#![allow(dead_code)]
use crate::crypto::{sha256_hmac, SecureRandom}; use crate::crypto::{sha256_hmac, SecureRandom};
use crate::error::{ProxyError, Result}; #[cfg(test)]
use crate::error::ProxyError;
use super::constants::*; use super::constants::*;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use num_bigint::BigUint; use num_bigint::BigUint;
@ -613,7 +616,7 @@ pub fn parse_tls_record_header(header: &[u8; 5]) -> Option<(u8, u16)> {
/// ///
/// This is useful for testing that our ServerHello is well-formed. /// This is useful for testing that our ServerHello is well-formed.
#[cfg(test)] #[cfg(test)]
fn validate_server_hello_structure(data: &[u8]) -> Result<()> { fn validate_server_hello_structure(data: &[u8]) -> Result<(), ProxyError> {
if data.len() < 5 { if data.len() < 5 {
return Err(ProxyError::InvalidTlsRecord { return Err(ProxyError::InvalidTlsRecord {
record_type: 0, record_type: 0,

View File

@ -271,7 +271,7 @@ impl RunningClientHandler {
self.peer = normalize_ip(self.peer); self.peer = normalize_ip(self.peer);
let peer = self.peer; let peer = self.peer;
let ip_tracker = self.ip_tracker.clone(); let _ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, "New connection"); debug!(peer = %peer, "New connection");
if let Err(e) = configure_client_socket( if let Err(e) = configure_client_socket(
@ -331,7 +331,7 @@ impl RunningClientHandler {
let is_tls = tls::is_tls_handshake(&first_bytes[..3]); let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
let peer = self.peer; let peer = self.peer;
let ip_tracker = self.ip_tracker.clone(); let _ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); debug!(peer = %peer, is_tls = is_tls, "Handshake type detected");
@ -344,7 +344,7 @@ impl RunningClientHandler {
async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<HandshakeOutcome> { async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<HandshakeOutcome> {
let peer = self.peer; let peer = self.peer;
let ip_tracker = self.ip_tracker.clone(); let _ip_tracker = self.ip_tracker.clone();
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
@ -440,7 +440,7 @@ impl RunningClientHandler {
async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<HandshakeOutcome> { async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<HandshakeOutcome> {
let peer = self.peer; let peer = self.peer;
let ip_tracker = self.ip_tracker.clone(); let _ip_tracker = self.ip_tracker.clone();
if !self.config.general.modes.classic && !self.config.general.modes.secure { if !self.config.general.modes.classic && !self.config.general.modes.secure {
debug!(peer = %peer, "Non-TLS modes disabled"); debug!(peer = %peer, "Non-TLS modes disabled");

View File

@ -1,5 +1,7 @@
//! MTProto Handshake //! MTProto Handshake
#![allow(dead_code)]
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;

View File

@ -184,6 +184,7 @@ where
let user = success.user.clone(); let user = success.user.clone();
let peer = success.peer; let peer = success.peer;
let proto_tag = success.proto_tag; let proto_tag = success.proto_tag;
let pool_generation = me_pool.current_generation();
info!( info!(
user = %user, user = %user,
@ -191,6 +192,7 @@ where
dc = success.dc_idx, dc = success.dc_idx,
proto = ?proto_tag, proto = ?proto_tag,
mode = "middle_proxy", mode = "middle_proxy",
pool_generation,
"Routing via Middle-End" "Routing via Middle-End"
); );
@ -220,6 +222,7 @@ where
peer_hash = format_args!("0x{:016x}", forensics.peer_hash), peer_hash = format_args!("0x{:016x}", forensics.peer_hash),
desync_all_full = forensics.desync_all_full, desync_all_full = forensics.desync_all_full,
proto_flags = format_args!("0x{:08x}", proto_flags), proto_flags = format_args!("0x{:08x}", proto_flags),
pool_generation,
"ME relay started" "ME relay started"
); );

View File

@ -8,6 +8,9 @@ pub mod middle_relay;
pub mod relay; pub mod relay;
pub use client::ClientHandler; pub use client::ClientHandler;
#[allow(unused_imports)]
pub use handshake::*; pub use handshake::*;
#[allow(unused_imports)]
pub use masking::*; pub use masking::*;
#[allow(unused_imports)]
pub use relay::*; pub use relay::*;

View File

@ -1,7 +1,8 @@
//! Statistics and replay protection //! Statistics and replay protection
#![allow(dead_code)]
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use dashmap::DashMap; use dashmap::DashMap;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -38,6 +39,10 @@ pub struct Stats {
desync_frames_bucket_1_2: AtomicU64, desync_frames_bucket_1_2: AtomicU64,
desync_frames_bucket_3_10: AtomicU64, desync_frames_bucket_3_10: AtomicU64,
desync_frames_bucket_gt_10: AtomicU64, desync_frames_bucket_gt_10: AtomicU64,
pool_swap_total: AtomicU64,
pool_drain_active: AtomicU64,
pool_force_close_total: AtomicU64,
pool_stale_pick_total: AtomicU64,
user_stats: DashMap<String, UserStats>, user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>, start_time: parking_lot::RwLock<Option<Instant>>,
} }
@ -108,6 +113,35 @@ impl Stats {
} }
} }
} }
pub fn increment_pool_swap_total(&self) {
self.pool_swap_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_pool_drain_active(&self) {
self.pool_drain_active.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_pool_drain_active(&self) {
let mut current = self.pool_drain_active.load(Ordering::Relaxed);
loop {
if current == 0 {
break;
}
match self.pool_drain_active.compare_exchange_weak(
current,
current - 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
pub fn increment_pool_force_close_total(&self) {
self.pool_force_close_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_pool_stale_pick_total(&self) {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@ -149,6 +183,18 @@ impl Stats {
pub fn get_desync_frames_bucket_gt_10(&self) -> u64 { pub fn get_desync_frames_bucket_gt_10(&self) -> u64 {
self.desync_frames_bucket_gt_10.load(Ordering::Relaxed) self.desync_frames_bucket_gt_10.load(Ordering::Relaxed)
} }
pub fn get_pool_swap_total(&self) -> u64 {
self.pool_swap_total.load(Ordering::Relaxed)
}
pub fn get_pool_drain_active(&self) -> u64 {
self.pool_drain_active.load(Ordering::Relaxed)
}
pub fn get_pool_force_close_total(&self) -> u64 {
self.pool_force_close_total.load(Ordering::Relaxed)
}
pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) { pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default() self.user_stats.entry(user.to_string()).or_default()
@ -451,6 +497,7 @@ impl ReplayStats {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::sync::Arc;
#[test] #[test]
fn test_stats_shared_counters() { fn test_stats_shared_counters() {

View File

@ -3,6 +3,8 @@
//! This module provides a thread-safe pool of BytesMut buffers //! This module provides a thread-safe pool of BytesMut buffers
//! that can be reused across connections to reduce allocation pressure. //! that can be reused across connections to reduce allocation pressure.
#![allow(dead_code)]
use bytes::BytesMut; use bytes::BytesMut;
use crossbeam_queue::ArrayQueue; use crossbeam_queue::ArrayQueue;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};

View File

@ -18,6 +18,8 @@
//! is either written to upstream or stored in our pending buffer //! is either written to upstream or stored in our pending buffer
//! - when upstream is pending -> ciphertext is buffered/bounded and backpressure is applied //! - when upstream is pending -> ciphertext is buffered/bounded and backpressure is applied
//! //!
#![allow(dead_code)]
//! ======================= //! =======================
//! Writer state machine //! Writer state machine
//! ======================= //! =======================
@ -55,7 +57,7 @@ use std::io::{self, ErrorKind, Result};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::{debug, trace, warn}; use tracing::{debug, trace};
use crate::crypto::AesCtr; use crate::crypto::AesCtr;
use super::state::{StreamState, YieldBuffer}; use super::state::{StreamState, YieldBuffer};

View File

@ -3,6 +3,8 @@
//! This module defines the common types and traits used by all //! This module defines the common types and traits used by all
//! frame encoding/decoding implementations. //! frame encoding/decoding implementations.
#![allow(dead_code)]
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use std::io::Result; use std::io::Result;
use std::sync::Arc; use std::sync::Arc;

View File

@ -3,6 +3,8 @@
//! This module provides Encoder/Decoder implementations compatible //! This module provides Encoder/Decoder implementations compatible
//! with tokio-util's Framed wrapper for easy async frame I/O. //! with tokio-util's Framed wrapper for easy async frame I/O.
#![allow(dead_code)]
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut, BufMut};
use std::io::{self, Error, ErrorKind}; use std::io::{self, Error, ErrorKind};
use std::sync::Arc; use std::sync::Arc;

View File

@ -1,6 +1,8 @@
//! MTProto frame stream wrappers //! MTProto frame stream wrappers
use bytes::{Bytes, BytesMut}; #![allow(dead_code)]
use bytes::Bytes;
use std::io::{Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
use crate::protocol::constants::*; use crate::protocol::constants::*;

View File

@ -12,28 +12,34 @@ pub mod frame_codec;
pub mod frame_stream; pub mod frame_stream;
// Re-export state machine types // Re-export state machine types
#[allow(unused_imports)]
pub use state::{ pub use state::{
StreamState, Transition, PollResult, StreamState, Transition, PollResult,
ReadBuffer, WriteBuffer, HeaderBuffer, YieldBuffer, ReadBuffer, WriteBuffer, HeaderBuffer, YieldBuffer,
}; };
// Re-export buffer pool // Re-export buffer pool
#[allow(unused_imports)]
pub use buffer_pool::{BufferPool, PooledBuffer, PoolStats}; pub use buffer_pool::{BufferPool, PooledBuffer, PoolStats};
// Re-export stream implementations // Re-export stream implementations
#[allow(unused_imports)]
pub use crypto_stream::{CryptoReader, CryptoWriter, PassthroughStream}; pub use crypto_stream::{CryptoReader, CryptoWriter, PassthroughStream};
pub use tls_stream::{FakeTlsReader, FakeTlsWriter}; pub use tls_stream::{FakeTlsReader, FakeTlsWriter};
// Re-export frame types // Re-export frame types
#[allow(unused_imports)]
pub use frame::{Frame, FrameMeta, FrameCodec as FrameCodecTrait, create_codec}; pub use frame::{Frame, FrameMeta, FrameCodec as FrameCodecTrait, create_codec};
// Re-export tokio-util compatible codecs // Re-export tokio-util compatible codecs
#[allow(unused_imports)]
pub use frame_codec::{ pub use frame_codec::{
FrameCodec, FrameCodec,
AbridgedCodec, IntermediateCodec, SecureCodec, AbridgedCodec, IntermediateCodec, SecureCodec,
}; };
// Legacy re-exports for compatibility // Legacy re-exports for compatibility
#[allow(unused_imports)]
pub use frame_stream::{ pub use frame_stream::{
AbridgedFrameReader, AbridgedFrameWriter, AbridgedFrameReader, AbridgedFrameWriter,
IntermediateFrameReader, IntermediateFrameWriter, IntermediateFrameReader, IntermediateFrameWriter,

View File

@ -3,6 +3,8 @@
//! This module provides core types and traits for implementing //! This module provides core types and traits for implementing
//! stateful async streams with proper partial read/write handling. //! stateful async streams with proper partial read/write handling.
#![allow(dead_code)]
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use std::io; use std::io;

View File

@ -18,6 +18,8 @@
//! - Explicit state machines for all async operations //! - Explicit state machines for all async operations
//! - Never lose data on partial reads //! - Never lose data on partial reads
//! - Atomic TLS record formation for writes //! - Atomic TLS record formation for writes
#![allow(dead_code)]
//! - Proper handling of all TLS record types //! - Proper handling of all TLS record types
//! //!
//! Important nuance (Telegram FakeTLS): //! Important nuance (Telegram FakeTLS):

View File

@ -1,5 +1,7 @@
//! Stream traits and common types //! Stream traits and common types
#![allow(dead_code)]
use bytes::Bytes; use bytes::Bytes;
use std::io::Result; use std::io::Result;
use std::pin::Pin; use std::pin::Pin;

View File

@ -19,6 +19,7 @@ pub struct TlsFrontCache {
disk_path: PathBuf, disk_path: PathBuf,
} }
#[allow(dead_code)]
impl TlsFrontCache { impl TlsFrontCache {
pub fn new(domains: &[String], default_len: usize, disk_path: impl AsRef<Path>) -> Self { pub fn new(domains: &[String], default_len: usize, disk_path: impl AsRef<Path>) -> Self {
let default_template = ParsedServerHello { let default_template = ParsedServerHello {
@ -173,7 +174,7 @@ impl TlsFrontCache {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
for domain in &domains { for domain in &domains {
fetcher(domain.clone()).await; let _ = fetcher(domain.clone()).await;
} }
sleep(interval).await; sleep(interval).await;
} }

View File

@ -4,4 +4,5 @@ pub mod fetcher;
pub mod emulator; pub mod emulator;
pub use cache::TlsFrontCache; pub use cache::TlsFrontCache;
#[allow(unused_imports)]
pub use types::{CachedTlsData, TlsFetchResult}; pub use types::{CachedTlsData, TlsFetchResult};

View File

@ -131,6 +131,13 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
} }
async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &ProxyConfig) { async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &ProxyConfig) {
pool.update_runtime_reinit_policy(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
);
let mut maps_changed = false; let mut maps_changed = false;
// Update proxy config v4 // Update proxy config v4
@ -162,12 +169,7 @@ async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &Pro
} }
if maps_changed { if maps_changed {
let drain_timeout = if cfg.general.me_reinit_drain_timeout_secs == 0 { pool.zero_downtime_reinit_after_map_change(rng.as_ref())
None
} else {
Some(Duration::from_secs(cfg.general.me_reinit_drain_timeout_secs))
};
pool.zero_downtime_reinit_after_map_change(rng.as_ref(), drain_timeout)
.await; .await;
} }
@ -224,6 +226,12 @@ pub async fn me_config_updater(
break; break;
} }
let cfg = config_rx.borrow().clone(); let cfg = config_rx.borrow().clone();
pool.update_runtime_reinit_policy(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
);
let new_secs = cfg.general.effective_update_every_secs().max(1); let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs { if new_secs == update_every_secs {
continue; continue;

View File

@ -14,6 +14,7 @@ use super::MePool;
const HEALTH_INTERVAL_SECS: u64 = 1; const HEALTH_INTERVAL_SECS: u64 = 1;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)]
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
@ -68,6 +69,7 @@ async fn check_family(
.read() .read()
.await .await
.iter() .iter()
.filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed))
.map(|w| w.addr) .map(|w| w.addr)
.collect(); .collect();

View File

@ -17,8 +17,10 @@ mod wire;
use bytes::Bytes; use bytes::Bytes;
pub use health::me_health_monitor; pub use health::me_health_monitor;
#[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, MePingReport, MePingSample, MePingFamily}; pub use ping::{run_me_ping, format_sample_line, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool; pub use pool::MePool;
#[allow(unused_imports)]
pub use pool_nat::{stun_probe, detect_public_ip}; pub use pool_nat::{stun_probe, detect_public_ip};
pub use registry::ConnRegistry; pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret; pub use secret::fetch_proxy_secret;

View File

@ -24,6 +24,7 @@ pub struct MePingSample {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct MePingReport { pub struct MePingReport {
pub dc: i32, pub dc: i32,
pub family: MePingFamily, pub family: MePingFamily,

View File

@ -1,14 +1,14 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use bytes::BytesMut; use bytes::BytesMut;
use rand::Rng; use rand::Rng;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use tokio::sync::{Mutex, RwLock, mpsc, Notify}; use tokio::sync::{Mutex, RwLock, mpsc, Notify};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
@ -27,12 +27,16 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
pub struct MeWriter { pub struct MeWriter {
pub id: u64, pub id: u64,
pub addr: SocketAddr, pub addr: SocketAddr,
pub generation: u64,
pub tx: mpsc::Sender<WriterCommand>, pub tx: mpsc::Sender<WriterCommand>,
pub cancel: CancellationToken, pub cancel: CancellationToken,
pub degraded: Arc<AtomicBool>, pub degraded: Arc<AtomicBool>,
pub draining: Arc<AtomicBool>, pub draining: Arc<AtomicBool>,
pub draining_started_at_epoch_secs: Arc<AtomicU64>,
pub allow_drain_fallback: Arc<AtomicBool>,
} }
#[allow(dead_code)]
pub struct MePool { pub struct MePool {
pub(super) registry: Arc<ConnRegistry>, pub(super) registry: Arc<ConnRegistry>,
pub(super) writers: Arc<RwLock<Vec<MeWriter>>>, pub(super) writers: Arc<RwLock<Vec<MeWriter>>>,
@ -73,6 +77,11 @@ pub struct MePool {
pub(super) writer_available: Arc<Notify>, pub(super) writer_available: Arc<Notify>,
pub(super) conn_count: AtomicUsize, pub(super) conn_count: AtomicUsize,
pub(super) stats: Arc<crate::stats::Stats>, pub(super) stats: Arc<crate::stats::Stats>,
pub(super) generation: AtomicU64,
pub(super) hardswap: AtomicBool,
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
pool_size: usize, pool_size: usize,
} }
@ -83,6 +92,22 @@ pub struct NatReflectionCache {
} }
impl MePool { impl MePool {
fn ratio_to_permille(ratio: f32) -> u32 {
let clamped = ratio.clamp(0.0, 1.0);
(clamped * 1000.0).round() as u32
}
fn permille_to_ratio(permille: u32) -> f32 {
(permille.min(1000) as f32) / 1000.0
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
pub fn new( pub fn new(
proxy_tag: Option<Vec<u8>>, proxy_tag: Option<Vec<u8>>,
proxy_secret: Vec<u8>, proxy_secret: Vec<u8>,
@ -110,6 +135,10 @@ impl MePool {
me_reconnect_backoff_base_ms: u64, me_reconnect_backoff_base_ms: u64,
me_reconnect_backoff_cap_ms: u64, me_reconnect_backoff_cap_ms: u64,
me_reconnect_fast_retry_count: u32, me_reconnect_fast_retry_count: u32,
hardswap: bool,
me_pool_drain_ttl_secs: u64,
me_pool_force_close_secs: u64,
me_pool_min_fresh_ratio: f32,
) -> Arc<Self> { ) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
registry: Arc::new(ConnRegistry::new()), registry: Arc::new(ConnRegistry::new()),
@ -152,6 +181,11 @@ impl MePool {
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
writer_available: Arc::new(Notify::new()), writer_available: Arc::new(Notify::new()),
conn_count: AtomicUsize::new(0), conn_count: AtomicUsize::new(0),
generation: AtomicU64::new(1),
hardswap: AtomicBool::new(hardswap),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(me_pool_min_fresh_ratio)),
}) })
} }
@ -159,6 +193,25 @@ impl MePool {
self.proxy_tag.is_some() self.proxy_tag.is_some()
} }
pub fn current_generation(&self) -> u64 {
self.generation.load(Ordering::Relaxed)
}
pub fn update_runtime_reinit_policy(
&self,
hardswap: bool,
drain_ttl_secs: u64,
force_close_secs: u64,
min_fresh_ratio: f32,
) {
self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs.store(drain_ttl_secs, Ordering::Relaxed);
self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed);
self.me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
}
pub fn reset_stun_state(&self) { pub fn reset_stun_state(&self) {
self.nat_probe_attempts.store(0, Ordering::Relaxed); self.nat_probe_attempts.store(0, Ordering::Relaxed);
self.nat_probe_disabled.store(false, Ordering::Relaxed); self.nat_probe_disabled.store(false, Ordering::Relaxed);
@ -177,6 +230,42 @@ impl MePool {
self.writers.clone() self.writers.clone()
} }
fn force_close_timeout(&self) -> Option<Duration> {
let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed);
if secs == 0 {
None
} else {
Some(Duration::from_secs(secs))
}
}
fn coverage_ratio(
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
active_writer_addrs: &HashSet<SocketAddr>,
) -> (f32, Vec<i32>) {
if desired_by_dc.is_empty() {
return (1.0, Vec::new());
}
let mut missing_dc = Vec::<i32>::new();
let mut covered = 0usize;
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
if endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) {
covered += 1;
} else {
missing_dc.push(*dc);
}
}
missing_dc.sort_unstable();
let total = desired_by_dc.len().max(1);
let ratio = (covered as f32) / (total as f32);
(ratio, missing_dc)
}
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) { pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
let writers = self.writers.read().await; let writers = self.writers.read().await;
let current: HashSet<SocketAddr> = writers let current: HashSet<SocketAddr> = writers
@ -235,39 +324,104 @@ impl MePool {
out out
} }
async fn warmup_generation_for_all_dcs(
self: &Arc<Self>,
rng: &SecureRandom,
generation: u64,
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
) {
for endpoints in desired_by_dc.values() {
if endpoints.is_empty() {
continue;
}
let has_fresh = {
let ws = self.writers.read().await;
ws.iter().any(|w| {
!w.draining.load(Ordering::Relaxed)
&& w.generation == generation
&& endpoints.contains(&w.addr)
})
};
if has_fresh {
continue;
}
let mut shuffled: Vec<SocketAddr> = endpoints.iter().copied().collect();
shuffled.shuffle(&mut rand::rng());
for addr in shuffled {
if self.connect_one(addr, rng).await.is_ok() {
break;
}
}
}
}
pub async fn zero_downtime_reinit_after_map_change( pub async fn zero_downtime_reinit_after_map_change(
self: &Arc<Self>, self: &Arc<Self>,
rng: &SecureRandom, rng: &SecureRandom,
drain_timeout: Option<Duration>,
) { ) {
// Stage 1: prewarm writers for new endpoint maps before draining old ones.
self.reconcile_connections(rng).await;
let desired_by_dc = self.desired_dc_endpoints().await; let desired_by_dc = self.desired_dc_endpoints().await;
if desired_by_dc.is_empty() { if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty after update; skipping stale writer drain"); warn!("ME endpoint map is empty after update; skipping stale writer drain");
return; return;
} }
let previous_generation = self.current_generation();
let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
let hardswap = self.hardswap.load(Ordering::Relaxed);
if hardswap {
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
.await;
} else {
self.reconcile_connections(rng).await;
}
let writers = self.writers.read().await; let writers = self.writers.read().await;
let active_writer_addrs: HashSet<SocketAddr> = writers let active_writer_addrs: HashSet<SocketAddr> = writers
.iter() .iter()
.filter(|w| !w.draining.load(Ordering::Relaxed)) .filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr) .map(|w| w.addr)
.collect(); .collect();
let min_ratio = Self::permille_to_ratio(
let mut missing_dc = Vec::<i32>::new(); self.me_pool_min_fresh_ratio_permille
for (dc, endpoints) in &desired_by_dc { .load(Ordering::Relaxed),
if endpoints.is_empty() { );
continue; let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
} if !hardswap && coverage_ratio < min_ratio {
if !endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) { warn!(
missing_dc.push(*dc); previous_generation,
} generation,
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
missing_dc = ?missing_dc,
"ME reinit coverage below threshold; keeping stale writers"
);
return;
} }
if !missing_dc.is_empty() { if hardswap {
missing_dc.sort_unstable(); let fresh_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.map(|w| w.addr)
.collect();
let (fresh_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
fresh_ratio = format_args!("{fresh_ratio:.3}"),
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
);
return;
}
} else if !missing_dc.is_empty() {
warn!( warn!(
missing_dc = ?missing_dc, missing_dc = ?missing_dc,
// Keep stale writers alive when fresh coverage is incomplete. // Keep stale writers alive when fresh coverage is incomplete.
@ -284,7 +438,13 @@ impl MePool {
let stale_writer_ids: Vec<u64> = writers let stale_writer_ids: Vec<u64> = writers
.iter() .iter()
.filter(|w| !w.draining.load(Ordering::Relaxed)) .filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| !desired_addrs.contains(&w.addr)) .filter(|w| {
if hardswap {
w.generation < generation
} else {
!desired_addrs.contains(&w.addr)
}
})
.map(|w| w.id) .map(|w| w.id)
.collect(); .collect();
drop(writers); drop(writers);
@ -294,14 +454,21 @@ impl MePool {
return; return;
} }
let drain_timeout = self.force_close_timeout();
let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0); let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0);
info!( info!(
stale_writers = stale_writer_ids.len(), stale_writers = stale_writer_ids.len(),
previous_generation,
generation,
hardswap,
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
drain_timeout_secs, drain_timeout_secs,
"ME map update covered; draining stale writers" "ME map update covered; draining stale writers"
); );
self.stats.increment_pool_swap_total();
for writer_id in stale_writer_ids { for writer_id in stale_writer_ids {
self.mark_writer_draining_with_timeout(writer_id, drain_timeout) self.mark_writer_draining_with_timeout(writer_id, drain_timeout, !hardswap)
.await; .await;
} }
} }
@ -507,9 +674,12 @@ impl MePool {
let hs = self.handshake_only(stream, addr, rng).await?; let hs = self.handshake_only(stream, addr, rng).await?;
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed); let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
let generation = self.current_generation();
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false)); let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false));
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let allow_drain_fallback = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096); let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let mut rpc_writer = RpcWriter { let mut rpc_writer = RpcWriter {
writer: hs.wr, writer: hs.wr,
@ -540,10 +710,13 @@ impl MePool {
let writer = MeWriter { let writer = MeWriter {
id: writer_id, id: writer_id,
addr, addr,
generation,
tx: tx.clone(), tx: tx.clone(),
cancel: cancel.clone(), cancel: cancel.clone(),
degraded: degraded.clone(), degraded: degraded.clone(),
draining: draining.clone(), draining: draining.clone(),
draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(),
allow_drain_fallback: allow_drain_fallback.clone(),
}; };
self.writers.write().await.push(writer.clone()); self.writers.write().await.push(writer.clone());
self.conn_count.fetch_add(1, Ordering::Relaxed); self.conn_count.fetch_add(1, Ordering::Relaxed);
@ -715,6 +888,9 @@ impl MePool {
let mut ws = self.writers.write().await; let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos); let w = ws.remove(pos);
if w.draining.load(Ordering::Relaxed) {
self.stats.decrement_pool_drain_active();
}
w.cancel.cancel(); w.cancel.cancel();
close_tx = Some(w.tx.clone()); close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed); self.conn_count.fetch_sub(1, Ordering::Relaxed);
@ -731,11 +907,20 @@ impl MePool {
self: &Arc<Self>, self: &Arc<Self>,
writer_id: u64, writer_id: u64,
timeout: Option<Duration>, timeout: Option<Duration>,
allow_drain_fallback: bool,
) { ) {
let timeout = timeout.filter(|d| !d.is_zero()); let timeout = timeout.filter(|d| !d.is_zero());
let found = { let found = {
let mut ws = self.writers.write().await; let mut ws = self.writers.write().await;
if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) { if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
let already_draining = w.draining.swap(true, Ordering::Relaxed);
w.allow_drain_fallback
.store(allow_drain_fallback, Ordering::Relaxed);
w.draining_started_at_epoch_secs
.store(Self::now_epoch_secs(), Ordering::Relaxed);
if !already_draining {
self.stats.increment_pool_drain_active();
}
w.draining.store(true, Ordering::Relaxed); w.draining.store(true, Ordering::Relaxed);
true true
} else { } else {
@ -748,7 +933,12 @@ impl MePool {
} }
let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0); let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
debug!(writer_id, timeout_secs, "ME writer marked draining"); debug!(
writer_id,
timeout_secs,
allow_drain_fallback,
"ME writer marked draining"
);
let pool = Arc::downgrade(self); let pool = Arc::downgrade(self);
tokio::spawn(async move { tokio::spawn(async move {
@ -758,6 +948,7 @@ impl MePool {
if let Some(deadline_at) = deadline { if let Some(deadline_at) = deadline {
if Instant::now() >= deadline_at { if Instant::now() >= deadline_at {
warn!(writer_id, "Drain timeout, force-closing"); warn!(writer_id, "Drain timeout, force-closing");
p.stats.increment_pool_force_close_total();
let _ = p.remove_writer_and_close_clients(writer_id).await; let _ = p.remove_writer_and_close_clients(writer_id).await;
break; break;
} }
@ -775,12 +966,34 @@ impl MePool {
} }
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) { pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300))) self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)), false)
.await; .await;
} }
pub(super) fn writer_accepts_new_binding(&self, writer: &MeWriter) -> bool {
if !writer.draining.load(Ordering::Relaxed) {
return true;
}
if !writer.allow_drain_fallback.load(Ordering::Relaxed) {
return false;
}
let ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed);
if ttl_secs == 0 {
return true;
}
let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed);
if started == 0 {
return false;
}
Self::now_epoch_secs().saturating_sub(started) <= ttl_secs
}
} }
#[allow(dead_code)]
fn hex_dump(data: &[u8]) -> String { fn hex_dump(data: &[u8]) -> String {
const MAX: usize = 64; const MAX: usize = 64;
let mut out = String::with_capacity(data.len() * 2 + 3); let mut out = String::with_capacity(data.len() * 2 + 3);

View File

@ -1,7 +1,7 @@
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration; use std::time::Duration;
use tracing::{info, warn, debug}; use tracing::{info, warn};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::network::probe::is_bogon; use crate::network::probe::is_bogon;
@ -9,11 +9,14 @@ use crate::network::stun::{stun_probe_dual, IpFamily, StunProbeResult};
use super::MePool; use super::MePool;
use std::time::Instant; use std::time::Instant;
#[allow(dead_code)]
pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> { pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> {
let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string()); let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string());
stun_probe_dual(&stun_addr).await stun_probe_dual(&stun_addr).await
} }
#[allow(dead_code)]
pub async fn detect_public_ip() -> Option<IpAddr> { pub async fn detect_public_ip() -> Option<IpAddr> {
fetch_public_ipv4_with_retry().await.ok().flatten().map(IpAddr::V4) fetch_public_ipv4_with_retry().await.ok().flatten().map(IpAddr::V4)
} }

View File

@ -21,6 +21,7 @@ pub enum RouteResult {
} }
#[derive(Clone)] #[derive(Clone)]
#[allow(dead_code)]
pub struct ConnMeta { pub struct ConnMeta {
pub target_dc: i16, pub target_dc: i16,
pub client_addr: SocketAddr, pub client_addr: SocketAddr,
@ -29,6 +30,7 @@ pub struct ConnMeta {
} }
#[derive(Clone)] #[derive(Clone)]
#[allow(dead_code)]
pub struct BoundConn { pub struct BoundConn {
pub conn_id: u64, pub conn_id: u64,
pub meta: ConnMeta, pub meta: ConnMeta,
@ -167,6 +169,7 @@ impl ConnRegistry {
out out
} }
#[allow(dead_code)]
pub async fn get_meta(&self, conn_id: u64) -> Option<ConnMeta> { pub async fn get_meta(&self, conn_id: u64) -> Option<ConnMeta> {
let inner = self.inner.read().await; let inner = self.inner.read().await;
inner.meta.get(&conn_id).cloned() inner.meta.get(&conn_id).cloned()

View File

@ -1,5 +1,3 @@
use std::time::Duration;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use std::time::SystemTime; use std::time::SystemTime;
use httpdate; use httpdate;

View File

@ -134,8 +134,8 @@ impl MePool {
candidate_indices.sort_by_key(|idx| { candidate_indices.sort_by_key(|idx| {
let w = &writers_snapshot[*idx]; let w = &writers_snapshot[*idx];
let degraded = w.degraded.load(Ordering::Relaxed); let degraded = w.degraded.load(Ordering::Relaxed);
let draining = w.draining.load(Ordering::Relaxed); let stale = (w.generation < self.current_generation()) as usize;
(draining as usize, degraded as usize) (stale, degraded as usize)
}); });
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
@ -143,13 +143,23 @@ impl MePool {
for offset in 0..candidate_indices.len() { for offset in 0..candidate_indices.len() {
let idx = candidate_indices[(start + offset) % candidate_indices.len()]; let idx = candidate_indices[(start + offset) % candidate_indices.len()];
let w = &writers_snapshot[idx]; let w = &writers_snapshot[idx];
if w.draining.load(Ordering::Relaxed) { if !self.writer_accepts_new_binding(w) {
continue; continue;
} }
if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() { if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() {
self.registry self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await; .await;
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();
debug!(
conn_id,
writer_id = w.id,
writer_generation = w.generation,
current_generation = self.current_generation(),
"Selected stale ME writer for fallback bind"
);
}
return Ok(()); return Ok(());
} else { } else {
warn!(writer_id = w.id, "ME writer channel closed"); warn!(writer_id = w.id, "ME writer channel closed");
@ -159,7 +169,7 @@ impl MePool {
} }
let w = writers_snapshot[candidate_indices[start]].clone(); let w = writers_snapshot[candidate_indices[start]].clone();
if w.draining.load(Ordering::Relaxed) { if !self.writer_accepts_new_binding(&w) {
continue; continue;
} }
match w.tx.send(WriterCommand::Data(payload.clone())).await { match w.tx.send(WriterCommand::Data(payload.clone())).await {
@ -167,6 +177,9 @@ impl MePool {
self.registry self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await; .await;
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();
}
return Ok(()); return Ok(());
} }
Err(_) => { Err(_) => {
@ -245,13 +258,13 @@ impl MePool {
if preferred.is_empty() { if preferred.is_empty() {
return (0..writers.len()) return (0..writers.len())
.filter(|i| !writers[*i].draining.load(Ordering::Relaxed)) .filter(|i| self.writer_accepts_new_binding(&writers[*i]))
.collect(); .collect();
} }
let mut out = Vec::new(); let mut out = Vec::new();
for (idx, w) in writers.iter().enumerate() { for (idx, w) in writers.iter().enumerate() {
if w.draining.load(Ordering::Relaxed) { if !self.writer_accepts_new_binding(w) {
continue; continue;
} }
if preferred.iter().any(|p| *p == w.addr) { if preferred.iter().any(|p| *p == w.addr) {
@ -260,7 +273,7 @@ impl MePool {
} }
if out.is_empty() { if out.is_empty() {
return (0..writers.len()) return (0..writers.len())
.filter(|i| !writers[*i].draining.load(Ordering::Relaxed)) .filter(|i| self.writer_accepts_new_binding(&writers[*i]))
.collect(); .collect();
} }
out out

View File

@ -6,9 +6,13 @@ pub mod socket;
pub mod socks; pub mod socks;
pub mod upstream; pub mod upstream;
#[allow(unused_imports)]
pub use pool::ConnectionPool; pub use pool::ConnectionPool;
#[allow(unused_imports)]
pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol}; pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol};
pub use socket::*; pub use socket::*;
#[allow(unused_imports)]
pub use socks::*; pub use socks::*;
#[allow(unused_imports)]
pub use upstream::{DcPingResult, StartupPingResult, UpstreamManager}; pub use upstream::{DcPingResult, StartupPingResult, UpstreamManager};
pub mod middle_proxy; pub mod middle_proxy;

View File

@ -1,5 +1,7 @@
//! Connection Pool //! Connection Pool
#![allow(dead_code)]
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -8,7 +10,7 @@ use tokio::net::TcpStream;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::timeout; use tokio::time::timeout;
use parking_lot::RwLock; use parking_lot::RwLock;
use tracing::{debug, warn}; use tracing::debug;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use super::socket::configure_tcp_socket; use super::socket::configure_tcp_socket;

View File

@ -28,6 +28,7 @@ mod address_family {
/// Information extracted from PROXY protocol header /// Information extracted from PROXY protocol header
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ProxyProtocolInfo { pub struct ProxyProtocolInfo {
/// Source (client) address /// Source (client) address
pub src_addr: SocketAddr, pub src_addr: SocketAddr,
@ -37,6 +38,7 @@ pub struct ProxyProtocolInfo {
pub version: u8, pub version: u8,
} }
#[allow(dead_code)]
impl ProxyProtocolInfo { impl ProxyProtocolInfo {
/// Create info with just source address /// Create info with just source address
pub fn new(src_addr: SocketAddr) -> Self { pub fn new(src_addr: SocketAddr) -> Self {
@ -231,12 +233,14 @@ async fn parse_v2<R: AsyncRead + Unpin>(
} }
/// Builder for PROXY protocol v1 header /// Builder for PROXY protocol v1 header
#[allow(dead_code)]
pub struct ProxyProtocolV1Builder { pub struct ProxyProtocolV1Builder {
family: &'static str, family: &'static str,
src_addr: Option<SocketAddr>, src_addr: Option<SocketAddr>,
dst_addr: Option<SocketAddr>, dst_addr: Option<SocketAddr>,
} }
#[allow(dead_code)]
impl ProxyProtocolV1Builder { impl ProxyProtocolV1Builder {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -284,11 +288,13 @@ impl Default for ProxyProtocolV1Builder {
} }
/// Builder for PROXY protocol v2 header /// Builder for PROXY protocol v2 header
#[allow(dead_code)]
pub struct ProxyProtocolV2Builder { pub struct ProxyProtocolV2Builder {
src: Option<SocketAddr>, src: Option<SocketAddr>,
dst: Option<SocketAddr>, dst: Option<SocketAddr>,
} }
#[allow(dead_code)]
impl ProxyProtocolV2Builder { impl ProxyProtocolV2Builder {
pub fn new() -> Self { pub fn new() -> Self {
Self { src: None, dst: None } Self { src: None, dst: None }

View File

@ -10,6 +10,7 @@ use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
use tracing::debug; use tracing::debug;
/// Configure TCP socket with recommended settings for proxy use /// Configure TCP socket with recommended settings for proxy use
#[allow(dead_code)]
pub fn configure_tcp_socket( pub fn configure_tcp_socket(
stream: &TcpStream, stream: &TcpStream,
keepalive: bool, keepalive: bool,
@ -82,6 +83,7 @@ pub fn configure_client_socket(
} }
/// Set socket to send RST on close (for masking) /// Set socket to send RST on close (for masking)
#[allow(dead_code)]
pub fn set_linger_zero(stream: &TcpStream) -> Result<()> { pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
let socket = socket2::SockRef::from(stream); let socket = socket2::SockRef::from(stream);
socket.set_linger(Some(Duration::ZERO))?; socket.set_linger(Some(Duration::ZERO))?;
@ -89,6 +91,7 @@ pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
} }
/// Create a new TCP socket for outgoing connections /// Create a new TCP socket for outgoing connections
#[allow(dead_code)]
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> { pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
create_outgoing_socket_bound(addr, None) create_outgoing_socket_bound(addr, None)
} }
@ -120,6 +123,7 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>)
/// Get local address of a socket /// Get local address of a socket
#[allow(dead_code)]
pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> { pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> {
stream.local_addr().ok() stream.local_addr().ok()
} }
@ -157,11 +161,13 @@ pub fn resolve_interface_ip(_name: &str, _want_ipv6: bool) -> Option<IpAddr> {
} }
/// Get peer address of a socket /// Get peer address of a socket
#[allow(dead_code)]
pub fn get_peer_addr(stream: &TcpStream) -> Option<SocketAddr> { pub fn get_peer_addr(stream: &TcpStream) -> Option<SocketAddr> {
stream.peer_addr().ok() stream.peer_addr().ok()
} }
/// Check if address is IPv6 /// Check if address is IPv6
#[allow(dead_code)]
pub fn is_ipv6(addr: &SocketAddr) -> bool { pub fn is_ipv6(addr: &SocketAddr) -> bool {
addr.is_ipv6() addr.is_ipv6()
} }

View File

@ -1,7 +1,7 @@
//! SOCKS4/5 Client Implementation //! SOCKS4/5 Client Implementation
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};

View File

@ -2,6 +2,8 @@
//! //!
//! IPv6/IPv4 connectivity checks with configurable preference. //! IPv6/IPv4 connectivity checks with configurable preference.
#![allow(deprecated)]
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{SocketAddr, IpAddr}; use std::net::{SocketAddr, IpAddr};
use std::sync::Arc; use std::sync::Arc;
@ -549,7 +551,7 @@ impl UpstreamManager {
/// Tests BOTH IPv6 and IPv4, returns separate results for each. /// Tests BOTH IPv6 and IPv4, returns separate results for each.
pub async fn ping_all_dcs( pub async fn ping_all_dcs(
&self, &self,
prefer_ipv6: bool, _prefer_ipv6: bool,
dc_overrides: &HashMap<String, Vec<String>>, dc_overrides: &HashMap<String, Vec<String>>,
ipv4_enabled: bool, ipv4_enabled: bool,
ipv6_enabled: bool, ipv6_enabled: bool,
@ -907,6 +909,7 @@ impl UpstreamManager {
} }
/// Get the preferred IP for a DC (for use by other components) /// Get the preferred IP for a DC (for use by other components)
#[allow(dead_code)]
pub async fn get_dc_ip_preference(&self, dc_idx: i16) -> Option<IpPreference> { pub async fn get_dc_ip_preference(&self, dc_idx: i16) -> Option<IpPreference> {
let guard = self.upstreams.read().await; let guard = self.upstreams.read().await;
if guard.is_empty() { if guard.is_empty() {
@ -918,6 +921,7 @@ impl UpstreamManager {
} }
/// Get preferred DC address based on config preference /// Get preferred DC address based on config preference
#[allow(dead_code)]
pub async fn get_dc_addr(&self, dc_idx: i16, prefer_ipv6: bool) -> Option<SocketAddr> { pub async fn get_dc_addr(&self, dc_idx: i16, prefer_ipv6: bool) -> Option<SocketAddr> {
let arr_idx = UpstreamState::dc_array_idx(dc_idx)?; let arr_idx = UpstreamState::dc_array_idx(dc_idx)?;

View File

@ -1,16 +1,18 @@
//! IP Addr Detect //! IP Addr Detect
use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::net::{IpAddr, UdpSocket};
use std::time::Duration; use std::time::Duration;
use tracing::{debug, warn}; use tracing::{debug, warn};
/// Detected IP addresses /// Detected IP addresses
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct IpInfo { pub struct IpInfo {
pub ipv4: Option<IpAddr>, pub ipv4: Option<IpAddr>,
pub ipv6: Option<IpAddr>, pub ipv6: Option<IpAddr>,
} }
#[allow(dead_code)]
impl IpInfo { impl IpInfo {
/// Check if any IP is detected /// Check if any IP is detected
pub fn has_any(&self) -> bool { pub fn has_any(&self) -> bool {
@ -28,12 +30,14 @@ impl IpInfo {
} }
/// URLs for IP detection /// URLs for IP detection
#[allow(dead_code)]
const IPV4_URLS: &[&str] = &[ const IPV4_URLS: &[&str] = &[
"http://v4.ident.me/", "http://v4.ident.me/",
"http://ipv4.icanhazip.com/", "http://ipv4.icanhazip.com/",
"http://api.ipify.org/", "http://api.ipify.org/",
]; ];
#[allow(dead_code)]
const IPV6_URLS: &[&str] = &[ const IPV6_URLS: &[&str] = &[
"http://v6.ident.me/", "http://v6.ident.me/",
"http://ipv6.icanhazip.com/", "http://ipv6.icanhazip.com/",
@ -42,12 +46,14 @@ const IPV6_URLS: &[&str] = &[
/// Detect local IP address by connecting to a public DNS /// Detect local IP address by connecting to a public DNS
/// This does not actually send any packets /// This does not actually send any packets
#[allow(dead_code)]
fn get_local_ip(target: &str) -> Option<IpAddr> { fn get_local_ip(target: &str) -> Option<IpAddr> {
let socket = UdpSocket::bind("0.0.0.0:0").ok()?; let socket = UdpSocket::bind("0.0.0.0:0").ok()?;
socket.connect(target).ok()?; socket.connect(target).ok()?;
socket.local_addr().ok().map(|addr| addr.ip()) socket.local_addr().ok().map(|addr| addr.ip())
} }
#[allow(dead_code)]
fn get_local_ipv6(target: &str) -> Option<IpAddr> { fn get_local_ipv6(target: &str) -> Option<IpAddr> {
let socket = UdpSocket::bind("[::]:0").ok()?; let socket = UdpSocket::bind("[::]:0").ok()?;
socket.connect(target).ok()?; socket.connect(target).ok()?;
@ -55,6 +61,7 @@ fn get_local_ipv6(target: &str) -> Option<IpAddr> {
} }
/// Detect public IP addresses /// Detect public IP addresses
#[allow(dead_code)]
pub async fn detect_ip() -> IpInfo { pub async fn detect_ip() -> IpInfo {
let mut info = IpInfo::default(); let mut info = IpInfo::default();
@ -119,6 +126,7 @@ pub async fn detect_ip() -> IpInfo {
info info
} }
#[allow(dead_code)]
fn is_private_ip(ip: IpAddr) -> bool { fn is_private_ip(ip: IpAddr) -> bool {
match ip { match ip {
IpAddr::V4(ipv4) => { IpAddr::V4(ipv4) => {
@ -131,6 +139,7 @@ fn is_private_ip(ip: IpAddr) -> bool {
} }
/// Fetch IP from URL /// Fetch IP from URL
#[allow(dead_code)]
async fn fetch_ip(url: &str) -> Option<IpAddr> { async fn fetch_ip(url: &str) -> Option<IpAddr> {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(5))
@ -144,6 +153,7 @@ async fn fetch_ip(url: &str) -> Option<IpAddr> {
} }
/// Synchronous IP detection (for startup) /// Synchronous IP detection (for startup)
#[allow(dead_code)]
pub fn detect_ip_sync() -> IpInfo { pub fn detect_ip_sync() -> IpInfo {
tokio::runtime::Handle::current().block_on(detect_ip()) tokio::runtime::Handle::current().block_on(detect_ip())
} }

View File

@ -3,5 +3,7 @@
pub mod ip; pub mod ip;
pub mod time; pub mod time;
#[allow(unused_imports)]
pub use ip::*; pub use ip::*;
#[allow(unused_imports)]
pub use time::*; pub use time::*;

View File

@ -4,11 +4,14 @@ use std::time::Duration;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use tracing::{debug, warn, error}; use tracing::{debug, warn, error};
#[allow(dead_code)]
const TIME_SYNC_URL: &str = "https://core.telegram.org/getProxySecret"; const TIME_SYNC_URL: &str = "https://core.telegram.org/getProxySecret";
#[allow(dead_code)]
const MAX_TIME_SKEW_SECS: i64 = 30; const MAX_TIME_SKEW_SECS: i64 = 30;
/// Time sync result /// Time sync result
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TimeSyncResult { pub struct TimeSyncResult {
pub server_time: DateTime<Utc>, pub server_time: DateTime<Utc>,
pub local_time: DateTime<Utc>, pub local_time: DateTime<Utc>,
@ -17,6 +20,7 @@ pub struct TimeSyncResult {
} }
/// Check time synchronization with Telegram servers /// Check time synchronization with Telegram servers
#[allow(dead_code)]
pub async fn check_time_sync() -> Option<TimeSyncResult> { pub async fn check_time_sync() -> Option<TimeSyncResult> {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10)) .timeout(Duration::from_secs(10))
@ -60,6 +64,7 @@ pub async fn check_time_sync() -> Option<TimeSyncResult> {
} }
/// Background time sync task /// Background time sync task
#[allow(dead_code)]
pub async fn time_sync_task(check_interval: Duration) -> ! { pub async fn time_sync_task(check_interval: Duration) -> ! {
loop { loop {
if let Some(result) = check_time_sync().await { if let Some(result) = check_time_sync().await {