This commit is contained in:
Alexey
2026-03-21 15:45:29 +03:00
parent 7a8f946029
commit d7bbb376c9
154 changed files with 6194 additions and 3775 deletions

View File

@@ -1,5 +1,5 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use bytes::Bytes;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::crypto::{AesCbc, crc32, crc32c};
use crate::error::{ProxyError, Result};

View File

@@ -96,32 +96,36 @@ pub async fn save_proxy_config_cache(path: &str, raw_text: &str) -> Result<()> {
}
/// Downloads the proxy config from `url` and returns it both parsed and as raw text.
///
/// The numeric HTTP status of the response is captured and handed to
/// `parse_proxy_config_text` together with the body; this function itself does
/// not reject any status code. If the response carries a parseable `Date`
/// header, the absolute difference between that time and the local clock is
/// recorded via `record_timeskew_sample` and logged when it exceeds 30s / 60s.
///
/// # Errors
///
/// Returns `ProxyError::Proxy` when the GET request fails or when the response
/// body cannot be read.
// NOTE(review): in this view several statements appear twice, differing only in
// line wrapping — this looks like the removed and added sides of a diff hunk
// rendered without +/- markers, not two intended executions. Confirm against
// the actual source file.
pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, String)> {
// Issue the GET; transport-level failures are wrapped into ProxyError::Proxy.
let resp = reqwest::get(url)
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
;
let resp = reqwest::get(url).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}"))
})?;
// Keep the status as a plain u16 so it can be passed to the parser below.
let http_status = resp.status().as_u16();
// Clock-skew probe: every step is optional, so a missing or malformed Date
// header simply skips the whole block.
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
// Try "local clock ahead of server" first; if that fails, fall back to
// "server ahead of local clock" so the skew is measured in either direction.
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
&& let Ok(skew) = SystemTime::now()
.duration_since(server_time)
.or_else(|e| server_time.duration_since(SystemTime::now()).map_err(|_| e))
{
let skew_secs = skew.as_secs();
record_timeskew_sample("proxy_config_date_header", skew_secs);
// Two warning tiers: above 60s, otherwise above 30s.
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
warn!(
skew_secs,
"Time skew >60s detected from fetch_proxy_config Date header"
);
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
warn!(
skew_secs,
"Time skew >30s detected from fetch_proxy_config Date header"
);
}
}
// Read the whole body as text; read failures become ProxyError::Proxy.
let text = resp
.text()
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let text = resp.text().await.map_err(|e| {
crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}"))
})?;
// Parsing receives the status alongside the body; the raw text is returned
// to the caller unchanged next to the parsed result.
let parsed = parse_proxy_config_text(&text, http_status);
Ok((parsed, text))
}
@@ -165,8 +169,11 @@ fn hash_proxy_config(cfg: &ProxyConfigData) -> u64 {
let mut hasher = DefaultHasher::new();
cfg.default_dc.hash(&mut hasher);
let mut by_dc: Vec<(i32, Vec<(IpAddr, u16)>)> =
cfg.map.iter().map(|(dc, addrs)| (*dc, addrs.clone())).collect();
let mut by_dc: Vec<(i32, Vec<(IpAddr, u16)>)> = cfg
.map
.iter()
.map(|(dc, addrs)| (*dc, addrs.clone()))
.collect();
by_dc.sort_by_key(|(dc, _)| *dc);
for (dc, mut addrs) in by_dc {
dc.hash(&mut hasher);
@@ -264,9 +271,7 @@ fn snapshot_passes_guards(
snapshot: &ProxyConfigData,
snapshot_name: &'static str,
) -> bool {
if cfg.general.me_snapshot_require_http_2xx
&& !(200..=299).contains(&snapshot.http_status)
{
if cfg.general.me_snapshot_require_http_2xx && !(200..=299).contains(&snapshot.http_status) {
warn!(
snapshot = snapshot_name,
http_status = snapshot.http_status,
@@ -330,8 +335,10 @@ async fn run_update_cycle(
cfg.general.me_adaptive_floor_recover_grace_secs,
cfg.general.me_adaptive_floor_writers_per_core_total,
cfg.general.me_adaptive_floor_cpu_cores_override,
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
cfg.general
.me_adaptive_floor_max_extra_writers_single_per_core,
cfg.general
.me_adaptive_floor_max_extra_writers_multi_per_core,
cfg.general.me_adaptive_floor_max_active_writers_per_core,
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
cfg.general.me_adaptive_floor_max_active_writers_global,
@@ -400,9 +407,7 @@ async fn run_update_cycle(
.as_ref()
.map(|(snapshot, _)| snapshot.map.clone())
.unwrap_or_default();
let update_v6 = ready_v6
.as_ref()
.map(|(snapshot, _)| snapshot.map.clone());
let update_v6 = ready_v6.as_ref().map(|(snapshot, _)| snapshot.map.clone());
let update_is_empty =
update_v4.is_empty() && update_v6.as_ref().is_none_or(|v| v.is_empty());
let apply_outcome = if update_is_empty && !cfg.general.me_snapshot_reject_empty_map {
@@ -440,10 +445,7 @@ async fn run_update_cycle(
}
} else if let Some(last) = state.last_map_apply_at {
let wait_secs = map_apply_cooldown_remaining_secs(last, apply_cooldown);
debug!(
wait_secs,
"ME config stable snapshot deferred by cooldown"
);
debug!(wait_secs, "ME config stable snapshot deferred by cooldown");
}
}

View File

@@ -1,19 +1,19 @@
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use socket2::{SockRef, TcpKeepalive};
#[cfg(target_os = "linux")]
use libc;
use socket2::{SockRef, TcpKeepalive};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr};
#[cfg(target_os = "linux")]
use std::os::fd::{AsRawFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::raw::c_int;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::{TcpStream, TcpSocket};
use tokio::net::{TcpSocket, TcpStream};
use tokio::time::timeout;
use tracing::{debug, info, warn};
@@ -28,14 +28,16 @@ use crate::protocol::constants::{
};
use crate::transport::{UpstreamEgressInfo, UpstreamRouteKind};
use super::MePool;
use super::codec::{
RpcChecksumMode, build_handshake_payload, build_nonce_payload, build_rpc_frame,
cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload,
read_rpc_frame_plaintext, rpc_crc,
};
use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status, record_upstream_bnd_status};
use super::wire::{extract_ip_material, IpMaterial};
use super::MePool;
use super::selftest::{
BndAddrStatus, BndPortStatus, record_bnd_status, record_upstream_bnd_status,
};
use super::wire::{IpMaterial, extract_ip_material};
const ME_KDF_DRIFT_STRICT: bool = false;
@@ -168,11 +170,15 @@ impl MePool {
} else {
match sock.connect(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect"),
Err(e) => {
debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect")
}
}
}
}
Err(e) => debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect"),
Err(e) => {
debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect")
}
}
}
TcpStream::connect(addr).await
@@ -315,7 +321,8 @@ impl MePool {
};
let local_addr_nat = self.translate_our_addr_with_reflection(local_addr, reflected);
let peer_addr_nat = SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port());
let peer_addr_nat =
SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port());
if let Some(upstream_info) = upstream_egress {
let client_ip_for_kdf = socks_bound_addr
.map(|value| value.ip())
@@ -367,12 +374,17 @@ impl MePool {
.map_err(|_| ProxyError::TgHandshakeTimeout)??;
if srv_seq != -2 {
return Err(ProxyError::InvalidHandshake(format!("Expected seq=-2, got {srv_seq}")));
return Err(ProxyError::InvalidHandshake(format!(
"Expected seq=-2, got {srv_seq}"
)));
}
let (srv_key_select, schema, srv_ts, srv_nonce) = parse_nonce_payload(&srv_nonce_payload)?;
if schema != RPC_CRYPTO_AES_U32 {
warn!(schema = format_args!("0x{schema:08x}"), "Unsupported ME crypto schema");
warn!(
schema = format_args!("0x{schema:08x}"),
"Unsupported ME crypto schema"
);
return Err(ProxyError::InvalidHandshake(format!(
"Unsupported crypto schema: 0x{schema:x}"
)));
@@ -423,8 +435,7 @@ impl MePool {
let kdf_fingerprint_guard = self.kdf_material_fingerprint.read().await;
kdf_fingerprint_guard.get(&peer_addr_nat).copied()
};
if let Some((prev_fingerprint, prev_client_port)) = previous_kdf_fingerprint
{
if let Some((prev_fingerprint, prev_client_port)) = previous_kdf_fingerprint {
if prev_fingerprint != kdf_fingerprint {
self.stats.increment_me_kdf_drift_total();
warn!(
@@ -461,24 +472,29 @@ impl MePool {
let server_ip = extract_ip_material(peer_addr_nat);
let client_ip = extract_ip_material(local_addr_nat);
let (srv_ip_opt, clt_ip_opt, clt_v6_opt, srv_v6_opt, hs_our_ip, hs_peer_ip) = match (server_ip, client_ip) {
(IpMaterial::V4(mut srv), IpMaterial::V4(mut clt)) => {
srv.reverse();
clt.reverse();
(Some(srv), Some(clt), None, None, clt, srv)
}
(IpMaterial::V6(srv), IpMaterial::V6(clt)) => {
let zero = [0u8; 4];
(None, None, Some(clt), Some(srv), zero, zero)
}
_ => {
return Err(ProxyError::InvalidHandshake(
"mixed IPv4/IPv6 endpoints are not supported for ME key derivation".to_string(),
));
}
};
let (srv_ip_opt, clt_ip_opt, clt_v6_opt, srv_v6_opt, hs_our_ip, hs_peer_ip) =
match (server_ip, client_ip) {
(IpMaterial::V4(mut srv), IpMaterial::V4(mut clt)) => {
srv.reverse();
clt.reverse();
(Some(srv), Some(clt), None, None, clt, srv)
}
(IpMaterial::V6(srv), IpMaterial::V6(clt)) => {
let zero = [0u8; 4];
(None, None, Some(clt), Some(srv), zero, zero)
}
_ => {
return Err(ProxyError::InvalidHandshake(
"mixed IPv4/IPv6 endpoints are not supported for ME key derivation"
.to_string(),
));
}
};
let diag_level: u8 = std::env::var("ME_DIAG").ok().and_then(|v| v.parse().ok()).unwrap_or(0);
let diag_level: u8 = std::env::var("ME_DIAG")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(0);
let prekey_client = build_middleproxy_prekey(
&srv_nonce,

View File

@@ -132,7 +132,9 @@ pub(super) async fn reap_draining_writers(
) {
let now_epoch_secs = MePool::now_epoch_secs();
let now = Instant::now();
let drain_ttl_secs = pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed);
let drain_ttl_secs = pool
.me_pool_drain_ttl_secs
.load(std::sync::atomic::Ordering::Relaxed);
let drain_threshold = pool
.me_pool_drain_threshold
.load(std::sync::atomic::Ordering::Relaxed);
@@ -175,7 +177,9 @@ pub(super) async fn reap_draining_writers(
drop(writers);
let overflow = if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
draining_writers.len().saturating_sub(drain_threshold as usize)
draining_writers
.len()
.saturating_sub(drain_threshold as usize)
} else {
0
};
@@ -220,7 +224,8 @@ pub(super) async fn reap_draining_writers(
"ME draining writer remains non-empty past drain TTL"
);
}
if writer.drain_deadline_epoch_secs != 0 && now_epoch_secs >= writer.drain_deadline_epoch_secs
if writer.drain_deadline_epoch_secs != 0
&& now_epoch_secs >= writer.drain_deadline_epoch_secs
{
warn!(writer_id = writer.id, "Drain timeout, force-closing");
force_close_writer_ids.push(writer.id);
@@ -367,9 +372,13 @@ async fn check_family(
let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new();
let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec<u64>>::new();
for writer in pool.writers.read().await.iter().filter(|w| {
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
}) {
for writer in pool
.writers
.read()
.await
.iter()
.filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed))
{
if !matches!(
super::pool::WriterContour::from_u8(
writer.contour.load(std::sync::atomic::Ordering::Relaxed),
@@ -628,8 +637,11 @@ async fn check_family(
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
} else {
let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
let curr = *backoff
.get(&key)
.unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
let next_ms =
(curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
backoff.insert(key, next_ms);
let jitter = next_ms / JITTER_FRAC_NUM;
let wait = Duration::from_millis(next_ms)
@@ -637,12 +649,7 @@ async fn check_family(
next_attempt.insert(key, now + wait);
if pool.is_runtime_ready() {
let warn_cooldown = pool.warn_rate_limit_duration();
if should_emit_rate_limited_warn(
floor_warn_next_allowed,
key,
now,
warn_cooldown,
) {
if should_emit_rate_limited_warn(floor_warn_next_allowed, key, now, warn_cooldown) {
warn!(
dc = %dc,
?family,
@@ -802,7 +809,12 @@ async fn build_family_floor_plan(
let target_required = desired_raw.clamp(min_required, max_required);
let alive = endpoints
.iter()
.map(|endpoint| live_addr_counts.get(&(*dc, *endpoint)).copied().unwrap_or(0))
.map(|endpoint| {
live_addr_counts
.get(&(*dc, *endpoint))
.copied()
.unwrap_or(0)
})
.sum::<usize>();
family_active_total = family_active_total.saturating_add(alive);
let writer_ids = list_writer_ids_for_endpoints(*dc, endpoints, live_writer_ids_by_addr);
@@ -1395,7 +1407,8 @@ async fn maybe_rotate_single_endpoint_shadow(
pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false)
.await;
pool.stats.increment_me_single_endpoint_shadow_rotate_total();
pool.stats
.increment_me_single_endpoint_shadow_rotate_total();
shadow_rotate_deadline.insert(key, now + interval);
info!(
dc = %dc,
@@ -1442,11 +1455,9 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
// Phase 1: collect zombie IDs under a short read-lock with timeout.
let zombie_ids_with_meta: Vec<(u64, bool)> = {
let Ok(ws) = tokio::time::timeout(
Duration::from_secs(LOCK_TIMEOUT_SECS),
pool.writers.read(),
)
.await
let Ok(ws) =
tokio::time::timeout(Duration::from_secs(LOCK_TIMEOUT_SECS), pool.writers.read())
.await
else {
warn!("zombie_watchdog: writers read-lock timeout, skipping tick");
continue;
@@ -1510,11 +1521,7 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
Ok(()) => {
removal_timeout_streak.remove(writer_id);
pool.stats.increment_pool_force_close_total();
info!(
writer_id,
had_clients,
"Zombie writer removed by watchdog"
);
info!(writer_id, had_clients, "Zombie writer removed by watchdog");
}
Err(_) => {
let streak = removal_timeout_streak
@@ -1542,8 +1549,7 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
pool.stats.increment_pool_force_close_total();
info!(
writer_id,
had_clients,
"Zombie writer hard-detached after repeated timeouts"
had_clients, "Zombie writer hard-detached after repeated timeouts"
);
}
Ok(false) => {
@@ -1557,8 +1563,7 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
Err(_) => {
warn!(
writer_id,
had_clients,
"Zombie hard-detach timed out, will retry next tick"
had_clients, "Zombie hard-detach timed out, will retry next tick"
);
}
}
@@ -1593,10 +1598,7 @@ mod tests {
..GeneralConfig::default()
};
let mut proxy_map_v4 = HashMap::new();
proxy_map_v4.insert(
2,
vec![(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), 443)],
);
proxy_map_v4.insert(2, vec![(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), 443)]);
let decision = NetworkDecision {
ipv4_me: true,
..NetworkDecision::default()
@@ -1737,7 +1739,12 @@ mod tests {
let writer = MeWriter {
id: writer_id,
addr: SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(203, 0, 113, (writer_id as u8).saturating_add(1))),
IpAddr::V4(Ipv4Addr::new(
203,
0,
113,
(writer_id as u8).saturating_add(1),
)),
4000 + writer_id as u16,
),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
@@ -1771,12 +1778,24 @@ mod tests {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
let mut writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
let mut writer_ids: Vec<u64> = pool
.writers
.read()
.await
.iter()
.map(|writer| writer.id)
.collect();
writer_ids.sort_unstable();
assert_eq!(writer_ids, vec![1, 20, 30]);
assert!(pool.registry.get_writer(conn_a).await.is_none());
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
assert_eq!(
pool.registry.get_writer(conn_b).await.unwrap().writer_id,
20
);
assert_eq!(
pool.registry.get_writer(conn_c).await.unwrap().writer_id,
30
);
}
#[tokio::test]
@@ -1790,12 +1809,24 @@ mod tests {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
let mut writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
let mut writer_ids: Vec<u64> = pool
.writers
.read()
.await
.iter()
.map(|writer| writer.id)
.collect();
writer_ids.sort_unstable();
assert_eq!(writer_ids, vec![20, 30]);
assert!(pool.registry.get_writer(conn_a).await.is_none());
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
assert_eq!(
pool.registry.get_writer(conn_b).await.unwrap().writer_id,
20
);
assert_eq!(
pool.registry.get_writer(conn_c).await.unwrap().writer_id,
30
);
}
#[tokio::test]
@@ -1809,10 +1840,25 @@ mod tests {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
let writer_ids: Vec<u64> = pool
.writers
.read()
.await
.iter()
.map(|writer| writer.id)
.collect();
assert_eq!(writer_ids, vec![10, 20, 30]);
assert_eq!(pool.registry.get_writer(conn_a).await.unwrap().writer_id, 10);
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
assert_eq!(
pool.registry.get_writer(conn_a).await.unwrap().writer_id,
10
);
assert_eq!(
pool.registry.get_writer(conn_b).await.unwrap().writer_id,
20
);
assert_eq!(
pool.registry.get_writer(conn_c).await.unwrap().writer_id,
30
);
}
}

View File

@@ -4,61 +4,61 @@ mod codec;
mod config_updater;
mod handshake;
mod health;
#[cfg(test)]
#[path = "tests/health_adversarial_tests.rs"]
mod health_adversarial_tests;
#[cfg(test)]
#[path = "tests/health_integration_tests.rs"]
mod health_integration_tests;
#[cfg(test)]
#[path = "tests/health_regression_tests.rs"]
mod health_regression_tests;
mod ping;
mod pool;
mod pool_config;
mod pool_init;
mod pool_nat;
mod pool_refill;
mod pool_reinit;
mod pool_runtime_api;
mod pool_writer;
mod ping;
mod reader;
mod registry;
mod rotation;
mod send;
mod secret;
mod selftest;
mod wire;
mod pool_status;
#[cfg(test)]
#[path = "tests/health_regression_tests.rs"]
mod health_regression_tests;
#[cfg(test)]
#[path = "tests/health_integration_tests.rs"]
mod health_integration_tests;
#[cfg(test)]
#[path = "tests/health_adversarial_tests.rs"]
mod health_adversarial_tests;
#[cfg(test)]
#[path = "tests/send_adversarial_tests.rs"]
mod send_adversarial_tests;
#[cfg(test)]
#[path = "tests/pool_writer_security_tests.rs"]
mod pool_writer_security_tests;
#[cfg(test)]
#[path = "tests/pool_refill_security_tests.rs"]
mod pool_refill_security_tests;
mod pool_reinit;
mod pool_runtime_api;
mod pool_status;
mod pool_writer;
#[cfg(test)]
#[path = "tests/pool_writer_security_tests.rs"]
mod pool_writer_security_tests;
mod reader;
mod registry;
mod rotation;
mod secret;
mod selftest;
mod send;
#[cfg(test)]
#[path = "tests/send_adversarial_tests.rs"]
mod send_adversarial_tests;
mod wire;
use bytes::Bytes;
pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
#[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool;
#[allow(unused_imports)]
pub use pool_nat::{stun_probe, detect_public_ip};
pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret;
#[allow(unused_imports)]
pub use config_updater::{
ProxyConfigData, fetch_proxy_config, fetch_proxy_config_with_raw, load_proxy_config_cache,
me_config_updater, save_proxy_config_cache,
};
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub(crate) use selftest::{
bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots,
pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
#[allow(unused_imports)]
pub use ping::{
MePingFamily, MePingReport, MePingSample, format_me_route, format_sample_line, run_me_ping,
};
pub use pool::MePool;
#[allow(unused_imports)]
pub use pool_nat::{detect_public_ip, stun_probe};
pub use registry::ConnRegistry;
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub use secret::fetch_proxy_secret;
pub(crate) use selftest::{bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots};
pub use wire::proto_flags_for_tag;
#[derive(Debug)]

View File

@@ -1,7 +1,9 @@
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{
AtomicBool, AtomicI32, AtomicU8, AtomicU32, AtomicU64, AtomicUsize, Ordering,
};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
@@ -547,7 +549,9 @@ impl MePool {
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(
me_pool_drain_soft_evict_grace_secs,
),
me_pool_drain_soft_evict_per_writer: AtomicU8::new(
me_pool_drain_soft_evict_per_writer.max(1),
),
@@ -751,10 +755,7 @@ impl MePool {
}
/// Returns the most recently stored drain-gate block reason.
///
/// Reads the `me_last_drain_gate_block_reason` atomic with `Ordering::Relaxed`
/// and decodes the raw byte through `MeDrainGateReason::from_u8`.
// NOTE(review): the body expression appears twice in this view, differing only
// in line wrapping — presumably the old and new sides of a diff hunk; confirm
// against the actual source file.
pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason {
MeDrainGateReason::from_u8(
self.me_last_drain_gate_block_reason
.load(Ordering::Relaxed),
)
MeDrainGateReason::from_u8(self.me_last_drain_gate_block_reason.load(Ordering::Relaxed))
}
pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 {
@@ -873,14 +874,18 @@ impl MePool {
.store(floor_mode.as_u8(), Ordering::Relaxed);
self.me_adaptive_floor_idle_secs
.store(adaptive_floor_idle_secs, Ordering::Relaxed);
self.me_adaptive_floor_min_writers_single_endpoint
.store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed);
self.me_adaptive_floor_min_writers_single_endpoint.store(
adaptive_floor_min_writers_single_endpoint,
Ordering::Relaxed,
);
self.me_adaptive_floor_min_writers_multi_endpoint
.store(adaptive_floor_min_writers_multi_endpoint, Ordering::Relaxed);
self.me_adaptive_floor_recover_grace_secs
.store(adaptive_floor_recover_grace_secs, Ordering::Relaxed);
self.me_adaptive_floor_writers_per_core_total
.store(adaptive_floor_writers_per_core_total as u32, Ordering::Relaxed);
self.me_adaptive_floor_writers_per_core_total.store(
adaptive_floor_writers_per_core_total as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_cpu_cores_override
.store(adaptive_floor_cpu_cores_override as u32, Ordering::Relaxed);
self.me_adaptive_floor_max_extra_writers_single_per_core
@@ -893,16 +898,14 @@ impl MePool {
adaptive_floor_max_extra_writers_multi_per_core as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_max_active_writers_per_core
.store(
adaptive_floor_max_active_writers_per_core as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_max_warm_writers_per_core
.store(
adaptive_floor_max_warm_writers_per_core as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_max_active_writers_per_core.store(
adaptive_floor_max_active_writers_per_core as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_max_warm_writers_per_core.store(
adaptive_floor_max_warm_writers_per_core as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_max_active_writers_global
.store(adaptive_floor_max_active_writers_global, Ordering::Relaxed);
self.me_adaptive_floor_max_warm_writers_global
@@ -1571,11 +1574,19 @@ impl MePool {
}
/// Returns the health-check interval stored in `me_health_interval_ms_unhealthy`.
///
/// The stored value is read with `Ordering::Relaxed`, interpreted as
/// milliseconds, and clamped to a minimum of 1 so the resulting `Duration` is
/// never zero.
// NOTE(review): the body expression appears twice in this view, differing only
// in line wrapping — presumably the old and new sides of a diff hunk; confirm
// against the actual source file.
pub(super) fn health_interval_unhealthy(&self) -> Duration {
Duration::from_millis(self.me_health_interval_ms_unhealthy.load(Ordering::Relaxed).max(1))
Duration::from_millis(
self.me_health_interval_ms_unhealthy
.load(Ordering::Relaxed)
.max(1),
)
}
/// Returns the health-check interval stored in `me_health_interval_ms_healthy`.
///
/// The stored value is read with `Ordering::Relaxed`, interpreted as
/// milliseconds, and clamped to a minimum of 1 so the resulting `Duration` is
/// never zero.
// NOTE(review): the body expression appears twice in this view, differing only
// in line wrapping — presumably the old and new sides of a diff hunk; confirm
// against the actual source file.
pub(super) fn health_interval_healthy(&self) -> Duration {
Duration::from_millis(self.me_health_interval_ms_healthy.load(Ordering::Relaxed).max(1))
Duration::from_millis(
self.me_health_interval_ms_healthy
.load(Ordering::Relaxed)
.max(1),
)
}
pub(super) fn warn_rate_limit_duration(&self) -> Duration {

View File

@@ -83,7 +83,10 @@ impl MePool {
pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
if new_secret.len() < 32 {
warn!(len = new_secret.len(), "proxy-secret update ignored (too short)");
warn!(
len = new_secret.len(),
"proxy-secret update ignored (too short)"
);
return false;
}
let mut guard = self.proxy_secret.write().await;

View File

@@ -84,7 +84,11 @@ impl MePool {
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
if self.active_writer_count_for_dc_endpoints(*dc, &endpoints).await == 0 {
if self
.active_writer_count_for_dc_endpoints(*dc, &endpoints)
.await
== 0
{
missing_dcs.push(*dc);
}
}
@@ -104,7 +108,8 @@ impl MePool {
if addrs.len() <= 1 {
continue;
}
let target_writers = pool.required_writers_for_dc_with_floor_mode(addrs.len(), false);
let target_writers =
pool.required_writers_for_dc_with_floor_mode(addrs.len(), false);
let pool_clone = Arc::clone(&pool);
let rng_clone_local = Arc::clone(&rng_clone);
join_bg.spawn(async move {
@@ -246,8 +251,8 @@ impl MePool {
}
if self.me_warmup_stagger_enabled {
let jitter = rand::rng()
.random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
let jitter =
rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter;
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}

View File

@@ -8,7 +8,7 @@ use tracing::{debug, info, warn};
use crate::error::{ProxyError, Result};
use crate::network::probe::is_bogon;
use crate::network::stun::{stun_probe_dual, stun_probe_family_with_bind, IpFamily};
use crate::network::stun::{IpFamily, stun_probe_dual, stun_probe_family_with_bind};
use super::MePool;
use std::time::Instant;
@@ -24,14 +24,20 @@ pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stu
.unwrap_or_default()
});
if stun_addr.is_empty() {
return Err(ProxyError::Proxy("STUN server is not configured".to_string()));
return Err(ProxyError::Proxy(
"STUN server is not configured".to_string(),
));
}
stun_probe_dual(&stun_addr).await
}
/// Best-effort public IPv4 detection.
///
/// Awaits `fetch_public_ipv4_with_retry`, discards any error (`.ok()`),
/// collapses the nested `Option` (`.flatten()`), and wraps a found address in
/// `IpAddr::V4`. Returns `None` both when the lookup failed and when it
/// succeeded without yielding an address.
// NOTE(review): the body expression appears twice in this view, differing only
// in line wrapping — presumably the old and new sides of a diff hunk; confirm
// against the actual source file.
#[allow(dead_code)]
pub async fn detect_public_ip() -> Option<IpAddr> {
fetch_public_ipv4_with_retry().await.ok().flatten().map(IpAddr::V4)
fetch_public_ipv4_with_retry()
.await
.ok()
.flatten()
.map(IpAddr::V4)
}
impl MePool {
@@ -141,9 +147,7 @@ impl MePool {
match (ip, nat_ip) {
(IpAddr::V4(src), IpAddr::V4(dst))
if is_bogon(IpAddr::V4(src))
|| src.is_loopback()
|| src.is_unspecified() =>
if is_bogon(IpAddr::V4(src)) || src.is_loopback() || src.is_unspecified() =>
{
IpAddr::V4(dst)
}
@@ -240,9 +244,7 @@ impl MePool {
return None;
}
if use_shared_cache
&& let Ok(mut cache) = self.nat_reflection_cache.try_lock()
{
if use_shared_cache && let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
@@ -277,9 +279,7 @@ impl MePool {
return None;
}
if use_shared_cache
&& let Ok(mut cache) = self.nat_reflection_cache.try_lock()
{
if use_shared_cache && let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
@@ -292,7 +292,8 @@ impl MePool {
}
let attempt = if use_shared_cache {
self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
self.nat_probe_attempts
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
} else {
0
};
@@ -308,7 +309,10 @@ impl MePool {
.probe_stun_batch_for_family(&primary_servers, family, attempt, bind_ip)
.await;
if selected_reflected.is_none() && !configured_servers.is_empty() && primary_servers != configured_servers {
if selected_reflected.is_none()
&& !configured_servers.is_empty()
&& primary_servers != configured_servers
{
let (rediscovered_live, rediscovered_reflected) = self
.probe_stun_batch_for_family(&configured_servers, family, attempt, bind_ip)
.await;
@@ -325,7 +329,8 @@ impl MePool {
if let Some(reflected_addr) = selected_reflected {
if use_shared_cache {
self.nat_probe_attempts.store(0, std::sync::atomic::Ordering::Relaxed);
self.nat_probe_attempts
.store(0, std::sync::atomic::Ordering::Relaxed);
}
info!(
family = ?family,
@@ -333,9 +338,7 @@ impl MePool {
"STUN-Quorum reached, IP: {}",
reflected_addr.ip()
);
if use_shared_cache
&& let Ok(mut cache) = self.nat_reflection_cache.try_lock()
{
if use_shared_cache && let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
@@ -368,13 +371,14 @@ async fn fetch_public_ipv4_with_retry() -> Result<Option<Ipv4Addr>> {
}
async fn fetch_public_ipv4_once(url: &str) -> Result<Option<Ipv4Addr>> {
let res = reqwest::get(url).await.map_err(|e| {
ProxyError::Proxy(format!("public IP detection request failed: {e}"))
})?;
let res = reqwest::get(url)
.await
.map_err(|e| ProxyError::Proxy(format!("public IP detection request failed: {e}")))?;
let text = res.text().await.map_err(|e| {
ProxyError::Proxy(format!("public IP detection read failed: {e}"))
})?;
let text = res
.text()
.await
.map_err(|e| ProxyError::Proxy(format!("public IP detection read failed: {e}")))?;
let ip = text.trim().parse().ok();
Ok(ip)

View File

@@ -142,9 +142,7 @@ impl MePool {
continue;
}
if !matches!(
super::pool::WriterContour::from_u8(
writer.contour.load(Ordering::Relaxed),
),
super::pool::WriterContour::from_u8(writer.contour.load(Ordering::Relaxed),),
super::pool::WriterContour::Active
) {
continue;
@@ -154,7 +152,8 @@ impl MePool {
}
}
drop(ws);
candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr));
candidates
.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr));
}
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
for offset in 0..candidates.len() {
@@ -212,10 +211,14 @@ impl MePool {
if !same_endpoint_quarantined {
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
match self.connect_one_for_dc(addr, writer_dc, self.rng.as_ref()).await {
match self
.connect_one_for_dc(addr, writer_dc, self.rng.as_ref())
.await
{
Ok(()) => {
self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_same_endpoint_total();
self.stats
.increment_me_writer_restored_same_endpoint_total();
info!(
%addr,
attempt = attempt + 1,
@@ -267,7 +270,11 @@ impl MePool {
false
}
pub(crate) fn trigger_immediate_refill_for_dc(self: &Arc<Self>, addr: SocketAddr, writer_dc: i32) {
pub(crate) fn trigger_immediate_refill_for_dc(
self: &Arc<Self>,
addr: SocketAddr,
writer_dc: i32,
) {
let endpoint_key = RefillEndpointKey {
dc: writer_dc,
addr,

View File

@@ -7,8 +7,8 @@ use std::time::Duration;
use rand::RngExt;
use rand::seq::SliceRandom;
use tracing::{debug, info, warn};
use std::collections::hash_map::DefaultHasher;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
@@ -104,7 +104,11 @@ impl MePool {
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
let dc_endpoints: HashSet<SocketAddr> = dc_addrs.iter().copied().collect();
if self.active_writer_count_for_dc_endpoints(*dc, &dc_endpoints).await == 0 {
if self
.active_writer_count_for_dc_endpoints(*dc, &dc_endpoints)
.await
== 0
{
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
for addr in shuffled {
@@ -373,7 +377,8 @@ impl MePool {
.load(Ordering::Relaxed);
let pending_map_hash = self.pending_hardswap_map_hash.load(Ordering::Relaxed);
let pending_age_secs = now_epoch_secs.saturating_sub(pending_started_at);
let pending_ttl_expired = pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS;
let pending_ttl_expired =
pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS;
let pending_matches_map = pending_map_hash != 0 && pending_map_hash == desired_map_hash;
if pending_generation != 0
@@ -407,7 +412,8 @@ impl MePool {
.store(now_epoch_secs, Ordering::Relaxed);
self.pending_hardswap_map_hash
.store(desired_map_hash, Ordering::Relaxed);
self.warm_generation.store(next_generation, Ordering::Relaxed);
self.warm_generation
.store(next_generation, Ordering::Relaxed);
next_generation
}
} else {
@@ -433,7 +439,8 @@ impl MePool {
self.me_pool_min_fresh_ratio_permille
.load(Ordering::Relaxed),
);
let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
let (coverage_ratio, missing_dc) =
Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
let mut route_quorum_ok = coverage_ratio >= min_ratio;
let mut redundancy_ok = missing_dc.is_empty();
let mut redundancy_missing_dc = missing_dc.clone();
@@ -487,7 +494,12 @@ impl MePool {
}
}
self.set_last_drain_gate(route_quorum_ok, redundancy_ok, MeDrainGateReason::Open, now_epoch_secs);
self.set_last_drain_gate(
route_quorum_ok,
redundancy_ok,
MeDrainGateReason::Open,
now_epoch_secs,
);
if !redundancy_ok {
warn!(
missing_dc = ?redundancy_missing_dc,
@@ -541,9 +553,7 @@ impl MePool {
"ME reinit cycle covered; processing stale writers"
);
self.stats.increment_pool_swap_total();
let can_drop_with_replacement = self
.has_non_draining_writer_per_desired_dc_group()
.await;
let can_drop_with_replacement = self.has_non_draining_writer_per_desired_dc_group().await;
if can_drop_with_replacement {
info!(
stale_writers = stale_writer_ids.len(),

View File

@@ -157,7 +157,8 @@ impl MePool {
},
state: state.as_str(),
state_since_epoch_secs: self.family_runtime_state_since_epoch_secs(family),
suppressed_until_epoch_secs: (suppressed_until != 0).then_some(suppressed_until),
suppressed_until_epoch_secs: (suppressed_until != 0)
.then_some(suppressed_until),
fail_streak: self.family_fail_streak(family),
recover_success_streak: self.family_recover_success_streak(family),
}

View File

@@ -277,9 +277,8 @@ impl MePool {
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(Ordering::Relaxed);
let drain_deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(Ordering::Relaxed);
let drain_deadline_epoch_secs =
writer.drain_deadline_epoch_secs.load(Ordering::Relaxed);
let drain_started_at_epoch_secs =
(drain_started_at_epoch_secs != 0).then_some(drain_started_at_epoch_secs);
let drain_deadline_epoch_secs =
@@ -371,9 +370,10 @@ impl MePool {
self.me_adaptive_floor_max_extra_writers_multi_per_core
.load(Ordering::Relaxed) as usize
};
let floor_max = base_required.saturating_add(adaptive_cpu_cores.saturating_mul(extra_per_core));
let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive)
&& dc_required_writers < base_required;
let floor_max =
base_required.saturating_add(adaptive_cpu_cores.saturating_mul(extra_per_core));
let floor_capped =
matches!(floor_mode, MeFloorMode::Adaptive) && dc_required_writers < base_required;
let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0);
let dc_fresh_alive_writers = fresh_writers_by_dc.get(&dc).copied().unwrap_or(0);
let dc_load = activity
@@ -440,8 +440,8 @@ impl MePool {
let pending_started_at = self
.pending_hardswap_started_at_epoch_secs
.load(Ordering::Relaxed);
let pending_hardswap_age_secs = (pending_started_at > 0)
.then_some(now_epoch_secs.saturating_sub(pending_started_at));
let pending_hardswap_age_secs =
(pending_started_at > 0).then_some(now_epoch_secs.saturating_sub(pending_started_at));
let mut quarantined_endpoints = Vec::<MeApiQuarantinedEndpointSnapshot>::new();
{
@@ -503,16 +503,20 @@ impl MePool {
.load(Ordering::Relaxed) as u16,
adaptive_floor_max_extra_writers_single_per_core: self
.me_adaptive_floor_max_extra_writers_single_per_core
.load(Ordering::Relaxed) as u16,
.load(Ordering::Relaxed)
as u16,
adaptive_floor_max_extra_writers_multi_per_core: self
.me_adaptive_floor_max_extra_writers_multi_per_core
.load(Ordering::Relaxed) as u16,
.load(Ordering::Relaxed)
as u16,
adaptive_floor_max_active_writers_per_core: self
.me_adaptive_floor_max_active_writers_per_core
.load(Ordering::Relaxed) as u16,
.load(Ordering::Relaxed)
as u16,
adaptive_floor_max_warm_writers_per_core: self
.me_adaptive_floor_max_warm_writers_per_core
.load(Ordering::Relaxed) as u16,
.load(Ordering::Relaxed)
as u16,
adaptive_floor_max_active_writers_global: self
.me_adaptive_floor_max_active_writers_global
.load(Ordering::Relaxed),
@@ -564,7 +568,8 @@ impl MePool {
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed),
me_pool_min_fresh_ratio: Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed),
self.me_pool_min_fresh_ratio_permille
.load(Ordering::Relaxed),
),
me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()),
me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed),
@@ -586,9 +591,7 @@ impl MePool {
me_single_endpoint_shadow_rotate_every_secs: self
.me_single_endpoint_shadow_rotate_every_secs
.load(Ordering::Relaxed),
me_deterministic_writer_sort: self
.me_deterministic_writer_sort
.load(Ordering::Relaxed),
me_deterministic_writer_sort: self.me_deterministic_writer_sort.load(Ordering::Relaxed),
me_writer_pick_mode: writer_pick_mode_label(self.writer_pick_mode()),
me_writer_pick_sample_size: self.writer_pick_sample_size() as u8,
me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()),

View File

@@ -1,8 +1,8 @@
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use std::io::ErrorKind;
use bytes::Bytes;
use bytes::BytesMut;
@@ -40,7 +40,10 @@ impl MePool {
pub(crate) async fn prune_closed_writers(self: &Arc<Self>) {
let closed_writer_ids: Vec<u64> = {
let ws = self.writers.read().await;
ws.iter().filter(|w| w.tx.is_closed()).map(|w| w.id).collect()
ws.iter()
.filter(|w| w.tx.is_closed())
.map(|w| w.id)
.collect()
};
if closed_writer_ids.is_empty() {
return;
@@ -88,12 +91,7 @@ impl MePool {
writer_dc: i32,
) -> Result<()> {
self.connect_one_with_generation_contour_for_dc_with_cap_policy(
addr,
rng,
generation,
contour,
writer_dc,
false,
addr, rng, generation, contour, writer_dc, false,
)
.await
}
@@ -118,12 +116,16 @@ impl MePool {
let secret_len = self.proxy_secret.read().await.secret.len();
if secret_len < 32 {
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
return Err(ProxyError::Proxy(
"proxy-secret too short for ME auth".into(),
));
}
let dc_idx = i16::try_from(writer_dc).ok();
let (stream, _connect_ms, upstream_egress) = self.connect_tcp(addr, dc_idx).await?;
let hs = self.handshake_only(stream, addr, upstream_egress, rng).await?;
let hs = self
.handshake_only(stream, addr, upstream_egress, rng)
.await?;
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
let contour = Arc::new(AtomicU8::new(contour.as_u8()));
@@ -293,7 +295,8 @@ impl MePool {
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else {
let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let jitter = rand::rng()
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(wait)
};
@@ -312,10 +315,15 @@ impl MePool {
break;
}
let jitter_cap_ms = interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
let effective_jitter_ms =
keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
interval
+ Duration::from_millis(
rand::rng().random_range(0..=effective_jitter_ms as u64),
)
} else {
let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let jitter = rand::rng()
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(secs)
};
@@ -415,7 +423,10 @@ impl MePool {
.as_millis()
.min(jitter_cap_ms)
.max(1);
interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
interval
+ Duration::from_millis(
rand::rng().random_range(0..=effective_jitter_ms as u64),
)
};
tokio::select! {
@@ -596,9 +607,7 @@ impl MePool {
// Quarantine flapping endpoints regardless of draining state.
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
if trigger_refill
&& let Some(writer_dc) = removed_dc
{
if trigger_refill && let Some(writer_dc) = removed_dc {
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
}
@@ -646,9 +655,7 @@ impl MePool {
let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
debug!(
writer_id,
timeout_secs,
allow_drain_fallback,
"ME writer marked draining"
timeout_secs, allow_drain_fallback, "ME writer marked draining"
);
}
@@ -674,7 +681,9 @@ impl MePool {
return true;
}
let started = writer.draining_started_at_epoch_secs.load(Ordering::Relaxed);
let started = writer
.draining_started_at_epoch_secs
.load(Ordering::Relaxed);
if started == 0 {
return false;
}

View File

@@ -7,8 +7,8 @@ use std::time::Instant;
use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::{Mutex, mpsc};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{Mutex, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
@@ -126,7 +126,8 @@ pub(crate) async fn reader_loop(
let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = if data_wait_ms == 0 {
reg.route_nowait(cid, MeResponse::Data { flags, data }).await
reg.route_nowait(cid, MeResponse::Data { flags, data })
.await
} else {
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
.await
@@ -134,7 +135,9 @@ pub(crate) async fn reader_loop(
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::ChannelClosed => {
stats.increment_me_route_drop_channel_closed()
}
RouteResult::QueueFullBase => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_base();
@@ -157,7 +160,9 @@ pub(crate) async fn reader_loop(
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::ChannelClosed => {
stats.increment_me_route_drop_channel_closed()
}
RouteResult::QueueFullBase => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_base();
@@ -216,9 +221,18 @@ pub(crate) async fn reader_loop(
}
let degraded_now = entry.1 > entry.0 * 2.0;
degraded.store(degraded_now, Ordering::Relaxed);
writer_rtt_ema_ms_x10
.store((entry.1 * 10.0).clamp(0.0, u32::MAX as f64) as u32, Ordering::Relaxed);
trace!(writer_id = wid, rtt_ms = rtt, ema_ms = entry.1, base_ms = entry.0, degraded = degraded_now, "ME RTT sample");
writer_rtt_ema_ms_x10.store(
(entry.1 * 10.0).clamp(0.0, u32::MAX as f64) as u32,
Ordering::Relaxed,
);
trace!(
writer_id = wid,
rtt_ms = rtt,
ema_ms = entry.1,
base_ms = entry.0,
degraded = degraded_now,
"ME RTT sample"
);
}
} else {
debug!(
@@ -238,10 +252,16 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
debug!(
conn_id,
"ME close_conn signal skipped: writer command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
debug!(
conn_id,
"ME close_conn signal skipped: writer command channel is closed"
);
}
}
}

View File

@@ -3,11 +3,11 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{RwLock, mpsc};
use super::codec::WriterCommand;
use super::MeResponse;
use super::codec::WriterCommand;
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
@@ -97,12 +97,8 @@ impl ConnRegistry {
inner: RwLock::new(RegistryInner::new()),
next_id: AtomicU64::new(start),
route_channel_capacity: route_channel_capacity.max(1),
route_backpressure_base_timeout_ms: AtomicU64::new(
ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS,
),
route_backpressure_high_timeout_ms: AtomicU64::new(
ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS,
),
route_backpressure_base_timeout_ms: AtomicU64::new(ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS),
route_backpressure_high_timeout_ms: AtomicU64::new(ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS),
route_backpressure_high_watermark_pct: AtomicU8::new(
ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT,
),
@@ -184,8 +180,10 @@ impl ConnRegistry {
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
Err(TrySendError::Full(resp)) => {
// Absorb short bursts without dropping/closing the session immediately.
let base_timeout_ms =
self.route_backpressure_base_timeout_ms.load(Ordering::Relaxed).max(1);
let base_timeout_ms = self
.route_backpressure_base_timeout_ms
.load(Ordering::Relaxed)
.max(1);
let high_timeout_ms = self
.route_backpressure_high_timeout_ms
.load(Ordering::Relaxed)
@@ -301,13 +299,13 @@ impl ConnRegistry {
if let Some(previous_writer_id) = previous_writer_id
&& previous_writer_id != writer_id
{
let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id)
{
set.remove(&conn_id);
set.is_empty()
} else {
false
};
let became_empty =
if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id) {
set.remove(&conn_id);
set.is_empty()
} else {
false
};
if became_empty {
inner
.writer_idle_since_epoch_secs
@@ -328,7 +326,10 @@ impl ConnRegistry {
pub async fn mark_writer_idle(&self, writer_id: u64) {
let mut inner = self.inner.write().await;
inner.conns_for_writer.entry(writer_id).or_insert_with(HashSet::new);
inner
.conns_for_writer
.entry(writer_id)
.or_insert_with(HashSet::new);
inner
.writer_idle_since_epoch_secs
.entry(writer_id)
@@ -345,10 +346,7 @@ impl ConnRegistry {
inner.writer_idle_since_epoch_secs.clone()
}
pub async fn writer_idle_since_for_writer_ids(
&self,
writer_ids: &[u64],
) -> HashMap<u64, u64> {
pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap<u64, u64> {
let inner = self.inner.read().await;
let mut out = HashMap::<u64, u64>::with_capacity(writer_ids.len());
for writer_id in writer_ids {
@@ -386,7 +384,10 @@ impl ConnRegistry {
let inner = self.inner.read().await;
let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?;
let writer = inner.writers.get(&writer_id).cloned()?;
Some(ConnWriter { writer_id, tx: writer })
Some(ConnWriter {
writer_id,
tx: writer,
})
}
pub async fn active_conn_ids(&self) -> Vec<u64> {
@@ -592,7 +593,12 @@ mod tests {
let snapshot = registry.writer_activity_snapshot().await;
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&0));
assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1));
assert!(registry.writer_idle_since_snapshot().await.contains_key(&10));
assert!(
registry
.writer_idle_since_snapshot()
.await
.contains_key(&10)
);
}
#[tokio::test]
@@ -636,7 +642,14 @@ mod tests {
let lost = registry.writer_lost(10).await;
assert!(lost.is_empty());
assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20);
assert_eq!(
registry
.get_writer(conn_id)
.await
.expect("writer")
.writer_id,
20
);
let removed_writer = registry.unregister(conn_id).await;
assert_eq!(removed_writer, Some(20));

View File

@@ -24,17 +24,20 @@ impl MeReinitTrigger {
}
}
/// Best-effort enqueue of a reinit `trigger` onto the scheduler channel `tx`.
///
/// Uses `try_send`, so the call never waits for queue capacity. If the send
/// fails the trigger is discarded and only a log line is emitted; nothing is
/// returned to the caller.
// NOTE(review): `trigger` is read via `as_str()` after being passed by value to
// `try_send`, so this relies on `MeReinitTrigger` being `Copy` — confirm.
pub fn enqueue_reinit_trigger(
tx: &mpsc::Sender<MeReinitTrigger>,
trigger: MeReinitTrigger,
) {
pub fn enqueue_reinit_trigger(tx: &mpsc::Sender<MeReinitTrigger>, trigger: MeReinitTrigger) {
match tx.try_send(trigger) {
Ok(()) => {}
// Queue is at capacity: drop the trigger and log at debug level.
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
debug!(trigger = trigger.as_str(), "ME reinit trigger dropped (queue full)");
debug!(
trigger = trigger.as_str(),
"ME reinit trigger dropped (queue full)"
);
}
// Receiving side is gone: drop the trigger and log at warn level.
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
warn!(trigger = trigger.as_str(), "ME reinit trigger dropped (scheduler closed)");
warn!(
trigger = trigger.as_str(),
"ME reinit trigger dropped (scheduler closed)"
);
}
}
}
@@ -98,7 +101,6 @@ pub async fn me_reinit_scheduler(
.await;
});
}
}
}

View File

@@ -1,9 +1,9 @@
use tracing::{debug, info, warn};
use std::time::SystemTime;
use httpdate;
use std::time::SystemTime;
use tracing::{debug, info, warn};
use crate::error::{ProxyError, Result};
use super::selftest::record_timeskew_sample;
use crate::error::{ProxyError, Result};
pub const PROXY_SECRET_MIN_LEN: usize = 32;
@@ -11,24 +11,21 @@ pub(super) fn validate_proxy_secret_len(data_len: usize, max_len: usize) -> Resu
if max_len < PROXY_SECRET_MIN_LEN {
return Err(ProxyError::Proxy(format!(
"proxy-secret max length is invalid: {} bytes (must be >= {})",
max_len,
PROXY_SECRET_MIN_LEN
max_len, PROXY_SECRET_MIN_LEN
)));
}
if data_len < PROXY_SECRET_MIN_LEN {
return Err(ProxyError::Proxy(format!(
"proxy-secret too short: {} bytes (need >= {})",
data_len,
PROXY_SECRET_MIN_LEN
data_len, PROXY_SECRET_MIN_LEN
)));
}
if data_len > max_len {
return Err(ProxyError::Proxy(format!(
"proxy-secret too long: {} bytes (limit = {})",
data_len,
max_len
data_len, max_len
)));
}
@@ -94,16 +91,22 @@ pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result<Vec<u8
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
&& let Ok(skew) = SystemTime::now()
.duration_since(server_time)
.or_else(|e| server_time.duration_since(SystemTime::now()).map_err(|_| e))
{
let skew_secs = skew.as_secs();
record_timeskew_sample("proxy_secret_date_header", skew_secs);
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header");
warn!(
skew_secs,
"Time skew >60s detected from proxy-secret Date header"
);
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from proxy-secret Date header");
warn!(
skew_secs,
"Time skew >30s detected from proxy-secret Date header"
);
}
}

View File

@@ -17,9 +17,9 @@ use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
use super::MePool;
use super::codec::WriterCommand;
use super::pool::WriterContour;
use super::registry::ConnMeta;
use super::wire::build_proxy_req_payload;
use rand::seq::SliceRandom;
use super::registry::ConnMeta;
const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
@@ -68,9 +68,8 @@ impl MePool {
};
let no_writer_mode =
MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed));
let (routed_dc, unknown_target_dc) = self
.resolve_target_dc_for_routing(target_dc as i32)
.await;
let (routed_dc, unknown_target_dc) =
self.resolve_target_dc_for_routing(target_dc as i32).await;
let mut no_writer_deadline: Option<Instant> = None;
let mut emergency_attempts = 0u32;
let mut async_recovery_triggered = false;
@@ -87,19 +86,24 @@ impl MePool {
.unwrap_or_else(|| fallback_meta.clone());
let (current_payload, _) = build_routed_payload(current_meta.our_addr);
if let Some(current) = self.registry.get_writer(conn_id).await {
match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
match current
.tx
.try_send(WriterCommand::Data(current_payload.clone()))
{
Ok(()) => return Ok(()),
Err(TrySendError::Full(cmd)) => {
if current.tx.send(cmd).await.is_ok() {
return Ok(());
}
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
self.remove_writer_and_close_clients(current.writer_id)
.await;
continue;
}
Err(TrySendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
self.remove_writer_and_close_clients(current.writer_id)
.await;
continue;
}
}
@@ -143,7 +147,11 @@ impl MePool {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
let _ = self
.connect_one_for_dc(addr, *dc, self.rng.as_ref())
.connect_one_for_dc(
addr,
*dc,
self.rng.as_ref(),
)
.await;
}
}
@@ -157,8 +165,9 @@ impl MePool {
if !self.writers.read().await.is_empty() {
continue;
}
let deadline = *no_writer_deadline
.get_or_insert_with(|| Instant::now() + self.me_route_inline_recovery_wait);
let deadline = *no_writer_deadline.get_or_insert_with(|| {
Instant::now() + self.me_route_inline_recovery_wait
});
if !self.wait_for_writer_until(deadline).await {
if !self.writers.read().await.is_empty() {
continue;
@@ -182,9 +191,8 @@ impl MePool {
}
let deadline = Instant::now() + hybrid_wait_current;
let _ = self.wait_for_writer_until(deadline).await;
hybrid_wait_current =
(hybrid_wait_current.saturating_mul(2))
.min(Duration::from_millis(400));
hybrid_wait_current = (hybrid_wait_current.saturating_mul(2))
.min(Duration::from_millis(400));
continue;
}
}
@@ -204,11 +212,11 @@ impl MePool {
let pick_mode = self.writer_pick_mode();
match no_writer_mode {
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
let deadline = *no_writer_deadline.get_or_insert_with(|| {
Instant::now() + self.me_route_no_writer_wait
});
let deadline = *no_writer_deadline
.get_or_insert_with(|| Instant::now() + self.me_route_no_writer_wait);
if !async_recovery_triggered && !unknown_target_dc {
let triggered = self.trigger_async_recovery_for_target_dc(routed_dc).await;
let triggered =
self.trigger_async_recovery_for_target_dc(routed_dc).await;
if !triggered {
self.trigger_async_recovery_global().await;
}
@@ -217,7 +225,8 @@ impl MePool {
if self.wait_for_candidate_until(routed_dc, deadline).await {
continue;
}
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats
.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy(
"No ME writers available for target DC in failfast window".into(),
@@ -226,29 +235,41 @@ impl MePool {
MeRouteNoWriterMode::InlineRecoveryLegacy => {
self.stats.increment_me_inline_recovery_total();
if unknown_target_dc {
let deadline = *no_writer_deadline
.get_or_insert_with(|| Instant::now() + self.me_route_inline_recovery_wait);
let deadline = *no_writer_deadline.get_or_insert_with(|| {
Instant::now() + self.me_route_inline_recovery_wait
});
if self.wait_for_candidate_until(routed_dc, deadline).await {
continue;
}
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats
.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
return Err(ProxyError::Proxy(
"No ME writers available for target DC".into(),
));
}
if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) {
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats
.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
return Err(ProxyError::Proxy(
"No ME writers available for target DC".into(),
));
}
emergency_attempts += 1;
let mut endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await;
endpoints.shuffle(&mut rand::rng());
for addr in endpoints {
if self.connect_one_for_dc(addr, routed_dc, self.rng.as_ref()).await.is_ok() {
if self
.connect_one_for_dc(addr, routed_dc, self.rng.as_ref())
.await
.is_ok()
{
break;
}
}
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await;
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64))
.await;
let ws2 = self.writers.read().await;
writers_snapshot = ws2.clone();
drop(ws2);
@@ -261,8 +282,11 @@ impl MePool {
.await;
}
if candidate_indices.is_empty() {
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
self.stats
.increment_me_writer_pick_no_candidate_total(pick_mode);
return Err(ProxyError::Proxy(
"No ME writers available for target DC".into(),
));
}
}
MeRouteNoWriterMode::HybridAsyncPersistent => {
@@ -277,8 +301,8 @@ impl MePool {
}
let deadline = Instant::now() + hybrid_wait_current;
let _ = self.wait_for_candidate_until(routed_dc, deadline).await;
hybrid_wait_current = (hybrid_wait_current.saturating_mul(2))
.min(Duration::from_millis(400));
hybrid_wait_current =
(hybrid_wait_current.saturating_mul(2)).min(Duration::from_millis(400));
continue;
}
}
@@ -385,7 +409,8 @@ impl MePool {
continue;
}
permit.send(WriterCommand::Data(payload.clone()));
self.stats.increment_me_writer_pick_success_try_total(pick_mode);
self.stats
.increment_me_writer_pick_success_try_total(pick_mode);
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();
debug!(
@@ -422,7 +447,8 @@ impl MePool {
self.stats.increment_me_writer_pick_full_total(pick_mode);
continue;
}
self.stats.increment_me_writer_pick_blocking_fallback_total();
self.stats
.increment_me_writer_pick_blocking_fallback_total();
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
match w.tx.clone().reserve_owned().await {
@@ -629,7 +655,7 @@ impl MePool {
/// Returns the current value of the `conn_count` atomic counter.
///
/// The value is read with `Ordering::Relaxed`, so it is a point-in-time
/// reading with no ordering guarantee relative to other memory operations.
pub fn connection_count(&self) -> usize {
self.conn_count.load(Ordering::Relaxed)
}
pub(super) async fn candidate_indices_for_dc(
&self,
writers: &[super::pool::MeWriter],
@@ -718,15 +744,17 @@ impl MePool {
0
};
let idle_penalty =
(self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) as u64)
(self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs)
as u64)
* 100;
let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64;
let queue_remaining = writer.tx.capacity() as u64;
let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap));
let queue_util_pct = queue_used.saturating_mul(100) / queue_cap;
let queue_penalty = queue_util_pct.saturating_mul(4);
let rtt_penalty = ((writer.rtt_ema_ms_x10.load(Ordering::Relaxed) as u64).saturating_add(5) / 10)
.min(400);
let rtt_penalty =
((writer.rtt_ema_ms_x10.load(Ordering::Relaxed) as u64).saturating_add(5) / 10)
.min(400);
contour_penalty
.saturating_add(stale_penalty)

View File

@@ -10,9 +10,9 @@ use tokio_util::sync::CancellationToken;
use super::codec::WriterCommand;
use super::health::{health_drain_close_budget, reap_draining_writers};
use super::me_health_monitor;
use super::pool::{MePool, MeWriter, WriterContour};
use super::registry::ConnMeta;
use super::me_health_monitor;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::crypto::SecureRandom;
use crate::network::probe::NetworkDecision;
@@ -241,14 +241,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
let now_epoch_secs = MePool::now_epoch_secs();
for writer_id in 1..=60u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(20), 1, 0).await;
}
let mut warn_next_allowed = HashMap::new();
@@ -267,17 +260,12 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
async fn reap_draining_writers_handles_large_empty_writer_population() {
let (pool, _rng) = make_pool(128, 1, 1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let total = health_drain_close_budget().saturating_mul(3).saturating_add(27);
let total = health_drain_close_budget()
.saturating_mul(3)
.saturating_add(27);
for writer_id in 1..=total as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(120),
0,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(120), 0, 0).await;
}
let mut warn_next_allowed = HashMap::new();
@@ -295,7 +283,9 @@ async fn reap_draining_writers_handles_large_empty_writer_population() {
async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_growth() {
let (pool, _rng) = make_pool(128, 1, 1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let total = health_drain_close_budget().saturating_mul(4).saturating_add(31);
let total = health_drain_close_budget()
.saturating_mul(4)
.saturating_add(31);
for writer_id in 1..=total as u64 {
insert_draining_writer(
@@ -580,14 +570,7 @@ async fn reap_draining_writers_repeated_draining_flips_never_leave_stale_warn_st
let now_epoch_secs = MePool::now_epoch_secs();
for writer_id in 1..=24u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(240),
1,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(240), 1, 0).await;
}
let mut warn_next_allowed = HashMap::new();

View File

@@ -9,9 +9,9 @@ use tokio_util::sync::CancellationToken;
use super::codec::WriterCommand;
use super::health::health_drain_close_budget;
use super::me_health_monitor;
use super::pool::{MePool, MeWriter, WriterContour};
use super::registry::ConnMeta;
use super::me_health_monitor;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::crypto::SecureRandom;
use crate::network::probe::NetworkDecision;
@@ -185,7 +185,9 @@ async fn wait_for_pool_empty(pool: &Arc<MePool>, timeout: Duration) {
async fn me_health_monitor_drains_expired_backlog_over_multiple_cycles() {
let (pool, rng) = make_pool(128, 1, 1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let writer_total = health_drain_close_budget().saturating_mul(2).saturating_add(9);
let writer_total = health_drain_close_budget()
.saturating_mul(2)
.saturating_add(9);
for writer_id in 1..=writer_total as u64 {
insert_draining_writer(
&pool,

View File

@@ -270,14 +270,7 @@ async fn reap_draining_writers_limits_closes_per_health_tick() {
let close_budget = health_drain_close_budget();
let writer_total = close_budget.saturating_add(20);
for writer_id in 1..=writer_total as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(20), 1, 0).await;
}
let mut warn_next_allowed = HashMap::new();
@@ -304,10 +297,7 @@ async fn reap_draining_writers_keeps_warn_state_for_deadline_backlog_writers() {
}
let target_writer_id = writer_total as u64;
let mut warn_next_allowed = HashMap::new();
warn_next_allowed.insert(
target_writer_id,
Instant::now() + Duration::from_secs(300),
);
warn_next_allowed.insert(target_writer_id, Instant::now() + Duration::from_secs(300));
reap_draining_writers(&pool, &mut warn_next_allowed).await;
@@ -333,10 +323,7 @@ async fn reap_draining_writers_keeps_warn_state_for_overflow_backlog_writers() {
}
let target_writer_id = writer_total.saturating_sub(1) as u64;
let mut warn_next_allowed = HashMap::new();
warn_next_allowed.insert(
target_writer_id,
Instant::now() + Duration::from_secs(300),
);
warn_next_allowed.insert(target_writer_id, Instant::now() + Duration::from_secs(300));
reap_draining_writers(&pool, &mut warn_next_allowed).await;
@@ -382,10 +369,7 @@ async fn reap_draining_writers_preserves_warn_state_across_multiple_budget_defer
let tail_writer_id = writer_total as u64;
let mut warn_next_allowed = HashMap::new();
warn_next_allowed.insert(
tail_writer_id,
Instant::now() + Duration::from_secs(300),
);
warn_next_allowed.insert(tail_writer_id, Instant::now() + Duration::from_secs(300));
reap_draining_writers(&pool, &mut warn_next_allowed).await;
assert!(writer_exists(&pool, tail_writer_id).await);
@@ -410,14 +394,7 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
let close_budget = health_drain_close_budget();
let writer_total = close_budget.saturating_mul(2).saturating_add(7);
for writer_id in 1..=writer_total as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
0,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
}
let mut warn_next_allowed = HashMap::new();
@@ -439,14 +416,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
let close_budget = health_drain_close_budget();
let writer_total = threshold as usize + close_budget.saturating_add(12);
for writer_id in 1..=writer_total as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(20), 1, 0).await;
}
let mut warn_next_allowed = HashMap::new();
@@ -480,17 +450,17 @@ async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget();
for writer_id in 1..=close_budget.saturating_add(1) as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
0,
)
.await;
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(20), 1, 0).await;
}
let empty_writer_id = close_budget.saturating_add(2) as u64;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
insert_draining_writer(
&pool,
empty_writer_id,
now_epoch_secs.saturating_sub(20),
0,
0,
)
.await;
let mut warn_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
@@ -622,14 +592,18 @@ fn general_config_default_drain_threshold_remains_enabled() {
GeneralConfig::default().me_pool_drain_soft_evict_cooldown_ms,
1000
);
assert_eq!(GeneralConfig::default().me_bind_stale_mode, MeBindStaleMode::Never);
assert_eq!(
GeneralConfig::default().me_bind_stale_mode,
MeBindStaleMode::Never
);
}
#[tokio::test]
async fn prune_closed_writers_closes_bound_clients_when_writer_is_non_empty() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let conn_ids = insert_draining_writer(&pool, 910, now_epoch_secs.saturating_sub(60), 1, 0).await;
let conn_ids =
insert_draining_writer(&pool, 910, now_epoch_secs.saturating_sub(60), 1, 0).await;
pool.prune_closed_writers().await;

View File

@@ -132,7 +132,8 @@ async fn connectable_endpoints_releases_quarantine_lock_before_sleep() {
}
let pool_for_task = Arc::clone(&pool);
let task = tokio::spawn(async move { pool_for_task.connectable_endpoints_for_test(&[addr]).await });
let task =
tokio::spawn(async move { pool_for_task.connectable_endpoints_for_test(&[addr]).await });
tokio::time::sleep(Duration::from_millis(10)).await;

View File

@@ -230,7 +230,10 @@ async fn negative_unknown_writer_removal_is_noop() {
assert!(pool.writers.read().await.is_empty());
assert_eq!(pool.conn_count.load(Ordering::Relaxed), 0);
assert_eq!(pool.stats.get_me_endpoint_quarantine_total(), before_quarantine);
assert_eq!(
pool.stats.get_me_endpoint_quarantine_total(),
before_quarantine
);
}
#[tokio::test]
@@ -241,7 +244,10 @@ async fn edge_draining_only_detach_rejects_active_writer() {
insert_writer(&pool, writer_id, 2, addr, false, Instant::now()).await;
let removed = pool.remove_draining_writer_hard_detach(writer_id).await;
assert!(!removed, "active writer must not be detached by draining-only path");
assert!(
!removed,
"active writer must not be detached by draining-only path"
);
assert!(current_writer_ids(&pool).await.contains(&writer_id));
assert_eq!(pool.conn_count.load(Ordering::Relaxed), 1);
@@ -324,8 +330,14 @@ async fn light_fuzz_insert_remove_schedule_preserves_pool_invariants() {
}
let actual_ids = current_writer_ids(&pool).await;
assert_eq!(actual_ids, model, "writer-id set must match model under fuzz schedule");
assert_eq!(pool.conn_count.load(Ordering::Relaxed) as usize, model.len());
assert_eq!(
actual_ids, model,
"writer-id set must match model under fuzz schedule"
);
assert_eq!(
pool.conn_count.load(Ordering::Relaxed) as usize,
model.len()
);
}
for writer_id in model {
@@ -341,7 +353,12 @@ async fn stress_parallel_duplicate_removals_are_idempotent() {
for writer_id in 1..=48u64 {
let addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 14, (writer_id / 250) as u8, (writer_id % 250) as u8)),
IpAddr::V4(Ipv4Addr::new(
127,
14,
(writer_id / 250) as u8,
(writer_id % 250) as u8,
)),
5000 + writer_id as u16,
);
insert_writer(
@@ -363,7 +380,8 @@ async fn stress_parallel_duplicate_removals_are_idempotent() {
if ((writer_id + worker) & 1) == 0 {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
pool.remove_writer_and_close_clients(100_000 + writer_id).await;
pool.remove_writer_and_close_clients(100_000 + writer_id)
.await;
}
}
}));

View File

@@ -205,8 +205,14 @@ async fn send_proxy_req_does_not_replay_when_first_bind_commit_fails() {
.await;
assert!(result.is_ok());
assert_eq!(recv_data_count(&mut stale_rx, Duration::from_millis(50)).await, 0);
assert_eq!(recv_data_count(&mut live_rx, Duration::from_millis(50)).await, 1);
assert_eq!(
recv_data_count(&mut stale_rx, Duration::from_millis(50)).await,
0
);
assert_eq!(
recv_data_count(&mut live_rx, Duration::from_millis(50)).await,
1
);
let bound = pool.registry.get_writer(conn_id).await;
assert!(bound.is_some());
@@ -258,9 +264,18 @@ async fn send_proxy_req_prunes_iterative_stale_bind_failures_without_data_replay
.await;
assert!(result.is_ok());
assert_eq!(recv_data_count(&mut stale_rx_1, Duration::from_millis(50)).await, 0);
assert_eq!(recv_data_count(&mut stale_rx_2, Duration::from_millis(50)).await, 0);
assert_eq!(recv_data_count(&mut live_rx, Duration::from_millis(50)).await, 1);
assert_eq!(
recv_data_count(&mut stale_rx_1, Duration::from_millis(50)).await,
0
);
assert_eq!(
recv_data_count(&mut stale_rx_2, Duration::from_millis(50)).await,
0
);
assert_eq!(
recv_data_count(&mut live_rx, Duration::from_millis(50)).await,
1
);
let writers = pool.writers.read().await;
let writer_ids = writers.iter().map(|w| w.id).collect::<Vec<_>>();

View File

@@ -1,5 +1,5 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use bytes::Bytes;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use crate::protocol::constants::*;