mirror of
https://github.com/telemt/telemt.git
synced 2026-04-16 18:14:10 +03:00
ME Pool improvements
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
@@ -5,15 +5,15 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use httpdate;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::error::Result;
|
||||
|
||||
use super::MePool;
|
||||
use super::rotation::{MeReinitTrigger, enqueue_reinit_trigger};
|
||||
use super::secret::download_proxy_secret_with_max_len;
|
||||
use crate::crypto::SecureRandom;
|
||||
use std::time::SystemTime;
|
||||
|
||||
async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
|
||||
@@ -38,6 +38,8 @@ async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
|
||||
pub struct ProxyConfigData {
|
||||
pub map: HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
pub default_dc: Option<i32>,
|
||||
pub http_status: u16,
|
||||
pub proxy_for_lines: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -172,6 +174,7 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
|
||||
.await
|
||||
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
|
||||
;
|
||||
let http_status = resp.status().as_u16();
|
||||
|
||||
if let Some(date) = resp.headers().get(reqwest::header::DATE)
|
||||
&& let Ok(date_str) = date.to_str()
|
||||
@@ -194,9 +197,11 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
|
||||
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
|
||||
|
||||
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
|
||||
let mut proxy_for_lines: u32 = 0;
|
||||
for line in text.lines() {
|
||||
if let Some((dc, ip, port)) = parse_proxy_line(line) {
|
||||
map.entry(dc).or_default().push((ip, port));
|
||||
proxy_for_lines = proxy_for_lines.saturating_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,14 +219,49 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
|
||||
None
|
||||
});
|
||||
|
||||
Ok(ProxyConfigData { map, default_dc })
|
||||
Ok(ProxyConfigData {
|
||||
map,
|
||||
default_dc,
|
||||
http_status,
|
||||
proxy_for_lines,
|
||||
})
|
||||
}
|
||||
|
||||
fn snapshot_passes_guards(
|
||||
cfg: &ProxyConfig,
|
||||
snapshot: &ProxyConfigData,
|
||||
snapshot_name: &'static str,
|
||||
) -> bool {
|
||||
if cfg.general.me_snapshot_require_http_2xx
|
||||
&& !(200..=299).contains(&snapshot.http_status)
|
||||
{
|
||||
warn!(
|
||||
snapshot = snapshot_name,
|
||||
http_status = snapshot.http_status,
|
||||
"ME snapshot rejected by non-2xx HTTP status"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
let min_proxy_for = cfg.general.me_snapshot_min_proxy_for_lines;
|
||||
if snapshot.proxy_for_lines < min_proxy_for {
|
||||
warn!(
|
||||
snapshot = snapshot_name,
|
||||
parsed_proxy_for_lines = snapshot.proxy_for_lines,
|
||||
min_proxy_for_lines = min_proxy_for,
|
||||
"ME snapshot rejected by proxy_for line floor"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
async fn run_update_cycle(
|
||||
pool: &Arc<MePool>,
|
||||
rng: &Arc<SecureRandom>,
|
||||
cfg: &ProxyConfig,
|
||||
state: &mut UpdaterState,
|
||||
reinit_tx: &mpsc::Sender<MeReinitTrigger>,
|
||||
) {
|
||||
pool.update_runtime_reinit_policy(
|
||||
cfg.general.hardswap,
|
||||
@@ -232,6 +272,10 @@ async fn run_update_cycle(
|
||||
cfg.general.me_hardswap_warmup_delay_max_ms,
|
||||
cfg.general.me_hardswap_warmup_extra_passes,
|
||||
cfg.general.me_hardswap_warmup_pass_backoff_base_ms,
|
||||
cfg.general.me_bind_stale_mode,
|
||||
cfg.general.me_bind_stale_ttl_secs,
|
||||
cfg.general.me_secret_atomic_snapshot,
|
||||
cfg.general.me_deterministic_writer_sort,
|
||||
);
|
||||
|
||||
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
||||
@@ -242,44 +286,48 @@ async fn run_update_cycle(
|
||||
let mut ready_v4: Option<(ProxyConfigData, u64)> = None;
|
||||
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
|
||||
if let Some(cfg_v4) = cfg_v4 {
|
||||
let cfg_v4_hash = hash_proxy_config(&cfg_v4);
|
||||
let stable_hits = state.config_v4.observe(cfg_v4_hash);
|
||||
if stable_hits < required_cfg_snapshots {
|
||||
debug!(
|
||||
stable_hits,
|
||||
required_cfg_snapshots,
|
||||
snapshot = format_args!("0x{cfg_v4_hash:016x}"),
|
||||
"ME config v4 candidate observed"
|
||||
);
|
||||
} else if state.config_v4.is_applied(cfg_v4_hash) {
|
||||
debug!(
|
||||
snapshot = format_args!("0x{cfg_v4_hash:016x}"),
|
||||
"ME config v4 stable snapshot already applied"
|
||||
);
|
||||
} else {
|
||||
ready_v4 = Some((cfg_v4, cfg_v4_hash));
|
||||
if snapshot_passes_guards(cfg, &cfg_v4, "getProxyConfig") {
|
||||
let cfg_v4_hash = hash_proxy_config(&cfg_v4);
|
||||
let stable_hits = state.config_v4.observe(cfg_v4_hash);
|
||||
if stable_hits < required_cfg_snapshots {
|
||||
debug!(
|
||||
stable_hits,
|
||||
required_cfg_snapshots,
|
||||
snapshot = format_args!("0x{cfg_v4_hash:016x}"),
|
||||
"ME config v4 candidate observed"
|
||||
);
|
||||
} else if state.config_v4.is_applied(cfg_v4_hash) {
|
||||
debug!(
|
||||
snapshot = format_args!("0x{cfg_v4_hash:016x}"),
|
||||
"ME config v4 stable snapshot already applied"
|
||||
);
|
||||
} else {
|
||||
ready_v4 = Some((cfg_v4, cfg_v4_hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut ready_v6: Option<(ProxyConfigData, u64)> = None;
|
||||
let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
|
||||
if let Some(cfg_v6) = cfg_v6 {
|
||||
let cfg_v6_hash = hash_proxy_config(&cfg_v6);
|
||||
let stable_hits = state.config_v6.observe(cfg_v6_hash);
|
||||
if stable_hits < required_cfg_snapshots {
|
||||
debug!(
|
||||
stable_hits,
|
||||
required_cfg_snapshots,
|
||||
snapshot = format_args!("0x{cfg_v6_hash:016x}"),
|
||||
"ME config v6 candidate observed"
|
||||
);
|
||||
} else if state.config_v6.is_applied(cfg_v6_hash) {
|
||||
debug!(
|
||||
snapshot = format_args!("0x{cfg_v6_hash:016x}"),
|
||||
"ME config v6 stable snapshot already applied"
|
||||
);
|
||||
} else {
|
||||
ready_v6 = Some((cfg_v6, cfg_v6_hash));
|
||||
if snapshot_passes_guards(cfg, &cfg_v6, "getProxyConfigV6") {
|
||||
let cfg_v6_hash = hash_proxy_config(&cfg_v6);
|
||||
let stable_hits = state.config_v6.observe(cfg_v6_hash);
|
||||
if stable_hits < required_cfg_snapshots {
|
||||
debug!(
|
||||
stable_hits,
|
||||
required_cfg_snapshots,
|
||||
snapshot = format_args!("0x{cfg_v6_hash:016x}"),
|
||||
"ME config v6 candidate observed"
|
||||
);
|
||||
} else if state.config_v6.is_applied(cfg_v6_hash) {
|
||||
debug!(
|
||||
snapshot = format_args!("0x{cfg_v6_hash:016x}"),
|
||||
"ME config v6 stable snapshot already applied"
|
||||
);
|
||||
} else {
|
||||
ready_v6 = Some((cfg_v6, cfg_v6_hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,28 +340,40 @@ async fn run_update_cycle(
|
||||
let update_v6 = ready_v6
|
||||
.as_ref()
|
||||
.map(|(snapshot, _)| snapshot.map.clone());
|
||||
|
||||
let changed = pool.update_proxy_maps(update_v4, update_v6).await;
|
||||
|
||||
if let Some((snapshot, hash)) = ready_v4 {
|
||||
if let Some(dc) = snapshot.default_dc {
|
||||
pool.default_dc
|
||||
.store(dc, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
state.config_v4.mark_applied(hash);
|
||||
}
|
||||
|
||||
if let Some((_snapshot, hash)) = ready_v6 {
|
||||
state.config_v6.mark_applied(hash);
|
||||
}
|
||||
|
||||
state.last_map_apply_at = Some(tokio::time::Instant::now());
|
||||
|
||||
if changed {
|
||||
maps_changed = true;
|
||||
info!("ME config update applied after stable-gate");
|
||||
let update_is_empty =
|
||||
update_v4.is_empty() && update_v6.as_ref().is_none_or(|v| v.is_empty());
|
||||
let apply_outcome = if update_is_empty && !cfg.general.me_snapshot_reject_empty_map {
|
||||
super::pool_config::SnapshotApplyOutcome::AppliedNoDelta
|
||||
} else {
|
||||
debug!("ME config stable-gate applied with no map delta");
|
||||
pool.update_proxy_maps(update_v4, update_v6).await
|
||||
};
|
||||
|
||||
if matches!(
|
||||
apply_outcome,
|
||||
super::pool_config::SnapshotApplyOutcome::RejectedEmpty
|
||||
) {
|
||||
warn!("ME config stable snapshot rejected (empty endpoint map)");
|
||||
} else {
|
||||
if let Some((snapshot, hash)) = ready_v4 {
|
||||
if let Some(dc) = snapshot.default_dc {
|
||||
pool.default_dc
|
||||
.store(dc, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
state.config_v4.mark_applied(hash);
|
||||
}
|
||||
|
||||
if let Some((_snapshot, hash)) = ready_v6 {
|
||||
state.config_v6.mark_applied(hash);
|
||||
}
|
||||
|
||||
state.last_map_apply_at = Some(tokio::time::Instant::now());
|
||||
|
||||
if apply_outcome.changed() {
|
||||
maps_changed = true;
|
||||
info!("ME config update applied after stable-gate");
|
||||
} else {
|
||||
debug!("ME config stable-gate applied with no map delta");
|
||||
}
|
||||
}
|
||||
} else if let Some(last) = state.last_map_apply_at {
|
||||
let wait_secs = map_apply_cooldown_remaining_secs(last, apply_cooldown);
|
||||
@@ -325,8 +385,7 @@ async fn run_update_cycle(
|
||||
}
|
||||
|
||||
if maps_changed {
|
||||
pool.zero_downtime_reinit_after_map_change(rng.as_ref())
|
||||
.await;
|
||||
enqueue_reinit_trigger(reinit_tx, MeReinitTrigger::MapChanged);
|
||||
}
|
||||
|
||||
pool.reset_stun_state();
|
||||
@@ -367,8 +426,8 @@ async fn run_update_cycle(
|
||||
|
||||
pub async fn me_config_updater(
|
||||
pool: Arc<MePool>,
|
||||
rng: Arc<SecureRandom>,
|
||||
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
reinit_tx: mpsc::Sender<MeReinitTrigger>,
|
||||
) {
|
||||
let mut state = UpdaterState::default();
|
||||
let mut update_every_secs = config_rx
|
||||
@@ -387,7 +446,7 @@ pub async fn me_config_updater(
|
||||
tokio::select! {
|
||||
_ = &mut sleep => {
|
||||
let cfg = config_rx.borrow().clone();
|
||||
run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await;
|
||||
run_update_cycle(&pool, cfg.as_ref(), &mut state, &reinit_tx).await;
|
||||
let refreshed_secs = cfg.general.effective_update_every_secs().max(1);
|
||||
if refreshed_secs != update_every_secs {
|
||||
info!(
|
||||
@@ -415,6 +474,10 @@ pub async fn me_config_updater(
|
||||
cfg.general.me_hardswap_warmup_delay_max_ms,
|
||||
cfg.general.me_hardswap_warmup_extra_passes,
|
||||
cfg.general.me_hardswap_warmup_pass_backoff_base_ms,
|
||||
cfg.general.me_bind_stale_mode,
|
||||
cfg.general.me_bind_stale_ttl_secs,
|
||||
cfg.general.me_secret_atomic_snapshot,
|
||||
cfg.general.me_deterministic_writer_sort,
|
||||
);
|
||||
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
||||
if new_secs == update_every_secs {
|
||||
@@ -429,7 +492,7 @@ pub async fn me_config_updater(
|
||||
);
|
||||
update_every_secs = new_secs;
|
||||
update_every = Duration::from_secs(update_every_secs);
|
||||
run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await;
|
||||
run_update_cycle(&pool, cfg.as_ref(), &mut state, &reinit_tx).await;
|
||||
next_tick = tokio::time::Instant::now() + update_every;
|
||||
} else {
|
||||
info!(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
use socket2::{SockRef, TcpKeepalive};
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -267,7 +268,16 @@ impl MePool {
|
||||
.unwrap_or_default()
|
||||
.as_secs() as u32;
|
||||
|
||||
let ks = self.key_selector().await;
|
||||
let secret_atomic_snapshot = self.secret_atomic_snapshot.load(Ordering::Relaxed);
|
||||
let (ks, secret) = if secret_atomic_snapshot {
|
||||
let snapshot = self.secret_snapshot().await;
|
||||
(snapshot.key_selector, snapshot.secret)
|
||||
} else {
|
||||
// Backward-compatible mode: key selector and secret may come from different updates.
|
||||
let key_selector = self.key_selector().await;
|
||||
let secret = self.secret_snapshot().await.secret;
|
||||
(key_selector, secret)
|
||||
};
|
||||
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
|
||||
let nonce_frame = build_rpc_frame(-2, &nonce_payload, RpcChecksumMode::Crc32);
|
||||
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
|
||||
@@ -357,8 +367,6 @@ impl MePool {
|
||||
|
||||
let diag_level: u8 = std::env::var("ME_DIAG").ok().and_then(|v| v.parse().ok()).unwrap_or(0);
|
||||
|
||||
let secret: Vec<u8> = self.proxy_secret.read().await.clone();
|
||||
|
||||
let prekey_client = build_middleproxy_prekey(
|
||||
&srv_nonce,
|
||||
&my_nonce,
|
||||
|
||||
@@ -30,7 +30,7 @@ pub use pool_nat::{stun_probe, detect_public_ip};
|
||||
pub use registry::ConnRegistry;
|
||||
pub use secret::fetch_proxy_secret;
|
||||
pub use config_updater::{fetch_proxy_config, me_config_updater};
|
||||
pub use rotation::me_rotation_task;
|
||||
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
|
||||
pub use wire::proto_flags_for_tag;
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::MeSocksKdfPolicy;
|
||||
use crate::config::{MeBindStaleMode, MeSocksKdfPolicy};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::network::IpFamily;
|
||||
use crate::network::probe::NetworkDecision;
|
||||
@@ -29,6 +29,13 @@ pub struct MeWriter {
|
||||
pub allow_drain_fallback: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SecretSnapshot {
|
||||
pub epoch: u64,
|
||||
pub key_selector: u32,
|
||||
pub secret: Vec<u8>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct MePool {
|
||||
pub(super) registry: Arc<ConnRegistry>,
|
||||
@@ -38,7 +45,7 @@ pub struct MePool {
|
||||
pub(super) upstream: Option<Arc<UpstreamManager>>,
|
||||
pub(super) rng: Arc<SecureRandom>,
|
||||
pub(super) proxy_tag: Option<Vec<u8>>,
|
||||
pub(super) proxy_secret: Arc<RwLock<Vec<u8>>>,
|
||||
pub(super) proxy_secret: Arc<RwLock<SecretSnapshot>>,
|
||||
pub(super) nat_ip_cfg: Option<IpAddr>,
|
||||
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
|
||||
pub(super) nat_probe: bool,
|
||||
@@ -83,6 +90,10 @@ pub struct MePool {
|
||||
pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64,
|
||||
pub(super) me_hardswap_warmup_extra_passes: AtomicU32,
|
||||
pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64,
|
||||
pub(super) me_bind_stale_mode: AtomicU8,
|
||||
pub(super) me_bind_stale_ttl_secs: AtomicU64,
|
||||
pub(super) secret_atomic_snapshot: AtomicBool,
|
||||
pub(super) me_deterministic_writer_sort: AtomicBool,
|
||||
pub(super) me_socks_kdf_policy: AtomicU8,
|
||||
pool_size: usize,
|
||||
}
|
||||
@@ -147,6 +158,10 @@ impl MePool {
|
||||
me_hardswap_warmup_delay_max_ms: u64,
|
||||
me_hardswap_warmup_extra_passes: u8,
|
||||
me_hardswap_warmup_pass_backoff_base_ms: u64,
|
||||
me_bind_stale_mode: MeBindStaleMode,
|
||||
me_bind_stale_ttl_secs: u64,
|
||||
me_secret_atomic_snapshot: bool,
|
||||
me_deterministic_writer_sort: bool,
|
||||
me_socks_kdf_policy: MeSocksKdfPolicy,
|
||||
me_route_backpressure_base_timeout_ms: u64,
|
||||
me_route_backpressure_high_timeout_ms: u64,
|
||||
@@ -166,7 +181,20 @@ impl MePool {
|
||||
upstream,
|
||||
rng,
|
||||
proxy_tag,
|
||||
proxy_secret: Arc::new(RwLock::new(proxy_secret)),
|
||||
proxy_secret: Arc::new(RwLock::new(SecretSnapshot {
|
||||
epoch: 1,
|
||||
key_selector: if proxy_secret.len() >= 4 {
|
||||
u32::from_le_bytes([
|
||||
proxy_secret[0],
|
||||
proxy_secret[1],
|
||||
proxy_secret[2],
|
||||
proxy_secret[3],
|
||||
])
|
||||
} else {
|
||||
0
|
||||
},
|
||||
secret: proxy_secret,
|
||||
})),
|
||||
nat_ip_cfg: nat_ip,
|
||||
nat_ip_detected: Arc::new(RwLock::new(None)),
|
||||
nat_probe,
|
||||
@@ -216,6 +244,10 @@ impl MePool {
|
||||
me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new(
|
||||
me_hardswap_warmup_pass_backoff_base_ms,
|
||||
),
|
||||
me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()),
|
||||
me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs),
|
||||
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
|
||||
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
|
||||
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
||||
})
|
||||
}
|
||||
@@ -238,6 +270,10 @@ impl MePool {
|
||||
hardswap_warmup_delay_max_ms: u64,
|
||||
hardswap_warmup_extra_passes: u8,
|
||||
hardswap_warmup_pass_backoff_base_ms: u64,
|
||||
bind_stale_mode: MeBindStaleMode,
|
||||
bind_stale_ttl_secs: u64,
|
||||
secret_atomic_snapshot: bool,
|
||||
deterministic_writer_sort: bool,
|
||||
) {
|
||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||
self.me_pool_drain_ttl_secs
|
||||
@@ -254,6 +290,14 @@ impl MePool {
|
||||
.store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed);
|
||||
self.me_hardswap_warmup_pass_backoff_base_ms
|
||||
.store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed);
|
||||
self.me_bind_stale_mode
|
||||
.store(bind_stale_mode.as_u8(), Ordering::Relaxed);
|
||||
self.me_bind_stale_ttl_secs
|
||||
.store(bind_stale_ttl_secs, Ordering::Relaxed);
|
||||
self.secret_atomic_snapshot
|
||||
.store(secret_atomic_snapshot, Ordering::Relaxed);
|
||||
self.me_deterministic_writer_sort
|
||||
.store(deterministic_writer_sort, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn reset_stun_state(&self) {
|
||||
@@ -307,12 +351,15 @@ impl MePool {
|
||||
}
|
||||
|
||||
pub(super) async fn key_selector(&self) -> u32 {
|
||||
let secret = self.proxy_secret.read().await;
|
||||
if secret.len() >= 4 {
|
||||
u32::from_le_bytes([secret[0], secret[1], secret[2], secret[3]])
|
||||
} else {
|
||||
0
|
||||
}
|
||||
self.proxy_secret.read().await.key_selector
|
||||
}
|
||||
|
||||
pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
|
||||
self.proxy_secret.read().await.clone()
|
||||
}
|
||||
|
||||
pub(super) fn bind_stale_mode(&self) -> MeBindStaleMode {
|
||||
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub(super) fn family_order(&self) -> Vec<IpFamily> {
|
||||
|
||||
@@ -7,12 +7,29 @@ use tracing::warn;
|
||||
|
||||
use super::pool::MePool;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum SnapshotApplyOutcome {
|
||||
AppliedChanged,
|
||||
AppliedNoDelta,
|
||||
RejectedEmpty,
|
||||
}
|
||||
|
||||
impl SnapshotApplyOutcome {
|
||||
pub fn changed(self) -> bool {
|
||||
matches!(self, SnapshotApplyOutcome::AppliedChanged)
|
||||
}
|
||||
}
|
||||
|
||||
impl MePool {
|
||||
pub async fn update_proxy_maps(
|
||||
&self,
|
||||
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
new_v6: Option<HashMap<i32, Vec<(IpAddr, u16)>>>,
|
||||
) -> bool {
|
||||
) -> SnapshotApplyOutcome {
|
||||
if new_v4.is_empty() && new_v6.as_ref().is_none_or(|v| v.is_empty()) {
|
||||
return SnapshotApplyOutcome::RejectedEmpty;
|
||||
}
|
||||
|
||||
let mut changed = false;
|
||||
{
|
||||
let mut guard = self.proxy_map_v4.write().await;
|
||||
@@ -51,7 +68,11 @@ impl MePool {
|
||||
}
|
||||
}
|
||||
}
|
||||
changed
|
||||
if changed {
|
||||
SnapshotApplyOutcome::AppliedChanged
|
||||
} else {
|
||||
SnapshotApplyOutcome::AppliedNoDelta
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
|
||||
@@ -60,8 +81,19 @@ impl MePool {
|
||||
return false;
|
||||
}
|
||||
let mut guard = self.proxy_secret.write().await;
|
||||
if *guard != new_secret {
|
||||
*guard = new_secret;
|
||||
if guard.secret != new_secret {
|
||||
guard.secret = new_secret;
|
||||
guard.key_selector = if guard.secret.len() >= 4 {
|
||||
u32::from_le_bytes([
|
||||
guard.secret[0],
|
||||
guard.secret[1],
|
||||
guard.secret[2],
|
||||
guard.secret[3],
|
||||
])
|
||||
} else {
|
||||
0
|
||||
};
|
||||
guard.epoch = guard.epoch.saturating_add(1);
|
||||
drop(guard);
|
||||
self.reconnect_all().await;
|
||||
return true;
|
||||
|
||||
@@ -19,7 +19,7 @@ impl MePool {
|
||||
me_servers = self.proxy_map_v4.read().await.len(),
|
||||
pool_size,
|
||||
key_selector = format_args!("0x{ks:08x}"),
|
||||
secret_len = self.proxy_secret.read().await.len(),
|
||||
secret_len = self.proxy_secret.read().await.secret.len(),
|
||||
"Initializing ME pool"
|
||||
);
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::config::MeBindStaleMode;
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::protocol::constants::RPC_PING_U32;
|
||||
@@ -42,7 +43,7 @@ impl MePool {
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> {
|
||||
let secret_len = self.proxy_secret.read().await.len();
|
||||
let secret_len = self.proxy_secret.read().await.secret.len();
|
||||
if secret_len < 32 {
|
||||
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
|
||||
}
|
||||
@@ -351,16 +352,22 @@ impl MePool {
|
||||
return false;
|
||||
}
|
||||
|
||||
let ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed);
|
||||
if ttl_secs == 0 {
|
||||
return true;
|
||||
}
|
||||
match self.bind_stale_mode() {
|
||||
MeBindStaleMode::Never => false,
|
||||
MeBindStaleMode::Always => true,
|
||||
MeBindStaleMode::Ttl => {
|
||||
let ttl_secs = self.me_bind_stale_ttl_secs.load(Ordering::Relaxed);
|
||||
if ttl_secs == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed);
|
||||
if started == 0 {
|
||||
return false;
|
||||
}
|
||||
let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed);
|
||||
if started == 0 {
|
||||
return false;
|
||||
}
|
||||
|
||||
Self::now_epoch_secs().saturating_sub(started) <= ttl_secs
|
||||
Self::now_epoch_secs().saturating_sub(started) <= ttl_secs
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,19 +1,111 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::watch;
|
||||
use tracing::{info, warn};
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::crypto::SecureRandom;
|
||||
|
||||
use super::MePool;
|
||||
|
||||
/// Periodically reinitialize ME generations and swap them after full warmup.
|
||||
pub async fn me_rotation_task(
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum MeReinitTrigger {
|
||||
Periodic,
|
||||
MapChanged,
|
||||
}
|
||||
|
||||
impl MeReinitTrigger {
|
||||
fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
MeReinitTrigger::Periodic => "periodic",
|
||||
MeReinitTrigger::MapChanged => "map-change",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enqueue_reinit_trigger(
|
||||
tx: &mpsc::Sender<MeReinitTrigger>,
|
||||
trigger: MeReinitTrigger,
|
||||
) {
|
||||
match tx.try_send(trigger) {
|
||||
Ok(()) => {}
|
||||
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
|
||||
debug!(trigger = trigger.as_str(), "ME reinit trigger dropped (queue full)");
|
||||
}
|
||||
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
|
||||
warn!(trigger = trigger.as_str(), "ME reinit trigger dropped (scheduler closed)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn me_reinit_scheduler(
|
||||
pool: Arc<MePool>,
|
||||
rng: Arc<SecureRandom>,
|
||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
mut trigger_rx: mpsc::Receiver<MeReinitTrigger>,
|
||||
) {
|
||||
info!("ME reinit scheduler started");
|
||||
loop {
|
||||
let Some(first_trigger) = trigger_rx.recv().await else {
|
||||
warn!("ME reinit scheduler stopped: trigger channel closed");
|
||||
break;
|
||||
};
|
||||
|
||||
let mut map_change_seen = matches!(first_trigger, MeReinitTrigger::MapChanged);
|
||||
let mut periodic_seen = matches!(first_trigger, MeReinitTrigger::Periodic);
|
||||
let cfg = config_rx.borrow().clone();
|
||||
let coalesce_window = Duration::from_millis(cfg.general.me_reinit_coalesce_window_ms);
|
||||
if !coalesce_window.is_zero() {
|
||||
let deadline = tokio::time::Instant::now() + coalesce_window;
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
if now >= deadline {
|
||||
break;
|
||||
}
|
||||
match tokio::time::timeout(deadline - now, trigger_rx.recv()).await {
|
||||
Ok(Some(next)) => {
|
||||
if next == MeReinitTrigger::MapChanged {
|
||||
map_change_seen = true;
|
||||
} else {
|
||||
periodic_seen = true;
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let reason = if map_change_seen && periodic_seen {
|
||||
"map-change+periodic"
|
||||
} else if map_change_seen {
|
||||
"map-change"
|
||||
} else {
|
||||
"periodic"
|
||||
};
|
||||
|
||||
if cfg.general.me_reinit_singleflight {
|
||||
debug!(reason, "ME reinit scheduled (single-flight)");
|
||||
pool.zero_downtime_reinit_periodic(rng.as_ref()).await;
|
||||
} else {
|
||||
debug!(reason, "ME reinit scheduled (concurrent mode)");
|
||||
let pool_clone = pool.clone();
|
||||
let rng_clone = rng.clone();
|
||||
tokio::spawn(async move {
|
||||
pool_clone
|
||||
.zero_downtime_reinit_periodic(rng_clone.as_ref())
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodically enqueue reinitialization triggers for ME generations.
|
||||
pub async fn me_rotation_task(
|
||||
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
reinit_tx: mpsc::Sender<MeReinitTrigger>,
|
||||
) {
|
||||
let mut interval_secs = config_rx
|
||||
.borrow()
|
||||
@@ -31,7 +123,7 @@ pub async fn me_rotation_task(
|
||||
|
||||
tokio::select! {
|
||||
_ = &mut sleep => {
|
||||
pool.zero_downtime_reinit_periodic(rng.as_ref()).await;
|
||||
enqueue_reinit_trigger(&reinit_tx, MeReinitTrigger::Periodic);
|
||||
let refreshed_secs = config_rx
|
||||
.borrow()
|
||||
.general
|
||||
@@ -70,7 +162,7 @@ pub async fn me_rotation_task(
|
||||
);
|
||||
interval_secs = new_secs;
|
||||
interval = Duration::from_secs(interval_secs);
|
||||
pool.zero_downtime_reinit_periodic(rng.as_ref()).await;
|
||||
enqueue_reinit_trigger(&reinit_tx, MeReinitTrigger::Periodic);
|
||||
next_tick = tokio::time::Instant::now() + interval;
|
||||
} else {
|
||||
info!(
|
||||
|
||||
@@ -138,12 +138,34 @@ impl MePool {
|
||||
}
|
||||
}
|
||||
|
||||
candidate_indices.sort_by_key(|idx| {
|
||||
let w = &writers_snapshot[*idx];
|
||||
let degraded = w.degraded.load(Ordering::Relaxed);
|
||||
let stale = (w.generation < self.current_generation()) as usize;
|
||||
(stale, degraded as usize, Reverse(w.tx.capacity()))
|
||||
});
|
||||
if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
|
||||
candidate_indices.sort_by(|lhs, rhs| {
|
||||
let left = &writers_snapshot[*lhs];
|
||||
let right = &writers_snapshot[*rhs];
|
||||
let left_key = (
|
||||
(left.generation < self.current_generation()) as usize,
|
||||
left.degraded.load(Ordering::Relaxed) as usize,
|
||||
Reverse(left.tx.capacity()),
|
||||
left.addr,
|
||||
left.id,
|
||||
);
|
||||
let right_key = (
|
||||
(right.generation < self.current_generation()) as usize,
|
||||
right.degraded.load(Ordering::Relaxed) as usize,
|
||||
Reverse(right.tx.capacity()),
|
||||
right.addr,
|
||||
right.id,
|
||||
);
|
||||
left_key.cmp(&right_key)
|
||||
});
|
||||
} else {
|
||||
candidate_indices.sort_by_key(|idx| {
|
||||
let w = &writers_snapshot[*idx];
|
||||
let degraded = w.degraded.load(Ordering::Relaxed);
|
||||
let stale = (w.generation < self.current_generation()) as usize;
|
||||
(stale, degraded as usize, Reverse(w.tx.capacity()))
|
||||
});
|
||||
}
|
||||
|
||||
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
|
||||
let mut fallback_blocking_idx: Option<usize> = None;
|
||||
|
||||
Reference in New Issue
Block a user