Adtag + ME Pool improvements: merge pull request #291 from telemt/flow-adtag

Adtag + ME Pool improvements
This commit is contained in:
Alexey 2026-03-02 00:22:45 +03:00 committed by GitHub
commit ac453638b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 623 additions and 149 deletions

2
Cargo.lock generated
View File

@ -2087,7 +2087,7 @@ dependencies = [
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.0.13" version = "3.1.3"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@ -215,10 +215,12 @@ hello = "00000000000000000000000000000000"
``` ```
### Advanced ### Advanced
#### Adtag #### Adtag (per-user)
To use channel advertising and usage statistics from Telegram, get Adtag from [@mtproxybot](https://t.me/mtproxybot), add this parameter to section `[General]` To use channel advertising and usage statistics from Telegram, get an Adtag from [@mtproxybot](https://t.me/mtproxybot). Set it per user in `[access.user_ad_tags]` (32 hex chars):
```toml ```toml
ad_tag = "00000000000000000000000000000000" # Replace zeros to your adtag from @mtproxybot [access.user_ad_tags]
username1 = "11111111111111111111111111111111" # Replace with your tag from @mtproxybot
username2 = "22222222222222222222222222222222"
``` ```
#### Listening and Announce IPs #### Listening and Announce IPs
To specify listening address and/or address in links, add to section `[[server.listeners]]` of config.toml: To specify listening address and/or address in links, add to section `[[server.listeners]]` of config.toml:

View File

@ -5,7 +5,9 @@
# === General Settings === # === General Settings ===
[general] [general]
use_middle_proxy = false use_middle_proxy = false
# Global ad_tag fallback when user has no per-user tag in [access.user_ad_tags]
# ad_tag = "00000000000000000000000000000000" # ad_tag = "00000000000000000000000000000000"
# Per-user ad_tag in [access.user_ad_tags] (32 hex from @MTProxybot)
# === Log Level === # === Log Level ===
# Log level: debug | verbose | normal | silent # Log level: debug | verbose | normal | silent

View File

@ -277,6 +277,18 @@ pub(crate) fn default_me_reinit_every_secs() -> u64 {
15 * 60 15 * 60
} }
pub(crate) fn default_me_reinit_singleflight() -> bool {
true
}
pub(crate) fn default_me_reinit_trigger_channel() -> usize {
64
}
pub(crate) fn default_me_reinit_coalesce_window_ms() -> u64 {
200
}
pub(crate) fn default_me_hardswap_warmup_delay_min_ms() -> u64 { pub(crate) fn default_me_hardswap_warmup_delay_min_ms() -> u64 {
1000 1000
} }
@ -301,6 +313,18 @@ pub(crate) fn default_me_config_apply_cooldown_secs() -> u64 {
300 300
} }
pub(crate) fn default_me_snapshot_require_http_2xx() -> bool {
true
}
pub(crate) fn default_me_snapshot_reject_empty_map() -> bool {
true
}
pub(crate) fn default_me_snapshot_min_proxy_for_lines() -> u32 {
1
}
pub(crate) fn default_proxy_secret_stable_snapshots() -> u8 { pub(crate) fn default_proxy_secret_stable_snapshots() -> u8 {
2 2
} }
@ -309,6 +333,10 @@ pub(crate) fn default_proxy_secret_rotate_runtime() -> bool {
true true
} }
pub(crate) fn default_me_secret_atomic_snapshot() -> bool {
true
}
pub(crate) fn default_proxy_secret_len_max() -> usize { pub(crate) fn default_proxy_secret_len_max() -> usize {
256 256
} }
@ -321,10 +349,18 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
90 90
} }
pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 {
default_me_pool_drain_ttl_secs()
}
pub(crate) fn default_me_pool_min_fresh_ratio() -> f32 { pub(crate) fn default_me_pool_min_fresh_ratio() -> f32 {
0.8 0.8
} }
pub(crate) fn default_me_deterministic_writer_sort() -> bool {
true
}
pub(crate) fn default_hardswap() -> bool { pub(crate) fn default_hardswap() -> bool {
true true
} }

View File

