mirror of
https://github.com/telemt/telemt.git
synced 2026-04-16 18:14:10 +03:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f3bdaec2c | ||
|
|
69b02caf77 | ||
|
|
3854955069 | ||
|
|
9b84fc7a5b | ||
|
|
e7cb9238dc | ||
|
|
0e2cbe6178 | ||
|
|
cd076aeeeb | ||
|
|
d683faf922 |
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "telemt"
|
||||
version = "3.3.0"
|
||||
version = "3.3.2"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -548,6 +548,12 @@ impl ProxyConfig {
|
||||
config.general.middle_proxy_nat_probe = true;
|
||||
warn!("Auto-enabled middle_proxy_nat_probe for middle proxy mode");
|
||||
}
|
||||
if config.general.use_middle_proxy && !config.general.me_secret_atomic_snapshot {
|
||||
config.general.me_secret_atomic_snapshot = true;
|
||||
warn!(
|
||||
"Auto-enabled me_secret_atomic_snapshot for middle proxy mode to keep KDF key_selector/secret coherent"
|
||||
);
|
||||
}
|
||||
|
||||
validate_network_cfg(&mut config.network)?;
|
||||
crate::network::dns_overrides::validate_entries(&config.network.dns_overrides)?;
|
||||
|
||||
@@ -187,9 +187,10 @@ impl MeFloorMode {
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum MeRouteNoWriterMode {
|
||||
#[default]
|
||||
AsyncRecoveryFailfast,
|
||||
InlineRecoveryLegacy,
|
||||
#[default]
|
||||
HybridAsyncPersistent,
|
||||
}
|
||||
|
||||
impl MeRouteNoWriterMode {
|
||||
@@ -197,13 +198,16 @@ impl MeRouteNoWriterMode {
|
||||
match self {
|
||||
MeRouteNoWriterMode::AsyncRecoveryFailfast => 0,
|
||||
MeRouteNoWriterMode::InlineRecoveryLegacy => 1,
|
||||
MeRouteNoWriterMode::HybridAsyncPersistent => 2,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_u8(raw: u8) -> Self {
|
||||
match raw {
|
||||
0 => MeRouteNoWriterMode::AsyncRecoveryFailfast,
|
||||
1 => MeRouteNoWriterMode::InlineRecoveryLegacy,
|
||||
_ => MeRouteNoWriterMode::AsyncRecoveryFailfast,
|
||||
2 => MeRouteNoWriterMode::HybridAsyncPersistent,
|
||||
_ => MeRouteNoWriterMode::HybridAsyncPersistent,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
77
src/main.rs
77
src/main.rs
@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
|
||||
use rand::Rng;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::signal;
|
||||
use tokio::sync::{Semaphore, mpsc};
|
||||
use tokio::sync::{Semaphore, mpsc, watch};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
|
||||
#[cfg(unix)]
|
||||
@@ -241,6 +241,17 @@ fn format_uptime(total_secs: u64) -> String {
|
||||
format!("{} / {} seconds", parts.join(", "), total_secs)
|
||||
}
|
||||
|
||||
async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
|
||||
loop {
|
||||
if *admission_rx.borrow() {
|
||||
return true;
|
||||
}
|
||||
if admission_rx.changed().await.is_err() {
|
||||
return *admission_rx.borrow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_startup_proxy_config_snapshot(
|
||||
url: &str,
|
||||
cache_path: Option<&str>,
|
||||
@@ -1325,6 +1336,60 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
print_proxy_links(&host, port, &config);
|
||||
}
|
||||
|
||||
let (admission_tx, admission_rx) = watch::channel(true);
|
||||
if config.general.use_middle_proxy {
|
||||
if let Some(pool) = me_pool.as_ref() {
|
||||
let initial_open = pool.admission_ready_conditional_cast().await;
|
||||
admission_tx.send_replace(initial_open);
|
||||
if initial_open {
|
||||
info!("Conditional-admission gate: open (ME pool ready)");
|
||||
} else {
|
||||
warn!("Conditional-admission gate: closed (ME pool is not ready)");
|
||||
}
|
||||
|
||||
let pool_for_gate = pool.clone();
|
||||
let admission_tx_gate = admission_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut gate_open = initial_open;
|
||||
let mut open_streak = if initial_open { 1u32 } else { 0u32 };
|
||||
let mut close_streak = if initial_open { 0u32 } else { 1u32 };
|
||||
loop {
|
||||
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
||||
if ready {
|
||||
open_streak = open_streak.saturating_add(1);
|
||||
close_streak = 0;
|
||||
if !gate_open && open_streak >= 2 {
|
||||
gate_open = true;
|
||||
admission_tx_gate.send_replace(true);
|
||||
info!(
|
||||
open_streak,
|
||||
"Conditional-admission gate opened (ME pool recovered)"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
close_streak = close_streak.saturating_add(1);
|
||||
open_streak = 0;
|
||||
if gate_open && close_streak >= 2 {
|
||||
gate_open = false;
|
||||
admission_tx_gate.send_replace(false);
|
||||
warn!(
|
||||
close_streak,
|
||||
"Conditional-admission gate closed (ME pool has uncovered DC groups)"
|
||||
);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
admission_tx.send_replace(false);
|
||||
warn!("Conditional-admission gate: closed (ME pool is unavailable)");
|
||||
}
|
||||
} else {
|
||||
admission_tx.send_replace(true);
|
||||
}
|
||||
let _admission_tx_hold = admission_tx;
|
||||
|
||||
// Unix socket setup (before listeners check so unix-only config works)
|
||||
let mut has_unix_listener = false;
|
||||
#[cfg(unix)]
|
||||
@@ -1358,6 +1423,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
has_unix_listener = true;
|
||||
|
||||
let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
||||
let mut admission_rx_unix = admission_rx.clone();
|
||||
let stats = stats.clone();
|
||||
let upstream_manager = upstream_manager.clone();
|
||||
let replay_checker = replay_checker.clone();
|
||||
@@ -1373,6 +1439,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1));
|
||||
|
||||
loop {
|
||||
if !wait_until_admission_open(&mut admission_rx_unix).await {
|
||||
warn!("Conditional-admission gate channel closed for unix listener");
|
||||
break;
|
||||
}
|
||||
match unix_listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
let permit = match max_connections_unix.clone().acquire_owned().await {
|
||||
@@ -1507,6 +1577,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
for (listener, listener_proxy_protocol) in listeners {
|
||||
let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
||||
let mut admission_rx_tcp = admission_rx.clone();
|
||||
let stats = stats.clone();
|
||||
let upstream_manager = upstream_manager.clone();
|
||||
let replay_checker = replay_checker.clone();
|
||||
@@ -1520,6 +1591,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if !wait_until_admission_open(&mut admission_rx_tcp).await {
|
||||
warn!("Conditional-admission gate channel closed for tcp listener");
|
||||
break;
|
||||
}
|
||||
match listener.accept().await {
|
||||
Ok((stream, peer_addr)) => {
|
||||
let permit = match max_connections_tcp.clone().acquire_owned().await {
|
||||
|
||||
@@ -387,9 +387,11 @@ impl MePool {
|
||||
socks_bound_addr.map(|value| value.ip()),
|
||||
client_port_source,
|
||||
);
|
||||
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.lock().await;
|
||||
if let Some((prev_fingerprint, prev_client_port)) =
|
||||
let previous_kdf_fingerprint = {
|
||||
let kdf_fingerprint_guard = self.kdf_material_fingerprint.read().await;
|
||||
kdf_fingerprint_guard.get(&peer_addr_nat).copied()
|
||||
};
|
||||
if let Some((prev_fingerprint, prev_client_port)) = previous_kdf_fingerprint
|
||||
{
|
||||
if prev_fingerprint != kdf_fingerprint {
|
||||
self.stats.increment_me_kdf_drift_total();
|
||||
@@ -416,6 +418,9 @@ impl MePool {
|
||||
);
|
||||
}
|
||||
}
|
||||
// Keep fingerprint updates eventually consistent for diagnostics while avoiding
|
||||
// serializing all concurrent handshakes on a single async mutex.
|
||||
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.write().await;
|
||||
kdf_fingerprint_guard.insert(peer_addr_nat, (kdf_fingerprint, client_port_for_kdf));
|
||||
drop(kdf_fingerprint_guard);
|
||||
|
||||
|
||||
@@ -132,7 +132,7 @@ pub struct MePool {
|
||||
pub(super) pending_hardswap_map_hash: AtomicU64,
|
||||
pub(super) hardswap: AtomicBool,
|
||||
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
||||
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, (u64, u16)>>>,
|
||||
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
|
||||
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
||||
pub(super) me_pool_force_close_secs: AtomicU64,
|
||||
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
|
||||
@@ -335,7 +335,7 @@ impl MePool {
|
||||
pending_hardswap_map_hash: AtomicU64::new(0),
|
||||
hardswap: AtomicBool::new(hardswap),
|
||||
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
||||
kdf_material_fingerprint: Arc::new(Mutex::new(HashMap::new())),
|
||||
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
|
||||
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
||||
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
|
||||
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
|
||||
|
||||
@@ -14,10 +14,12 @@ use super::pool::MePool;
|
||||
impl MePool {
|
||||
pub async fn init(self: &Arc<Self>, pool_size: usize, rng: &Arc<SecureRandom>) -> Result<()> {
|
||||
let family_order = self.family_order();
|
||||
let connect_concurrency = self.me_reconnect_max_concurrent_per_dc.max(1) as usize;
|
||||
let ks = self.key_selector().await;
|
||||
info!(
|
||||
me_servers = self.proxy_map_v4.read().await.len(),
|
||||
pool_size,
|
||||
connect_concurrency,
|
||||
key_selector = format_args!("0x{ks:08x}"),
|
||||
secret_len = self.proxy_secret.read().await.secret.len(),
|
||||
"Initializing ME pool"
|
||||
@@ -41,23 +43,39 @@ impl MePool {
|
||||
})
|
||||
.collect();
|
||||
dc_addrs.sort_unstable_by_key(|(dc, _)| *dc);
|
||||
dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len()));
|
||||
|
||||
// Ensure at least one live writer per DC group; run missing DCs in parallel.
|
||||
// Stage 1: build base coverage for conditional-cast.
|
||||
// Single-endpoint DCs are prefilled first; multi-endpoint DCs require one live writer.
|
||||
let mut join = tokio::task::JoinSet::new();
|
||||
for (dc, addrs) in dc_addrs.iter().cloned() {
|
||||
if addrs.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let target_writers = if addrs.len() == 1 {
|
||||
self.required_writers_for_dc_with_floor_mode(addrs.len(), false)
|
||||
} else {
|
||||
1usize
|
||||
};
|
||||
let endpoints: HashSet<SocketAddr> = addrs
|
||||
.iter()
|
||||
.map(|(ip, port)| SocketAddr::new(*ip, *port))
|
||||
.collect();
|
||||
if self.active_writer_count_for_endpoints(&endpoints).await > 0 {
|
||||
if self.active_writer_count_for_endpoints(&endpoints).await >= target_writers {
|
||||
continue;
|
||||
}
|
||||
let pool = Arc::clone(self);
|
||||
let rng_clone = Arc::clone(rng);
|
||||
join.spawn(async move { pool.connect_primary_for_dc(dc, addrs, rng_clone).await });
|
||||
join.spawn(async move {
|
||||
pool.connect_primary_for_dc(
|
||||
dc,
|
||||
addrs,
|
||||
target_writers,
|
||||
rng_clone,
|
||||
connect_concurrency,
|
||||
)
|
||||
.await
|
||||
});
|
||||
}
|
||||
while join.join_next().await.is_some() {}
|
||||
|
||||
@@ -77,47 +95,35 @@ impl MePool {
|
||||
)));
|
||||
}
|
||||
|
||||
// Warm reserve writers asynchronously so startup does not block after first working pool is ready.
|
||||
// Stage 2: continue saturating multi-endpoint DC groups in background.
|
||||
let pool = Arc::clone(self);
|
||||
let rng_clone = Arc::clone(rng);
|
||||
let dc_addrs_bg = dc_addrs.clone();
|
||||
tokio::spawn(async move {
|
||||
if pool.me_warmup_stagger_enabled {
|
||||
for (dc, addrs) in &dc_addrs_bg {
|
||||
for (ip, port) in addrs {
|
||||
if pool.connection_count() >= pool_size {
|
||||
break;
|
||||
}
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
let jitter = rand::rng()
|
||||
.random_range(0..=pool.me_warmup_step_jitter.as_millis() as u64);
|
||||
let delay_ms = pool.me_warmup_step_delay.as_millis() as u64 + jitter;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
|
||||
if let Err(e) = pool.connect_one(addr, rng_clone.as_ref()).await {
|
||||
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (dc, addrs) in &dc_addrs_bg {
|
||||
for (ip, port) in addrs {
|
||||
if pool.connection_count() >= pool_size {
|
||||
break;
|
||||
}
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
if let Err(e) = pool.connect_one(addr, rng_clone.as_ref()).await {
|
||||
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed");
|
||||
}
|
||||
}
|
||||
if pool.connection_count() >= pool_size {
|
||||
break;
|
||||
}
|
||||
let mut join_bg = tokio::task::JoinSet::new();
|
||||
for (dc, addrs) in dc_addrs_bg {
|
||||
if addrs.len() <= 1 {
|
||||
continue;
|
||||
}
|
||||
let target_writers = pool.required_writers_for_dc_with_floor_mode(addrs.len(), false);
|
||||
let pool_clone = Arc::clone(&pool);
|
||||
let rng_clone_local = Arc::clone(&rng_clone);
|
||||
join_bg.spawn(async move {
|
||||
pool_clone
|
||||
.connect_primary_for_dc(
|
||||
dc,
|
||||
addrs,
|
||||
target_writers,
|
||||
rng_clone_local,
|
||||
connect_concurrency,
|
||||
)
|
||||
.await
|
||||
});
|
||||
}
|
||||
while join_bg.join_next().await.is_some() {}
|
||||
debug!(
|
||||
target_pool_size = pool_size,
|
||||
current_pool_size = pool.connection_count(),
|
||||
"Background ME reserve warmup finished"
|
||||
"Background ME saturation warmup finished"
|
||||
);
|
||||
});
|
||||
|
||||
@@ -140,62 +146,85 @@ impl MePool {
|
||||
self: Arc<Self>,
|
||||
dc: i32,
|
||||
mut addrs: Vec<(IpAddr, u16)>,
|
||||
target_writers: usize,
|
||||
rng: Arc<SecureRandom>,
|
||||
connect_concurrency: usize,
|
||||
) -> bool {
|
||||
if addrs.is_empty() {
|
||||
return false;
|
||||
}
|
||||
let target_writers = target_writers.max(1);
|
||||
addrs.shuffle(&mut rand::rng());
|
||||
if addrs.len() > 1 {
|
||||
let concurrency = 2usize;
|
||||
let mut join = tokio::task::JoinSet::new();
|
||||
let mut next_idx = 0usize;
|
||||
let endpoints: Vec<SocketAddr> = addrs
|
||||
.iter()
|
||||
.map(|(ip, port)| SocketAddr::new(*ip, *port))
|
||||
.collect();
|
||||
let endpoint_set: HashSet<SocketAddr> = endpoints.iter().copied().collect();
|
||||
|
||||
while next_idx < addrs.len() || !join.is_empty() {
|
||||
while next_idx < addrs.len() && join.len() < concurrency {
|
||||
let (ip, port) = addrs[next_idx];
|
||||
next_idx += 1;
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
loop {
|
||||
let alive = self.active_writer_count_for_endpoints(&endpoint_set).await;
|
||||
if alive >= target_writers {
|
||||
info!(
|
||||
dc = %dc,
|
||||
alive,
|
||||
target_writers,
|
||||
"ME connected"
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
let missing = target_writers.saturating_sub(alive).max(1);
|
||||
let concurrency = connect_concurrency.max(1).min(missing);
|
||||
let mut join = tokio::task::JoinSet::new();
|
||||
for _ in 0..concurrency {
|
||||
let pool = Arc::clone(&self);
|
||||
let rng_clone = Arc::clone(&rng);
|
||||
let endpoints_clone = endpoints.clone();
|
||||
join.spawn(async move {
|
||||
(addr, pool.connect_one(addr, rng_clone.as_ref()).await)
|
||||
pool.connect_endpoints_round_robin(&endpoints_clone, rng_clone.as_ref())
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
let Some(res) = join.join_next().await else {
|
||||
break;
|
||||
};
|
||||
let mut progress = false;
|
||||
while let Some(res) = join.join_next().await {
|
||||
match res {
|
||||
Ok((addr, Ok(()))) => {
|
||||
info!(%addr, dc = %dc, "ME connected");
|
||||
join.abort_all();
|
||||
while join.join_next().await.is_some() {}
|
||||
return true;
|
||||
}
|
||||
Ok((addr, Err(e))) => {
|
||||
warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next");
|
||||
Ok(true) => {
|
||||
progress = true;
|
||||
}
|
||||
Ok(false) => {}
|
||||
Err(e) => {
|
||||
warn!(dc = %dc, error = %e, "ME connect task failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
warn!(dc = %dc, "All ME servers for DC failed at init");
|
||||
|
||||
let alive_after = self.active_writer_count_for_endpoints(&endpoint_set).await;
|
||||
if alive_after >= target_writers {
|
||||
info!(
|
||||
dc = %dc,
|
||||
alive = alive_after,
|
||||
target_writers,
|
||||
"ME connected"
|
||||
);
|
||||
return true;
|
||||
}
|
||||
if !progress {
|
||||
warn!(
|
||||
dc = %dc,
|
||||
alive = alive_after,
|
||||
target_writers,
|
||||
"All ME servers for DC failed at init"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
for (ip, port) in addrs {
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
match self.connect_one(addr, rng.as_ref()).await {
|
||||
Ok(()) => {
|
||||
info!(%addr, dc = %dc, "ME connected");
|
||||
return true;
|
||||
}
|
||||
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"),
|
||||
if self.me_warmup_stagger_enabled {
|
||||
let jitter = rand::rng()
|
||||
.random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
|
||||
let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
|
||||
}
|
||||
}
|
||||
warn!(dc = %dc, "All ME servers for DC failed at init");
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,6 +100,134 @@ pub(crate) struct MeApiRuntimeSnapshot {
|
||||
}
|
||||
|
||||
impl MePool {
|
||||
pub(crate) async fn admission_ready_conditional_cast(&self) -> bool {
|
||||
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
|
||||
if self.decision.ipv4_me {
|
||||
let map = self.proxy_map_v4.read().await.clone();
|
||||
for (dc, addrs) in map {
|
||||
let abs_dc = dc.abs();
|
||||
if abs_dc == 0 {
|
||||
continue;
|
||||
}
|
||||
let Ok(dc_idx) = i16::try_from(abs_dc) else {
|
||||
continue;
|
||||
};
|
||||
let entry = endpoints_by_dc.entry(dc_idx).or_default();
|
||||
for (ip, port) in addrs {
|
||||
entry.insert(SocketAddr::new(ip, port));
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.decision.ipv6_me {
|
||||
let map = self.proxy_map_v6.read().await.clone();
|
||||
for (dc, addrs) in map {
|
||||
let abs_dc = dc.abs();
|
||||
if abs_dc == 0 {
|
||||
continue;
|
||||
}
|
||||
let Ok(dc_idx) = i16::try_from(abs_dc) else {
|
||||
continue;
|
||||
};
|
||||
let entry = endpoints_by_dc.entry(dc_idx).or_default();
|
||||
for (ip, port) in addrs {
|
||||
entry.insert(SocketAddr::new(ip, port));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if endpoints_by_dc.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let writers = self.writers.read().await.clone();
|
||||
let mut live_writers_by_endpoint = HashMap::<SocketAddr, usize>::new();
|
||||
for writer in writers {
|
||||
if writer.draining.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
*live_writers_by_endpoint.entry(writer.addr).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
for endpoints in endpoints_by_dc.values() {
|
||||
let alive: usize = endpoints
|
||||
.iter()
|
||||
.map(|endpoint| live_writers_by_endpoint.get(endpoint).copied().unwrap_or(0))
|
||||
.sum();
|
||||
if alive == 0 {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn admission_ready_full_floor(&self) -> bool {
|
||||
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
|
||||
if self.decision.ipv4_me {
|
||||
let map = self.proxy_map_v4.read().await.clone();
|
||||
for (dc, addrs) in map {
|
||||
let abs_dc = dc.abs();
|
||||
if abs_dc == 0 {
|
||||
continue;
|
||||
}
|
||||
let Ok(dc_idx) = i16::try_from(abs_dc) else {
|
||||
continue;
|
||||
};
|
||||
let entry = endpoints_by_dc.entry(dc_idx).or_default();
|
||||
for (ip, port) in addrs {
|
||||
entry.insert(SocketAddr::new(ip, port));
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.decision.ipv6_me {
|
||||
let map = self.proxy_map_v6.read().await.clone();
|
||||
for (dc, addrs) in map {
|
||||
let abs_dc = dc.abs();
|
||||
if abs_dc == 0 {
|
||||
continue;
|
||||
}
|
||||
let Ok(dc_idx) = i16::try_from(abs_dc) else {
|
||||
continue;
|
||||
};
|
||||
let entry = endpoints_by_dc.entry(dc_idx).or_default();
|
||||
for (ip, port) in addrs {
|
||||
entry.insert(SocketAddr::new(ip, port));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if endpoints_by_dc.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let writers = self.writers.read().await.clone();
|
||||
let mut live_writers_by_endpoint = HashMap::<SocketAddr, usize>::new();
|
||||
for writer in writers {
|
||||
if writer.draining.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
*live_writers_by_endpoint.entry(writer.addr).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
for endpoints in endpoints_by_dc.values() {
|
||||
let endpoint_count = endpoints.len();
|
||||
if endpoint_count == 0 {
|
||||
return false;
|
||||
}
|
||||
let required = self.required_writers_for_dc_with_floor_mode(endpoint_count, false);
|
||||
let alive: usize = endpoints
|
||||
.iter()
|
||||
.map(|endpoint| live_writers_by_endpoint.get(endpoint).copied().unwrap_or(0))
|
||||
.sum();
|
||||
if alive < required {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot {
|
||||
let now_epoch_secs = Self::now_epoch_secs();
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use super::registry::ConnMeta;
|
||||
|
||||
const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
|
||||
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
|
||||
const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4;
|
||||
|
||||
impl MePool {
|
||||
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
|
||||
@@ -55,6 +56,9 @@ impl MePool {
|
||||
let mut no_writer_deadline: Option<Instant> = None;
|
||||
let mut emergency_attempts = 0u32;
|
||||
let mut async_recovery_triggered = false;
|
||||
let mut hybrid_recovery_round = 0u32;
|
||||
let mut hybrid_last_recovery_at: Option<Instant> = None;
|
||||
let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
|
||||
|
||||
loop {
|
||||
if let Some(current) = self.registry.get_writer(conn_id).await {
|
||||
@@ -138,6 +142,18 @@ impl MePool {
|
||||
}
|
||||
continue;
|
||||
}
|
||||
MeRouteNoWriterMode::HybridAsyncPersistent => {
|
||||
self.maybe_trigger_hybrid_recovery(
|
||||
target_dc,
|
||||
&mut hybrid_recovery_round,
|
||||
&mut hybrid_last_recovery_at,
|
||||
hybrid_wait_step,
|
||||
)
|
||||
.await;
|
||||
let deadline = Instant::now() + hybrid_wait_step;
|
||||
let _ = self.wait_for_writer_until(deadline).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
ws.clone()
|
||||
@@ -215,6 +231,18 @@ impl MePool {
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
}
|
||||
MeRouteNoWriterMode::HybridAsyncPersistent => {
|
||||
self.maybe_trigger_hybrid_recovery(
|
||||
target_dc,
|
||||
&mut hybrid_recovery_round,
|
||||
&mut hybrid_last_recovery_at,
|
||||
hybrid_wait_step,
|
||||
)
|
||||
.await;
|
||||
let deadline = Instant::now() + hybrid_wait_step;
|
||||
let _ = self.wait_for_candidate_until(target_dc, deadline).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
let writer_idle_since = self.registry.writer_idle_since_snapshot().await;
|
||||
@@ -459,6 +487,28 @@ impl MePool {
|
||||
preferred
|
||||
}
|
||||
|
||||
async fn maybe_trigger_hybrid_recovery(
|
||||
self: &Arc<Self>,
|
||||
target_dc: i16,
|
||||
hybrid_recovery_round: &mut u32,
|
||||
hybrid_last_recovery_at: &mut Option<Instant>,
|
||||
hybrid_wait_step: Duration,
|
||||
) {
|
||||
if let Some(last) = *hybrid_last_recovery_at
|
||||
&& last.elapsed() < hybrid_wait_step
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let round = *hybrid_recovery_round;
|
||||
let target_triggered = self.trigger_async_recovery_for_target_dc(target_dc).await;
|
||||
if !target_triggered || round % HYBRID_GLOBAL_BURST_PERIOD_ROUNDS == 0 {
|
||||
self.trigger_async_recovery_global().await;
|
||||
}
|
||||
*hybrid_recovery_round = round.saturating_add(1);
|
||||
*hybrid_last_recovery_at = Some(Instant::now());
|
||||
}
|
||||
|
||||
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let mut p = Vec::with_capacity(12);
|
||||
|
||||
Reference in New Issue
Block a user