@ -4,21 +4,22 @@
//! //!
//! # What can be reloaded without restart //! # What can be reloaded without restart
//! //!
//! | Section | Field | Effect | //! | Section | Field | Effect |
//! |-----------|-------------------------------|-----------------------------------| //! |-----------|--------------------------------|------------------------------------------------|
//! | `general` | `log_level` | Filter updated via `log_level_tx` | //! | `general` | `log_level` | Filter updated via `log_level_tx` |
//! | `general` | `ad_tag` | Passed on next connection | //! | `access` | `user_ad_tags` | Passed on next connection |
//! | `general` | `middle_proxy_pool_size` | Passed on next connection | //! | `general` | `ad_tag` | Passed on next connection (fallback per-user) |
//! | `general` | `me_keepalive_*` | Passed on next connection | //! | `general` | `middle_proxy_pool_size` | Passed on next connection |
//! | `general` | `desync_all_full` | Applied immediately | //! | `general` | `me_keepalive_*` | Passed on next connection |
//! | `general` | `update_every` | Applied to ME updater immediately | //! | `general` | `desync_all_full` | Applied immediately |
//! | `general` | `hardswap` | Applied on next ME map update | //! | `general` | `update_every` | Applied to ME updater immediately |
//! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update | //! | `general` | `hardswap` | Applied on next ME map update |
//! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update | //! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update |
//! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update | //! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update |
//! | `general` | `telemetry` / `me_*_policy` | Applied immediately | //! | `general` | `me_reinit_drain_timeout_secs` | Applied on next ME map update |
//! | `network` | `dns_overrides` | Applied immediately | //! | `general` | `telemetry` / `me_*_policy` | Applied immediately |
//! | `access` | All user/quota fields | Effective immediately | //! | `network` | `dns_overrides` | Applied immediately |
//! | `access` | All user/quota fields | Effective immediately |
//! //!
//! Fields that require re-binding sockets (`server.port`, `censorship.*`, //! Fields that require re-binding sockets (`server.port`, `censorship.*`,
//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted. //! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted.
@ -207,14 +208,17 @@ fn log_changes(
log_tx.send(new_hot.log_level.clone()).ok(); log_tx.send(new_hot.log_level.clone()).ok();
} }
if old_hot.ad_tag != new_hot.ad_tag { if old_hot.access.user_ad_tags != new_hot.access.user_ad_tags {
info!( info!(
"config reload: ad_tag: {} → {}", "config reload: user_ad_tags updated ({} entries)",
old_hot.ad_tag.as_deref().unwrap_or("none"), new_hot.access.user_ad_tags.len(),
new_hot.ad_tag.as_deref().unwrap_or("none"),
); );
} }
if old_hot.ad_tag != new_hot.ad_tag {
info!("config reload: general.ad_tag updated (applied on next connection)");
}
if old_hot.dns_overrides != new_hot.dns_overrides { if old_hot.dns_overrides != new_hot.dns_overrides {
info!( info!(
"config reload: network.dns_overrides updated ({} entries)", "config reload: network.dns_overrides updated ({} entries)",

View File

@ -305,12 +305,24 @@ impl ProxyConfig {
)); ));
} }
if config.general.me_snapshot_min_proxy_for_lines == 0 {
return Err(ProxyError::Config(
"general.me_snapshot_min_proxy_for_lines must be > 0".to_string(),
));
}
if config.general.proxy_secret_stable_snapshots == 0 { if config.general.proxy_secret_stable_snapshots == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.proxy_secret_stable_snapshots must be > 0".to_string(), "general.proxy_secret_stable_snapshots must be > 0".to_string(),
)); ));
} }
if config.general.me_reinit_trigger_channel == 0 {
return Err(ProxyError::Config(
"general.me_reinit_trigger_channel must be > 0".to_string(),
));
}
if !(32..=4096).contains(&config.general.proxy_secret_len_max) { if !(32..=4096).contains(&config.general.proxy_secret_len_max) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.proxy_secret_len_max must be within [32, 4096]".to_string(), "general.proxy_secret_len_max must be within [32, 4096]".to_string(),
@ -532,15 +544,16 @@ impl ProxyConfig {
))); )));
} }
if let Some(tag) = &self.general.ad_tag { for (user, tag) in &self.access.user_ad_tags {
let zeros = "00000000000000000000000000000000"; let zeros = "00000000000000000000000000000000";
if !is_valid_ad_tag(tag) { if !is_valid_ad_tag(tag) {
return Err(ProxyError::Config( return Err(ProxyError::Config(format!(
"general.ad_tag must be exactly 32 hex characters".to_string(), "access.user_ad_tags['{}'] must be exactly 32 hex characters",
)); user
)));
} }
if tag == zeros { if tag == zeros {
warn!("ad_tag is all zeros; register a valid proxy tag via @MTProxybot to enable sponsored channel"); warn!(user = %user, "user ad_tag is all zeros; register a valid proxy tag via @MTProxybot to enable sponsored channel");
} }
} }
@ -1100,6 +1113,27 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn invalid_user_ad_tag_reports_access_user_ad_tags_key() {
let toml = r#"
[censorship]
tls_domain = "example.com"
[access.users]
alice = "00000000000000000000000000000000"
[access.user_ad_tags]
alice = "not_hex"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_invalid_user_ad_tag_message_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
let err = cfg.validate().unwrap_err().to_string();
assert!(err.contains("access.user_ad_tags['alice'] must be exactly 32 hex characters"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn invalid_dns_override_is_rejected() { fn invalid_dns_override_is_rejected() {
let toml = r#" let toml = r#"

View File

@ -130,6 +130,34 @@ impl MeSocksKdfPolicy {
} }
} }
/// Stale ME writer bind policy during drain window.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MeBindStaleMode {
Never,
#[default]
Ttl,
Always,
}
impl MeBindStaleMode {
pub fn as_u8(self) -> u8 {
match self {
MeBindStaleMode::Never => 0,
MeBindStaleMode::Ttl => 1,
MeBindStaleMode::Always => 2,
}
}
pub fn from_u8(raw: u8) -> Self {
match raw {
0 => MeBindStaleMode::Never,
2 => MeBindStaleMode::Always,
_ => MeBindStaleMode::Ttl,
}
}
}
/// Telemetry controls for hot-path counters and ME diagnostics. /// Telemetry controls for hot-path counters and ME diagnostics.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TelemetryConfig { pub struct TelemetryConfig {
@ -247,14 +275,15 @@ pub struct GeneralConfig {
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub use_middle_proxy: bool, pub use_middle_proxy: bool,
#[serde(default)]
pub ad_tag: Option<String>,
/// Path to proxy-secret binary file (auto-downloaded if absent). /// Path to proxy-secret binary file (auto-downloaded if absent).
/// Infrastructure secret from https://core.telegram.org/getProxySecret. /// Infrastructure secret from https://core.telegram.org/getProxySecret.
#[serde(default = "default_proxy_secret_path")] #[serde(default = "default_proxy_secret_path")]
pub proxy_secret_path: Option<String>, pub proxy_secret_path: Option<String>,
/// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags.
#[serde(default)]
pub ad_tag: Option<String>,
/// Public IP override for middle-proxy NAT environments. /// Public IP override for middle-proxy NAT environments.
/// When set, this IP is used in ME key derivation and RPC_PROXY_REQ "our_addr". /// When set, this IP is used in ME key derivation and RPC_PROXY_REQ "our_addr".
#[serde(default)] #[serde(default)]
@ -453,6 +482,18 @@ pub struct GeneralConfig {
#[serde(default = "default_me_config_apply_cooldown_secs")] #[serde(default = "default_me_config_apply_cooldown_secs")]
pub me_config_apply_cooldown_secs: u64, pub me_config_apply_cooldown_secs: u64,
/// Ensure getProxyConfig snapshots are applied only for 2xx HTTP responses.
#[serde(default = "default_me_snapshot_require_http_2xx")]
pub me_snapshot_require_http_2xx: bool,
/// Reject empty getProxyConfig snapshots instead of marking them applied.
#[serde(default = "default_me_snapshot_reject_empty_map")]
pub me_snapshot_reject_empty_map: bool,
/// Minimum parsed `proxy_for` rows required to accept a snapshot.
#[serde(default = "default_me_snapshot_min_proxy_for_lines")]
pub me_snapshot_min_proxy_for_lines: u32,
/// Number of identical getProxySecret snapshots required before runtime secret rotation. /// Number of identical getProxySecret snapshots required before runtime secret rotation.
#[serde(default = "default_proxy_secret_stable_snapshots")] #[serde(default = "default_proxy_secret_stable_snapshots")]
pub proxy_secret_stable_snapshots: u8, pub proxy_secret_stable_snapshots: u8,
@ -461,6 +502,10 @@ pub struct GeneralConfig {
#[serde(default = "default_proxy_secret_rotate_runtime")] #[serde(default = "default_proxy_secret_rotate_runtime")]
pub proxy_secret_rotate_runtime: bool, pub proxy_secret_rotate_runtime: bool,
/// Keep key-selector and secret bytes from one snapshot during ME handshake.
#[serde(default = "default_me_secret_atomic_snapshot")]
pub me_secret_atomic_snapshot: bool,
/// Maximum allowed proxy-secret length in bytes for startup and runtime refresh. /// Maximum allowed proxy-secret length in bytes for startup and runtime refresh.
#[serde(default = "default_proxy_secret_len_max")] #[serde(default = "default_proxy_secret_len_max")]
pub proxy_secret_len_max: usize, pub proxy_secret_len_max: usize,
@ -470,6 +515,14 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_ttl_secs")] #[serde(default = "default_me_pool_drain_ttl_secs")]
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
/// Policy for new binds on stale draining writers.
#[serde(default)]
pub me_bind_stale_mode: MeBindStaleMode,
/// TTL for stale bind allowance when `me_bind_stale_mode = \"ttl\"`.
#[serde(default = "default_me_bind_stale_ttl_secs")]
pub me_bind_stale_ttl_secs: u64,
/// Minimum desired-DC coverage ratio required before draining stale writers. /// Minimum desired-DC coverage ratio required before draining stale writers.
/// Range: 0.0..=1.0. /// Range: 0.0..=1.0.
#[serde(default = "default_me_pool_min_fresh_ratio")] #[serde(default = "default_me_pool_min_fresh_ratio")]
@ -490,6 +543,22 @@ pub struct GeneralConfig {
#[serde(default = "default_proxy_config_reload_secs")] #[serde(default = "default_proxy_config_reload_secs")]
pub proxy_config_auto_reload_secs: u64, pub proxy_config_auto_reload_secs: u64,
/// Serialize ME reinit cycles across all trigger sources.
#[serde(default = "default_me_reinit_singleflight")]
pub me_reinit_singleflight: bool,
/// Trigger queue capacity for reinit scheduler.
#[serde(default = "default_me_reinit_trigger_channel")]
pub me_reinit_trigger_channel: usize,
/// Trigger coalescing window before starting a reinit cycle.
#[serde(default = "default_me_reinit_coalesce_window_ms")]
pub me_reinit_coalesce_window_ms: u64,
/// Deterministic candidate sort for ME writer binding path.
#[serde(default = "default_me_deterministic_writer_sort")]
pub me_deterministic_writer_sort: bool,
/// Enable NTP drift check at startup. /// Enable NTP drift check at startup.
#[serde(default = "default_ntp_check")] #[serde(default = "default_ntp_check")]
pub ntp_check: bool, pub ntp_check: bool,
@ -564,14 +633,24 @@ impl Default for GeneralConfig {
me_hardswap_warmup_pass_backoff_base_ms: default_me_hardswap_warmup_pass_backoff_base_ms(), me_hardswap_warmup_pass_backoff_base_ms: default_me_hardswap_warmup_pass_backoff_base_ms(),
me_config_stable_snapshots: default_me_config_stable_snapshots(), me_config_stable_snapshots: default_me_config_stable_snapshots(),
me_config_apply_cooldown_secs: default_me_config_apply_cooldown_secs(), me_config_apply_cooldown_secs: default_me_config_apply_cooldown_secs(),
me_snapshot_require_http_2xx: default_me_snapshot_require_http_2xx(),
me_snapshot_reject_empty_map: default_me_snapshot_reject_empty_map(),
me_snapshot_min_proxy_for_lines: default_me_snapshot_min_proxy_for_lines(),
proxy_secret_stable_snapshots: default_proxy_secret_stable_snapshots(), proxy_secret_stable_snapshots: default_proxy_secret_stable_snapshots(),
proxy_secret_rotate_runtime: default_proxy_secret_rotate_runtime(), proxy_secret_rotate_runtime: default_proxy_secret_rotate_runtime(),
me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(),
proxy_secret_len_max: default_proxy_secret_len_max(), proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_bind_stale_mode: MeBindStaleMode::default(),
me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(),
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(), me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),
me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(), me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(),
proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(),
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
me_reinit_singleflight: default_me_reinit_singleflight(),
me_reinit_trigger_channel: default_me_reinit_trigger_channel(),
me_reinit_coalesce_window_ms: default_me_reinit_coalesce_window_ms(),
me_deterministic_writer_sort: default_me_deterministic_writer_sort(),
ntp_check: default_ntp_check(), ntp_check: default_ntp_check(),
ntp_servers: default_ntp_servers(), ntp_servers: default_ntp_servers(),
auto_degradation_enabled: default_true(), auto_degradation_enabled: default_true(),
@ -807,6 +886,10 @@ pub struct AccessConfig {
#[serde(default = "default_access_users")] #[serde(default = "default_access_users")]
pub users: HashMap<String, String>, pub users: HashMap<String, String>,
/// Per-user ad_tag (32 hex chars from @MTProxybot).
#[serde(default)]
pub user_ad_tags: HashMap<String, String>,
#[serde(default)] #[serde(default)]
pub user_max_tcp_conns: HashMap<String, usize>, pub user_max_tcp_conns: HashMap<String, usize>,
@ -833,6 +916,7 @@ impl Default for AccessConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
users: default_access_users(), users: default_access_users(),
user_ad_tags: HashMap::new(),
user_max_tcp_conns: HashMap::new(), user_max_tcp_conns: HashMap::new(),
user_expirations: HashMap::new(), user_expirations: HashMap::new(),
user_data_quota: HashMap::new(), user_data_quota: HashMap::new(),

View File

@ -8,7 +8,7 @@ use std::time::Duration;
use rand::Rng; use rand::Rng;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::signal; use tokio::signal;
use tokio::sync::Semaphore; use tokio::sync::{Semaphore, mpsc};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
#[cfg(unix)] #[cfg(unix)]
@ -40,7 +40,7 @@ use crate::stats::telemetry::TelemetryPolicy;
use crate::stats::{ReplayChecker, Stats}; use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool; use crate::stream::BufferPool;
use crate::transport::middle_proxy::{ use crate::transport::middle_proxy::{
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, format_sample_line, MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, MeReinitTrigger, format_sample_line,
format_me_route, format_me_route,
}; };
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
@ -448,7 +448,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("Middle-proxy STUN probing disabled by network.stun_use=false"); info!("Middle-proxy STUN probing disabled by network.stun_use=false");
} }
// ad_tag (proxy_tag) for advertising // Global ad_tag (pool default). Used when user has no per-user tag in access.user_ad_tags.
let proxy_tag = config let proxy_tag = config
.general .general
.ad_tag .ad_tag
@ -546,6 +546,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_hardswap_warmup_delay_max_ms, config.general.me_hardswap_warmup_delay_max_ms,
config.general.me_hardswap_warmup_extra_passes, config.general.me_hardswap_warmup_extra_passes,
config.general.me_hardswap_warmup_pass_backoff_base_ms, config.general.me_hardswap_warmup_pass_backoff_base_ms,
config.general.me_bind_stale_mode,
config.general.me_bind_stale_ttl_secs,
config.general.me_secret_atomic_snapshot,
config.general.me_deterministic_writer_sort,
config.general.me_socks_kdf_policy, config.general.me_socks_kdf_policy,
config.general.me_route_backpressure_base_timeout_ms, config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms, config.general.me_route_backpressure_high_timeout_ms,
@ -849,26 +853,43 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}); });
if let Some(ref pool) = me_pool { if let Some(ref pool) = me_pool {
let pool_clone = pool.clone(); let reinit_trigger_capacity = config
let rng_clone = rng.clone(); .general
let config_rx_clone = config_rx.clone(); .me_reinit_trigger_channel
.max(1);
let (reinit_tx, reinit_rx) = mpsc::channel::<MeReinitTrigger>(reinit_trigger_capacity);
let pool_clone_sched = pool.clone();
let rng_clone_sched = rng.clone();
let config_rx_clone_sched = config_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
crate::transport::middle_proxy::me_config_updater( crate::transport::middle_proxy::me_reinit_scheduler(
pool_clone, pool_clone_sched,
rng_clone, rng_clone_sched,
config_rx_clone, config_rx_clone_sched,
reinit_rx,
)
.await;
});
let pool_clone = pool.clone();
let config_rx_clone = config_rx.clone();
let reinit_tx_updater = reinit_tx.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_config_updater(
pool_clone,
config_rx_clone,
reinit_tx_updater,
) )
.await; .await;
}); });
let pool_clone_rot = pool.clone();
let rng_clone_rot = rng.clone();
let config_rx_clone_rot = config_rx.clone(); let config_rx_clone_rot = config_rx.clone();
let reinit_tx_rotation = reinit_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
crate::transport::middle_proxy::me_rotation_task( crate::transport::middle_proxy::me_rotation_task(
pool_clone_rot,
rng_clone_rot,
config_rx_clone_rot, config_rx_clone_rot,
reinit_tx_rotation,
) )
.await; .await;
}); });

View File

@ -238,7 +238,22 @@ where
stats.increment_user_connects(&user); stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user); stats.increment_user_curr_connects(&user);
let proto_flags = proto_flags_for_tag(proto_tag, me_pool.has_proxy_tag()); // Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
let user_tag: Option<Vec<u8>> = config
.access
.user_ad_tags
.get(&user)
.and_then(|s| hex::decode(s).ok())
.filter(|v| v.len() == 16);
let global_tag: Option<Vec<u8>> = config
.general
.ad_tag
.as_ref()
.and_then(|s| hex::decode(s).ok())
.filter(|v| v.len() == 16);
let effective_tag = user_tag.or(global_tag);
let proto_flags = proto_flags_for_tag(proto_tag, effective_tag.is_some());
debug!( debug!(
trace_id = format_args!("0x{:016x}", trace_id), trace_id = format_args!("0x{:016x}", trace_id),
user = %user, user = %user,
@ -256,6 +271,7 @@ where
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(C2ME_CHANNEL_CAPACITY); let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(C2ME_CHANNEL_CAPACITY);
let me_pool_c2me = me_pool.clone(); let me_pool_c2me = me_pool.clone();
let effective_tag = effective_tag;
let c2me_sender = tokio::spawn(async move { let c2me_sender = tokio::spawn(async move {
let mut sent_since_yield = 0usize; let mut sent_since_yield = 0usize;
while let Some(cmd) = c2me_rx.recv().await { while let Some(cmd) = c2me_rx.recv().await {
@ -268,6 +284,7 @@ where
translated_local_addr, translated_local_addr,
&payload, &payload,
flags, flags,
effective_tag.as_deref(),
).await?; ).await?;
sent_since_yield = sent_since_yield.saturating_add(1); sent_since_yield = sent_since_yield.saturating_add(1);
if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) { if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) {

View File

@ -5,15 +5,15 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use httpdate; use httpdate;
use tokio::sync::watch; use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::error::Result; use crate::error::Result;
use super::MePool; use super::MePool;
use super::rotation::{MeReinitTrigger, enqueue_reinit_trigger};
use super::secret::download_proxy_secret_with_max_len; use super::secret::download_proxy_secret_with_max_len;
use crate::crypto::SecureRandom;
use std::time::SystemTime; use std::time::SystemTime;
async fn retry_fetch(url: &str) -> Option<ProxyConfigData> { async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
@ -38,6 +38,8 @@ async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
pub struct ProxyConfigData { pub struct ProxyConfigData {
pub map: HashMap<i32, Vec<(IpAddr, u16)>>, pub map: HashMap<i32, Vec<(IpAddr, u16)>>,
pub default_dc: Option<i32>, pub default_dc: Option<i32>,
pub http_status: u16,
pub proxy_for_lines: u32,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -172,6 +174,7 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
.await .await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
; ;
let http_status = resp.status().as_u16();
if let Some(date) = resp.headers().get(reqwest::header::DATE) if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str() && let Ok(date_str) = date.to_str()
@ -194,9 +197,11 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?; .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new(); let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
let mut proxy_for_lines: u32 = 0;
for line in text.lines() { for line in text.lines() {
if let Some((dc, ip, port)) = parse_proxy_line(line) { if let Some((dc, ip, port)) = parse_proxy_line(line) {
map.entry(dc).or_default().push((ip, port)); map.entry(dc).or_default().push((ip, port));
proxy_for_lines = proxy_for_lines.saturating_add(1);
} }
} }
@ -214,14 +219,49 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
None None
}); });
Ok(ProxyConfigData { map, default_dc }) Ok(ProxyConfigData {
map,
default_dc,
http_status,
proxy_for_lines,
})
}
fn snapshot_passes_guards(
cfg: &ProxyConfig,
snapshot: &ProxyConfigData,
snapshot_name: &'static str,
) -> bool {
if cfg.general.me_snapshot_require_http_2xx
&& !(200..=299).contains(&snapshot.http_status)
{
warn!(
snapshot = snapshot_name,
http_status = snapshot.http_status,
"ME snapshot rejected by non-2xx HTTP status"
);
return false;
}
let min_proxy_for = cfg.general.me_snapshot_min_proxy_for_lines;
if snapshot.proxy_for_lines < min_proxy_for {
warn!(
snapshot = snapshot_name,
parsed_proxy_for_lines = snapshot.proxy_for_lines,
min_proxy_for_lines = min_proxy_for,
"ME snapshot rejected by proxy_for line floor"
);
return false;
}
true
} }
async fn run_update_cycle( async fn run_update_cycle(
pool: &Arc<MePool>, pool: &Arc<MePool>,
rng: &Arc<SecureRandom>,
cfg: &ProxyConfig, cfg: &ProxyConfig,
state: &mut UpdaterState, state: &mut UpdaterState,
reinit_tx: &mpsc::Sender<MeReinitTrigger>,
) { ) {
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
@ -232,6 +272,10 @@ async fn run_update_cycle(
cfg.general.me_hardswap_warmup_delay_max_ms, cfg.general.me_hardswap_warmup_delay_max_ms,
cfg.general.me_hardswap_warmup_extra_passes, cfg.general.me_hardswap_warmup_extra_passes,
cfg.general.me_hardswap_warmup_pass_backoff_base_ms, cfg.general.me_hardswap_warmup_pass_backoff_base_ms,
cfg.general.me_bind_stale_mode,
cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort,
); );
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
@ -242,44 +286,48 @@ async fn run_update_cycle(
let mut ready_v4: Option<(ProxyConfigData, u64)> = None; let mut ready_v4: Option<(ProxyConfigData, u64)> = None;
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg_v4) = cfg_v4 { if let Some(cfg_v4) = cfg_v4 {
let cfg_v4_hash = hash_proxy_config(&cfg_v4); if snapshot_passes_guards(cfg, &cfg_v4, "getProxyConfig") {
let stable_hits = state.config_v4.observe(cfg_v4_hash); let cfg_v4_hash = hash_proxy_config(&cfg_v4);
if stable_hits < required_cfg_snapshots { let stable_hits = state.config_v4.observe(cfg_v4_hash);
debug!( if stable_hits < required_cfg_snapshots {
stable_hits, debug!(
required_cfg_snapshots, stable_hits,
snapshot = format_args!("0x{cfg_v4_hash:016x}"), required_cfg_snapshots,
"ME config v4 candidate observed" snapshot = format_args!("0x{cfg_v4_hash:016x}"),
); "ME config v4 candidate observed"
} else if state.config_v4.is_applied(cfg_v4_hash) { );
debug!( } else if state.config_v4.is_applied(cfg_v4_hash) {
snapshot = format_args!("0x{cfg_v4_hash:016x}"), debug!(
"ME config v4 stable snapshot already applied" snapshot = format_args!("0x{cfg_v4_hash:016x}"),
); "ME config v4 stable snapshot already applied"
} else { );
ready_v4 = Some((cfg_v4, cfg_v4_hash)); } else {
ready_v4 = Some((cfg_v4, cfg_v4_hash));
}
} }
} }
let mut ready_v6: Option<(ProxyConfigData, u64)> = None; let mut ready_v6: Option<(ProxyConfigData, u64)> = None;
let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
if let Some(cfg_v6) = cfg_v6 { if let Some(cfg_v6) = cfg_v6 {
let cfg_v6_hash = hash_proxy_config(&cfg_v6); if snapshot_passes_guards(cfg, &cfg_v6, "getProxyConfigV6") {
let stable_hits = state.config_v6.observe(cfg_v6_hash); let cfg_v6_hash = hash_proxy_config(&cfg_v6);
if stable_hits < required_cfg_snapshots { let stable_hits = state.config_v6.observe(cfg_v6_hash);
debug!( if stable_hits < required_cfg_snapshots {
stable_hits, debug!(
required_cfg_snapshots, stable_hits,
snapshot = format_args!("0x{cfg_v6_hash:016x}"), required_cfg_snapshots,
"ME config v6 candidate observed" snapshot = format_args!("0x{cfg_v6_hash:016x}"),
); "ME config v6 candidate observed"
} else if state.config_v6.is_applied(cfg_v6_hash) { );
debug!( } else if state.config_v6.is_applied(cfg_v6_hash) {
snapshot = format_args!("0x{cfg_v6_hash:016x}"), debug!(
"ME config v6 stable snapshot already applied" snapshot = format_args!("0x{cfg_v6_hash:016x}"),
); "ME config v6 stable snapshot already applied"
} else { );
ready_v6 = Some((cfg_v6, cfg_v6_hash)); } else {
ready_v6 = Some((cfg_v6, cfg_v6_hash));
}
} }
} }
@ -292,28 +340,40 @@ async fn run_update_cycle(
let update_v6 = ready_v6 let update_v6 = ready_v6
.as_ref() .as_ref()
.map(|(snapshot, _)| snapshot.map.clone()); .map(|(snapshot, _)| snapshot.map.clone());
let update_is_empty =
let changed = pool.update_proxy_maps(update_v4, update_v6).await; update_v4.is_empty() && update_v6.as_ref().is_none_or(|v| v.is_empty());
let apply_outcome = if update_is_empty && !cfg.general.me_snapshot_reject_empty_map {
if let Some((snapshot, hash)) = ready_v4 { super::pool_config::SnapshotApplyOutcome::AppliedNoDelta
if let Some(dc) = snapshot.default_dc {
pool.default_dc
.store(dc, std::sync::atomic::Ordering::Relaxed);
}
state.config_v4.mark_applied(hash);
}
if let Some((_snapshot, hash)) = ready_v6 {
state.config_v6.mark_applied(hash);
}
state.last_map_apply_at = Some(tokio::time::Instant::now());
if changed {
maps_changed = true;
info!("ME config update applied after stable-gate");
} else { } else {
debug!("ME config stable-gate applied with no map delta"); pool.update_proxy_maps(update_v4, update_v6).await
};
if matches!(
apply_outcome,
super::pool_config::SnapshotApplyOutcome::RejectedEmpty
) {
warn!("ME config stable snapshot rejected (empty endpoint map)");
} else {
if let Some((snapshot, hash)) = ready_v4 {
if let Some(dc) = snapshot.default_dc {
pool.default_dc
.store(dc, std::sync::atomic::Ordering::Relaxed);
}
state.config_v4.mark_applied(hash);
}
if let Some((_snapshot, hash)) = ready_v6 {
state.config_v6.mark_applied(hash);
}
state.last_map_apply_at = Some(tokio::time::Instant::now());
if apply_outcome.changed() {
maps_changed = true;
info!("ME config update applied after stable-gate");
} else {
debug!("ME config stable-gate applied with no map delta");
}
} }
} else if let Some(last) = state.last_map_apply_at { } else if let Some(last) = state.last_map_apply_at {
let wait_secs = map_apply_cooldown_remaining_secs(last, apply_cooldown); let wait_secs = map_apply_cooldown_remaining_secs(last, apply_cooldown);
@ -325,8 +385,7 @@ async fn run_update_cycle(
} }
if maps_changed { if maps_changed {
pool.zero_downtime_reinit_after_map_change(rng.as_ref()) enqueue_reinit_trigger(reinit_tx, MeReinitTrigger::MapChanged);
.await;
} }
pool.reset_stun_state(); pool.reset_stun_state();
@ -367,8 +426,8 @@ async fn run_update_cycle(
pub async fn me_config_updater( pub async fn me_config_updater(
pool: Arc<MePool>, pool: Arc<MePool>,
rng: Arc<SecureRandom>,
mut config_rx: watch::Receiver<Arc<ProxyConfig>>, mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
reinit_tx: mpsc::Sender<MeReinitTrigger>,
) { ) {
let mut state = UpdaterState::default(); let mut state = UpdaterState::default();
let mut update_every_secs = config_rx let mut update_every_secs = config_rx
@ -387,7 +446,7 @@ pub async fn me_config_updater(
tokio::select! { tokio::select! {
_ = &mut sleep => { _ = &mut sleep => {
let cfg = config_rx.borrow().clone(); let cfg = config_rx.borrow().clone();
run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await; run_update_cycle(&pool, cfg.as_ref(), &mut state, &reinit_tx).await;
let refreshed_secs = cfg.general.effective_update_every_secs().max(1); let refreshed_secs = cfg.general.effective_update_every_secs().max(1);
if refreshed_secs != update_every_secs { if refreshed_secs != update_every_secs {
info!( info!(
@ -415,6 +474,10 @@ pub async fn me_config_updater(
cfg.general.me_hardswap_warmup_delay_max_ms, cfg.general.me_hardswap_warmup_delay_max_ms,
cfg.general.me_hardswap_warmup_extra_passes, cfg.general.me_hardswap_warmup_extra_passes,
cfg.general.me_hardswap_warmup_pass_backoff_base_ms, cfg.general.me_hardswap_warmup_pass_backoff_base_ms,
cfg.general.me_bind_stale_mode,
cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort,
); );
let new_secs = cfg.general.effective_update_every_secs().max(1); let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs { if new_secs == update_every_secs {
@ -429,7 +492,7 @@ pub async fn me_config_updater(
); );
update_every_secs = new_secs; update_every_secs = new_secs;
update_every = Duration::from_secs(update_every_secs); update_every = Duration::from_secs(update_every_secs);
run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await; run_update_cycle(&pool, cfg.as_ref(), &mut state, &reinit_tx).await;
next_tick = tokio::time::Instant::now() + update_every; next_tick = tokio::time::Instant::now() + update_every;
} else { } else {
info!( info!(

View File

@ -1,4 +1,5 @@
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use socket2::{SockRef, TcpKeepalive}; use socket2::{SockRef, TcpKeepalive};
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@ -267,7 +268,16 @@ impl MePool {
.unwrap_or_default() .unwrap_or_default()
.as_secs() as u32; .as_secs() as u32;
let ks = self.key_selector().await; let secret_atomic_snapshot = self.secret_atomic_snapshot.load(Ordering::Relaxed);
let (ks, secret) = if secret_atomic_snapshot {
let snapshot = self.secret_snapshot().await;
(snapshot.key_selector, snapshot.secret)
} else {
// Backward-compatible mode: key selector and secret may come from different updates.
let key_selector = self.key_selector().await;
let secret = self.secret_snapshot().await.secret;
(key_selector, secret)
};
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce); let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
let nonce_frame = build_rpc_frame(-2, &nonce_payload, RpcChecksumMode::Crc32); let nonce_frame = build_rpc_frame(-2, &nonce_payload, RpcChecksumMode::Crc32);
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]); let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
@ -357,8 +367,6 @@ impl MePool {
let diag_level: u8 = std::env::var("ME_DIAG").ok().and_then(|v| v.parse().ok()).unwrap_or(0); let diag_level: u8 = std::env::var("ME_DIAG").ok().and_then(|v| v.parse().ok()).unwrap_or(0);
let secret: Vec<u8> = self.proxy_secret.read().await.clone();
let prekey_client = build_middleproxy_prekey( let prekey_client = build_middleproxy_prekey(
&srv_nonce, &srv_nonce,
&my_nonce, &my_nonce,

View File

@ -30,7 +30,7 @@ pub use pool_nat::{stun_probe, detect_public_ip};
pub use registry::ConnRegistry; pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret; pub use secret::fetch_proxy_secret;
pub use config_updater::{fetch_proxy_config, me_config_updater}; pub use config_updater::{fetch_proxy_config, me_config_updater};
pub use rotation::me_rotation_task; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub use wire::proto_flags_for_tag; pub use wire::proto_flags_for_tag;
#[derive(Debug)] #[derive(Debug)]

View File

@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio::sync::{Mutex, Notify, RwLock, mpsc};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::config::MeSocksKdfPolicy; use crate::config::{MeBindStaleMode, MeSocksKdfPolicy};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
use crate::network::probe::NetworkDecision; use crate::network::probe::NetworkDecision;
@ -29,6 +29,13 @@ pub struct MeWriter {
pub allow_drain_fallback: Arc<AtomicBool>, pub allow_drain_fallback: Arc<AtomicBool>,
} }
#[derive(Debug, Clone)]
pub struct SecretSnapshot {
pub epoch: u64,
pub key_selector: u32,
pub secret: Vec<u8>,
}
#[allow(dead_code)] #[allow(dead_code)]
pub struct MePool { pub struct MePool {
pub(super) registry: Arc<ConnRegistry>, pub(super) registry: Arc<ConnRegistry>,
@ -38,7 +45,7 @@ pub struct MePool {
pub(super) upstream: Option<Arc<UpstreamManager>>, pub(super) upstream: Option<Arc<UpstreamManager>>,
pub(super) rng: Arc<SecureRandom>, pub(super) rng: Arc<SecureRandom>,
pub(super) proxy_tag: Option<Vec<u8>>, pub(super) proxy_tag: Option<Vec<u8>>,
pub(super) proxy_secret: Arc<RwLock<Vec<u8>>>, pub(super) proxy_secret: Arc<RwLock<SecretSnapshot>>,
pub(super) nat_ip_cfg: Option<IpAddr>, pub(super) nat_ip_cfg: Option<IpAddr>,
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>, pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
pub(super) nat_probe: bool, pub(super) nat_probe: bool,
@ -83,6 +90,10 @@ pub struct MePool {
pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64, pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64,
pub(super) me_hardswap_warmup_extra_passes: AtomicU32, pub(super) me_hardswap_warmup_extra_passes: AtomicU32,
pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64, pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64,
pub(super) me_bind_stale_mode: AtomicU8,
pub(super) me_bind_stale_ttl_secs: AtomicU64,
pub(super) secret_atomic_snapshot: AtomicBool,
pub(super) me_deterministic_writer_sort: AtomicBool,
pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8,
pool_size: usize, pool_size: usize,
} }
@ -147,6 +158,10 @@ impl MePool {
me_hardswap_warmup_delay_max_ms: u64, me_hardswap_warmup_delay_max_ms: u64,
me_hardswap_warmup_extra_passes: u8, me_hardswap_warmup_extra_passes: u8,
me_hardswap_warmup_pass_backoff_base_ms: u64, me_hardswap_warmup_pass_backoff_base_ms: u64,
me_bind_stale_mode: MeBindStaleMode,
me_bind_stale_ttl_secs: u64,
me_secret_atomic_snapshot: bool,
me_deterministic_writer_sort: bool,
me_socks_kdf_policy: MeSocksKdfPolicy, me_socks_kdf_policy: MeSocksKdfPolicy,
me_route_backpressure_base_timeout_ms: u64, me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64, me_route_backpressure_high_timeout_ms: u64,
@ -166,7 +181,20 @@ impl MePool {
upstream, upstream,
rng, rng,
proxy_tag, proxy_tag,
proxy_secret: Arc::new(RwLock::new(proxy_secret)), proxy_secret: Arc::new(RwLock::new(SecretSnapshot {
epoch: 1,
key_selector: if proxy_secret.len() >= 4 {
u32::from_le_bytes([
proxy_secret[0],
proxy_secret[1],
proxy_secret[2],
proxy_secret[3],
])
} else {
0
},
secret: proxy_secret,
})),
nat_ip_cfg: nat_ip, nat_ip_cfg: nat_ip,
nat_ip_detected: Arc::new(RwLock::new(None)), nat_ip_detected: Arc::new(RwLock::new(None)),
nat_probe, nat_probe,
@ -216,6 +244,10 @@ impl MePool {
me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new( me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new(
me_hardswap_warmup_pass_backoff_base_ms, me_hardswap_warmup_pass_backoff_base_ms,
), ),
me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()),
me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs),
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
}) })
} }
@ -238,6 +270,10 @@ impl MePool {
hardswap_warmup_delay_max_ms: u64, hardswap_warmup_delay_max_ms: u64,
hardswap_warmup_extra_passes: u8, hardswap_warmup_extra_passes: u8,
hardswap_warmup_pass_backoff_base_ms: u64, hardswap_warmup_pass_backoff_base_ms: u64,
bind_stale_mode: MeBindStaleMode,
bind_stale_ttl_secs: u64,
secret_atomic_snapshot: bool,
deterministic_writer_sort: bool,
) { ) {
self.hardswap.store(hardswap, Ordering::Relaxed); self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.me_pool_drain_ttl_secs
@ -254,6 +290,14 @@ impl MePool {
.store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed); .store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed);
self.me_hardswap_warmup_pass_backoff_base_ms self.me_hardswap_warmup_pass_backoff_base_ms
.store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed); .store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed);
self.me_bind_stale_mode
.store(bind_stale_mode.as_u8(), Ordering::Relaxed);
self.me_bind_stale_ttl_secs
.store(bind_stale_ttl_secs, Ordering::Relaxed);
self.secret_atomic_snapshot
.store(secret_atomic_snapshot, Ordering::Relaxed);
self.me_deterministic_writer_sort
.store(deterministic_writer_sort, Ordering::Relaxed);
} }
pub fn reset_stun_state(&self) { pub fn reset_stun_state(&self) {
@ -307,12 +351,15 @@ impl MePool {
} }
pub(super) async fn key_selector(&self) -> u32 { pub(super) async fn key_selector(&self) -> u32 {
let secret = self.proxy_secret.read().await; self.proxy_secret.read().await.key_selector
if secret.len() >= 4 { }
u32::from_le_bytes([secret[0], secret[1], secret[2], secret[3]])
} else { pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
0 self.proxy_secret.read().await.clone()
} }
pub(super) fn bind_stale_mode(&self) -> MeBindStaleMode {
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
} }
pub(super) fn family_order(&self) -> Vec<IpFamily> { pub(super) fn family_order(&self) -> Vec<IpFamily> {

View File

@ -7,12 +7,29 @@ use tracing::warn;
use super::pool::MePool; use super::pool::MePool;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotApplyOutcome {
AppliedChanged,
AppliedNoDelta,
RejectedEmpty,
}
impl SnapshotApplyOutcome {
pub fn changed(self) -> bool {
matches!(self, SnapshotApplyOutcome::AppliedChanged)
}
}
impl MePool { impl MePool {
pub async fn update_proxy_maps( pub async fn update_proxy_maps(
&self, &self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>, new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
new_v6: Option<HashMap<i32, Vec<(IpAddr, u16)>>>, new_v6: Option<HashMap<i32, Vec<(IpAddr, u16)>>>,
) -> bool { ) -> SnapshotApplyOutcome {
if new_v4.is_empty() && new_v6.as_ref().is_none_or(|v| v.is_empty()) {
return SnapshotApplyOutcome::RejectedEmpty;
}
let mut changed = false; let mut changed = false;
{ {
let mut guard = self.proxy_map_v4.write().await; let mut guard = self.proxy_map_v4.write().await;
@ -51,7 +68,11 @@ impl MePool {
} }
} }
} }
changed if changed {
SnapshotApplyOutcome::AppliedChanged
} else {
SnapshotApplyOutcome::AppliedNoDelta
}
} }
pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool { pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
@ -60,8 +81,19 @@ impl MePool {
return false; return false;
} }
let mut guard = self.proxy_secret.write().await; let mut guard = self.proxy_secret.write().await;
if *guard != new_secret { if guard.secret != new_secret {
*guard = new_secret; guard.secret = new_secret;
guard.key_selector = if guard.secret.len() >= 4 {
u32::from_le_bytes([
guard.secret[0],
guard.secret[1],
guard.secret[2],
guard.secret[3],
])
} else {
0
};
guard.epoch = guard.epoch.saturating_add(1);
drop(guard); drop(guard);
self.reconnect_all().await; self.reconnect_all().await;
return true; return true;

View File

@ -19,7 +19,7 @@ impl MePool {
me_servers = self.proxy_map_v4.read().await.len(), me_servers = self.proxy_map_v4.read().await.len(),
pool_size, pool_size,
key_selector = format_args!("0x{ks:08x}"), key_selector = format_args!("0x{ks:08x}"),
secret_len = self.proxy_secret.read().await.len(), secret_len = self.proxy_secret.read().await.secret.len(),
"Initializing ME pool" "Initializing ME pool"
); );

View File

@ -9,6 +9,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::config::MeBindStaleMode;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::RPC_PING_U32; use crate::protocol::constants::RPC_PING_U32;
@ -42,7 +43,7 @@ impl MePool {
} }
pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> { pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> {
let secret_len = self.proxy_secret.read().await.len(); let secret_len = self.proxy_secret.read().await.secret.len();
if secret_len < 32 { if secret_len < 32 {
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into())); return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
} }
@ -351,16 +352,22 @@ impl MePool {
return false; return false;
} }
let ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed); match self.bind_stale_mode() {
if ttl_secs == 0 { MeBindStaleMode::Never => false,
return true; MeBindStaleMode::Always => true,
} MeBindStaleMode::Ttl => {
let ttl_secs = self.me_bind_stale_ttl_secs.load(Ordering::Relaxed);
if ttl_secs == 0 {
return true;
}
let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed); let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed);
if started == 0 { if started == 0 {
return false; return false;
} }
Self::now_epoch_secs().saturating_sub(started) <= ttl_secs Self::now_epoch_secs().saturating_sub(started) <= ttl_secs
}
}
} }
} }

View File

@ -1,19 +1,111 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use tokio::sync::{mpsc, watch};
use tracing::{info, warn}; use tracing::{debug, info, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use super::MePool; use super::MePool;
/// Periodically reinitialize ME generations and swap them after full warmup. #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub async fn me_rotation_task( pub enum MeReinitTrigger {
Periodic,
MapChanged,
}
impl MeReinitTrigger {
fn as_str(self) -> &'static str {
match self {
MeReinitTrigger::Periodic => "periodic",
MeReinitTrigger::MapChanged => "map-change",
}
}
}
pub fn enqueue_reinit_trigger(
tx: &mpsc::Sender<MeReinitTrigger>,
trigger: MeReinitTrigger,
) {
match tx.try_send(trigger) {
Ok(()) => {}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
debug!(trigger = trigger.as_str(), "ME reinit trigger dropped (queue full)");
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
warn!(trigger = trigger.as_str(), "ME reinit trigger dropped (scheduler closed)");
}
}
}
pub async fn me_reinit_scheduler(
pool: Arc<MePool>, pool: Arc<MePool>,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
config_rx: watch::Receiver<Arc<ProxyConfig>>,
mut trigger_rx: mpsc::Receiver<MeReinitTrigger>,
) {
info!("ME reinit scheduler started");
loop {
let Some(first_trigger) = trigger_rx.recv().await else {
warn!("ME reinit scheduler stopped: trigger channel closed");
break;
};
let mut map_change_seen = matches!(first_trigger, MeReinitTrigger::MapChanged);
let mut periodic_seen = matches!(first_trigger, MeReinitTrigger::Periodic);
let cfg = config_rx.borrow().clone();
let coalesce_window = Duration::from_millis(cfg.general.me_reinit_coalesce_window_ms);
if !coalesce_window.is_zero() {
let deadline = tokio::time::Instant::now() + coalesce_window;
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
break;
}
match tokio::time::timeout(deadline - now, trigger_rx.recv()).await {
Ok(Some(next)) => {
if next == MeReinitTrigger::MapChanged {
map_change_seen = true;
} else {
periodic_seen = true;
}
}
Ok(None) => break,
Err(_) => break,
}
}
}
let reason = if map_change_seen && periodic_seen {
"map-change+periodic"
} else if map_change_seen {
"map-change"
} else {
"periodic"
};
if cfg.general.me_reinit_singleflight {
debug!(reason, "ME reinit scheduled (single-flight)");
pool.zero_downtime_reinit_periodic(rng.as_ref()).await;
} else {
debug!(reason, "ME reinit scheduled (concurrent mode)");
let pool_clone = pool.clone();
let rng_clone = rng.clone();
tokio::spawn(async move {
pool_clone
.zero_downtime_reinit_periodic(rng_clone.as_ref())
.await;
});
}
}
}
/// Periodically enqueue reinitialization triggers for ME generations.
pub async fn me_rotation_task(
mut config_rx: watch::Receiver<Arc<ProxyConfig>>, mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
reinit_tx: mpsc::Sender<MeReinitTrigger>,
) { ) {
let mut interval_secs = config_rx let mut interval_secs = config_rx
.borrow() .borrow()
@ -31,7 +123,7 @@ pub async fn me_rotation_task(
tokio::select! { tokio::select! {
_ = &mut sleep => { _ = &mut sleep => {
pool.zero_downtime_reinit_periodic(rng.as_ref()).await; enqueue_reinit_trigger(&reinit_tx, MeReinitTrigger::Periodic);
let refreshed_secs = config_rx let refreshed_secs = config_rx
.borrow() .borrow()
.general .general
@ -70,7 +162,7 @@ pub async fn me_rotation_task(
); );
interval_secs = new_secs; interval_secs = new_secs;
interval = Duration::from_secs(interval_secs); interval = Duration::from_secs(interval_secs);
pool.zero_downtime_reinit_periodic(rng.as_ref()).await; enqueue_reinit_trigger(&reinit_tx, MeReinitTrigger::Periodic);
next_tick = tokio::time::Instant::now() + interval; next_tick = tokio::time::Instant::now() + interval;
} else { } else {
info!( info!(

View File

@ -18,6 +18,7 @@ use rand::seq::SliceRandom;
use super::registry::ConnMeta; use super::registry::ConnMeta;
impl MePool { impl MePool {
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
pub async fn send_proxy_req( pub async fn send_proxy_req(
self: &Arc<Self>, self: &Arc<Self>,
conn_id: u64, conn_id: u64,
@ -26,13 +27,15 @@ impl MePool {
our_addr: SocketAddr, our_addr: SocketAddr,
data: &[u8], data: &[u8],
proto_flags: u32, proto_flags: u32,
tag_override: Option<&[u8]>,
) -> Result<()> { ) -> Result<()> {
let tag = tag_override.or(self.proxy_tag.as_deref());
let payload = build_proxy_req_payload( let payload = build_proxy_req_payload(
conn_id, conn_id,
client_addr, client_addr,
our_addr, our_addr,
data, data,
self.proxy_tag.as_deref(), tag,
proto_flags, proto_flags,
); );
let meta = ConnMeta { let meta = ConnMeta {
@ -135,12 +138,34 @@ impl MePool {
} }
} }
candidate_indices.sort_by_key(|idx| { if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
let w = &writers_snapshot[*idx]; candidate_indices.sort_by(|lhs, rhs| {
let degraded = w.degraded.load(Ordering::Relaxed); let left = &writers_snapshot[*lhs];
let stale = (w.generation < self.current_generation()) as usize; let right = &writers_snapshot[*rhs];
(stale, degraded as usize, Reverse(w.tx.capacity())) let left_key = (
}); (left.generation < self.current_generation()) as usize,
left.degraded.load(Ordering::Relaxed) as usize,
Reverse(left.tx.capacity()),
left.addr,
left.id,
);
let right_key = (
(right.generation < self.current_generation()) as usize,
right.degraded.load(Ordering::Relaxed) as usize,
Reverse(right.tx.capacity()),
right.addr,
right.id,
);
left_key.cmp(&right_key)
});
} else {
candidate_indices.sort_by_key(|idx| {
let w = &writers_snapshot[*idx];
let degraded = w.degraded.load(Ordering::Relaxed);
let stale = (w.generation < self.current_generation()) as usize;
(stale, degraded as usize, Reverse(w.tx.capacity()))
});
}
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
let mut fallback_blocking_idx: Option<usize> = None; let mut fallback_blocking_idx: Option<usize> = None;