From b94746a6e0b34d90333dca54aeaae356b709e378 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Mar 2026 21:26:20 +0300 Subject: [PATCH] Dashmap-driven Routing + Health Parallel + Family Runtime State Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/maestro/mod.rs | 16 +- src/tls_front/fetcher.rs | 5 +- src/transport/middle_proxy/health.rs | 290 ++++++++++++++++++------- src/transport/middle_proxy/pool.rs | 12 +- src/transport/middle_proxy/registry.rs | 215 ++++++++---------- 5 files changed, 310 insertions(+), 228 deletions(-) diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 7d3b168..5f3fd3a 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -115,15 +115,13 @@ pub async fn run() -> std::result::Result<(), Box> { ); std::process::exit(1); } - } else { - if let Err(e) = std::fs::create_dir_all(data_path) { - eprintln!( - "[telemt] Can't create data_path {}: {}", - data_path.display(), - e - ); - std::process::exit(1); - } + } else if let Err(e) = std::fs::create_dir_all(data_path) { + eprintln!( + "[telemt] Can't create data_path {}: {}", + data_path.display(), + e + ); + std::process::exit(1); } if let Err(e) = std::env::set_current_dir(data_path) { diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs index bbfc336..45d56ce 100644 --- a/src/tls_front/fetcher.rs +++ b/src/tls_front/fetcher.rs @@ -244,10 +244,9 @@ fn order_profiles( if let Some(pos) = ordered .iter() .position(|profile| *profile == cached.profile) + && pos != 0 { - if pos != 0 { - ordered.swap(0, pos); - } + ordered.swap(0, pos); } } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index ca6e681..257d8f3 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -8,6 +8,7 @@ use std::time::{Duration, Instant}; use rand::RngExt; use tokio::sync::Semaphore; +use tokio::task::JoinSet; use tracing::{debug, info, warn}; use crate::config::MeFloorMode; @@ -15,6 +16,7 @@ use crate::crypto::SecureRandom; use crate::network::IpFamily; use super::MePool; +use super::pool::MeFamilyRuntimeState; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff #[allow(dead_code)] @@ -28,6 +30,9 @@ const HEALTH_RECONNECT_BUDGET_PER_CORE: usize = 2; const HEALTH_RECONNECT_BUDGET_PER_DC: usize = 1; const HEALTH_RECONNECT_BUDGET_MIN: usize = 4; const HEALTH_RECONNECT_BUDGET_MAX: usize = 128; +const FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD: u32 = 5; +const FAMILY_SUPPRESS_DURATION_SECS: u64 = 60; +const FAMILY_RECOVER_SUCCESS_STREAK_TARGET: u32 = 2; const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; @@ -57,6 +62,17 @@ struct FamilyFloorPlan { target_writers_total: usize, } +#[derive(Debug)] +struct FamilyReconnectOutcome { + key: (i32, IpFamily), + dc: i32, + family: IpFamily, + alive: usize, + required: usize, + endpoint_count: usize, + restored: usize, +} + pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); @@ -115,6 +131,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut floor_warn_next_allowed, ) .await; + update_family_runtime_state(&pool, IpFamily::V4, v4_degraded); + update_family_runtime_state(&pool, IpFamily::V6, v6_degraded); degraded_interval = v4_degraded || v6_degraded; } } @@ -430,6 +448,10 @@ async fn check_family( floor_plan.active_writers_current, floor_plan.warm_writers_current, ); + let live_writer_ids_by_addr = Arc::new(live_writer_ids_by_addr); + let writer_idle_since = Arc::new(writer_idle_since); + let bound_clients_by_writer = Arc::new(bound_clients_by_writer); + let mut reconnect_set = JoinSet::::new(); for (dc, endpoints) in dc_endpoints { if endpoints.is_empty() { @@ -503,9 +525,9 @@ async fn check_family( &endpoints, alive, required, - &live_writer_ids_by_addr, - &writer_idle_since, - &bound_clients_by_writer, + live_writer_ids_by_addr.as_ref(), + writer_idle_since.as_ref(), + bound_clients_by_writer.as_ref(), idle_refresh_next_attempt, ) .await; @@ -518,8 +540,8 @@ async fn check_family( &endpoints, alive, required, - &live_writer_ids_by_addr, - &bound_clients_by_writer, + live_writer_ids_by_addr.as_ref(), + bound_clients_by_writer.as_ref(), shadow_rotate_deadline, ) .await; @@ -575,121 +597,165 @@ async fn check_family( continue; } *inflight.entry(key).or_insert(0) += 1; - - let mut restored = 0usize; - for _ in 0..missing { - let Ok(reconnect_permit) = reconnect_sem.clone().try_acquire_owned() else { - break; - }; - if pool.active_contour_writer_count_total().await - >= floor_plan.active_cap_effective_total - { - let swapped = maybe_swap_idle_writer_for_cap( - pool, - rng, - dc, - family, - &endpoints, - &live_writer_ids_by_addr, - &writer_idle_since, - &bound_clients_by_writer, + let pool_for_reconnect = pool.clone(); + let rng_for_reconnect = rng.clone(); + let reconnect_sem_for_dc = reconnect_sem.clone(); + let endpoints_for_dc = endpoints.clone(); + let live_writer_ids_by_addr_for_dc = live_writer_ids_by_addr.clone(); + let writer_idle_since_for_dc = writer_idle_since.clone(); + let bound_clients_by_writer_for_dc = bound_clients_by_writer.clone(); + let active_cap_effective_total = floor_plan.active_cap_effective_total; + reconnect_set.spawn(async move { + let mut restored = 0usize; + for _ in 0..missing { + let Ok(reconnect_permit) = reconnect_sem_for_dc.clone().try_acquire_owned() else { + break; + }; + if pool_for_reconnect.active_contour_writer_count_total().await + >= active_cap_effective_total + { + let swapped = maybe_swap_idle_writer_for_cap( + &pool_for_reconnect, + &rng_for_reconnect, + dc, + family, + &endpoints_for_dc, + live_writer_ids_by_addr_for_dc.as_ref(), + writer_idle_since_for_dc.as_ref(), + bound_clients_by_writer_for_dc.as_ref(), + ) + .await; + if swapped { + pool_for_reconnect + .stats + .increment_me_floor_swap_idle_total(); + restored += 1; + continue; + } + pool_for_reconnect + .stats + .increment_me_floor_cap_block_total(); + pool_for_reconnect + .stats + .increment_me_floor_swap_idle_failed_total(); + debug!( + dc = %dc, + ?family, + alive, + required, + active_cap_effective_total, + "Adaptive floor cap reached, reconnect attempt blocked" + ); + break; + } + let res = tokio::time::timeout( + pool_for_reconnect.reconnect_runtime.me_one_timeout, + pool_for_reconnect.connect_endpoints_round_robin( + dc, + &endpoints_for_dc, + rng_for_reconnect.as_ref(), + ), ) .await; - if swapped { - pool.stats.increment_me_floor_swap_idle_total(); - restored += 1; - continue; + match res { + Ok(true) => { + restored += 1; + pool_for_reconnect.stats.increment_me_reconnect_success(); + } + Ok(false) => { + pool_for_reconnect.stats.increment_me_reconnect_attempt(); + debug!(dc = %dc, ?family, "ME round-robin reconnect failed") + } + Err(_) => { + pool_for_reconnect.stats.increment_me_reconnect_attempt(); + debug!(dc = %dc, ?family, "ME reconnect timed out"); + } } - pool.stats.increment_me_floor_cap_block_total(); - pool.stats.increment_me_floor_swap_idle_failed_total(); - debug!( - dc = %dc, - ?family, - alive, - required, - active_cap_effective_total = floor_plan.active_cap_effective_total, - "Adaptive floor cap reached, reconnect attempt blocked" - ); - break; + drop(reconnect_permit); } - let res = tokio::time::timeout( - pool.reconnect_runtime.me_one_timeout, - pool.connect_endpoints_round_robin(dc, &endpoints, rng.as_ref()), - ) - .await; - match res { - Ok(true) => { - restored += 1; - pool.stats.increment_me_reconnect_success(); - } - Ok(false) => { - pool.stats.increment_me_reconnect_attempt(); - debug!(dc = %dc, ?family, "ME round-robin reconnect failed") - } - Err(_) => { - pool.stats.increment_me_reconnect_attempt(); - debug!(dc = %dc, ?family, "ME reconnect timed out"); - } - } - drop(reconnect_permit); - } - let now_alive = alive + restored; - if now_alive >= required { - info!( - dc = %dc, - ?family, - alive = now_alive, + FamilyReconnectOutcome { + key, + dc, + family, + alive, required, - endpoint_count = endpoints.len(), + endpoint_count: endpoints_for_dc.len(), + restored, + } + }); + } + + while let Some(joined) = reconnect_set.join_next().await { + let outcome = match joined { + Ok(outcome) => outcome, + Err(join_error) => { + debug!(error = %join_error, "Health reconnect task failed"); + continue; + } + }; + let now = Instant::now(); + let now_alive = outcome.alive + outcome.restored; + if now_alive >= outcome.required { + info!( + dc = %outcome.dc, + family = ?outcome.family, + alive = now_alive, + required = outcome.required, + endpoint_count = outcome.endpoint_count, "ME writer floor restored for DC" ); backoff.insert( - key, + outcome.key, pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64, ); let jitter = pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; let wait = pool.reconnect_runtime.me_reconnect_backoff_base + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); - next_attempt.insert(key, now + wait); + next_attempt.insert(outcome.key, now + wait); } else { let curr = *backoff - .get(&key) + .get(&outcome.key) .unwrap_or(&(pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64)); let next_ms = (curr.saturating_mul(2)) .min(pool.reconnect_runtime.me_reconnect_backoff_cap.as_millis() as u64); - backoff.insert(key, next_ms); + backoff.insert(outcome.key, next_ms); let jitter = next_ms / JITTER_FRAC_NUM; let wait = Duration::from_millis(next_ms) + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); - next_attempt.insert(key, now + wait); + next_attempt.insert(outcome.key, now + wait); if pool.is_runtime_ready() { let warn_cooldown = pool.warn_rate_limit_duration(); - if should_emit_rate_limited_warn(floor_warn_next_allowed, key, now, warn_cooldown) { + if should_emit_rate_limited_warn( + floor_warn_next_allowed, + outcome.key, + now, + warn_cooldown, + ) { warn!( - dc = %dc, - ?family, + dc = %outcome.dc, + family = ?outcome.family, alive = now_alive, - required, - endpoint_count = endpoints.len(), + required = outcome.required, + endpoint_count = outcome.endpoint_count, backoff_ms = next_ms, "DC writer floor is below required level, scheduled reconnect" ); } } else { info!( - dc = %dc, - ?family, + dc = %outcome.dc, + family = ?outcome.family, alive = now_alive, - required, - endpoint_count = endpoints.len(), + required = outcome.required, + endpoint_count = outcome.endpoint_count, backoff_ms = next_ms, "DC writer floor is below required level during startup, scheduled reconnect" ); } } - if let Some(v) = inflight.get_mut(&key) { + if let Some(v) = inflight.get_mut(&outcome.key) { *v = v.saturating_sub(1); } } @@ -706,6 +772,68 @@ fn health_reconnect_budget(pool: &Arc, dc_groups: usize) -> usize { .clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX) } +fn update_family_runtime_state(pool: &Arc, family: IpFamily, degraded: bool) { + let now_epoch_secs = MePool::now_epoch_secs(); + let previous_state = pool.family_runtime_state(family); + let mut state_since_epoch_secs = pool.family_runtime_state_since_epoch_secs(family); + let previous_suppressed_until_epoch_secs = pool.family_suppressed_until_epoch_secs(family); + let previous_fail_streak = pool.family_fail_streak(family); + let previous_recover_success_streak = pool.family_recover_success_streak(family); + + let (next_state, suppressed_until_epoch_secs, fail_streak, recover_success_streak) = + if previous_suppressed_until_epoch_secs > now_epoch_secs { + let fail_streak = if degraded { + previous_fail_streak.saturating_add(1) + } else { + previous_fail_streak + }; + ( + MeFamilyRuntimeState::Suppressed, + previous_suppressed_until_epoch_secs, + fail_streak, + 0, + ) + } else if degraded { + let fail_streak = previous_fail_streak.saturating_add(1); + if fail_streak >= FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD { + ( + MeFamilyRuntimeState::Suppressed, + now_epoch_secs.saturating_add(FAMILY_SUPPRESS_DURATION_SECS), + fail_streak, + 0, + ) + } else { + (MeFamilyRuntimeState::Degraded, 0, fail_streak, 0) + } + } else if matches!(previous_state, MeFamilyRuntimeState::Healthy) { + (MeFamilyRuntimeState::Healthy, 0, 0, 0) + } else { + let recover_success_streak = previous_recover_success_streak.saturating_add(1); + if recover_success_streak >= FAMILY_RECOVER_SUCCESS_STREAK_TARGET { + (MeFamilyRuntimeState::Healthy, 0, 0, 0) + } else { + ( + MeFamilyRuntimeState::Recovering, + 0, + 0, + recover_success_streak, + ) + } + }; + + if next_state != previous_state || state_since_epoch_secs == 0 { + state_since_epoch_secs = now_epoch_secs; + } + pool.set_family_runtime_state( + family, + next_state, + state_since_epoch_secs, + suppressed_until_epoch_secs, + fail_streak, + recover_success_streak, + ); +} + fn should_emit_rate_limited_warn( next_allowed: &mut HashMap<(i32, IpFamily), Instant>, key: (i32, IpFamily), diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index af37a0b..249d387 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -202,15 +202,6 @@ impl FamilyHealthSnapshot { } impl MeFamilyRuntimeState { - pub(crate) fn from_u8(value: u8) -> Self { - match value { - 1 => Self::Degraded, - 2 => Self::Suppressed, - 3 => Self::Recovering, - _ => Self::Healthy, - } - } - pub(crate) fn as_str(self) -> &'static str { match self { Self::Healthy => "healthy", @@ -852,12 +843,11 @@ impl MePool { } pub(super) fn notify_writer_epoch(&self) { - let _ = self.writer_epoch.send_modify(|epoch| { + self.writer_epoch.send_modify(|epoch| { *epoch = epoch.wrapping_add(1); }); } - #[allow(dead_code)] pub(super) fn set_family_runtime_state( &self, family: IpFamily, diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 6d830a1..ff4a68b 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -3,8 +3,9 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicU8, AtomicU64, Ordering}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use dashmap::DashMap; use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::{Mutex, mpsc}; use super::MeResponse; use super::codec::WriterCommand; @@ -50,16 +51,15 @@ pub(super) struct WriterActivitySnapshot { pub active_sessions_by_target_dc: HashMap, } -struct RegistryInner { - routing: RoutingTable, - binding: BindingState, -} - struct RoutingTable { - map: HashMap>, + map: DashMap>, } struct BindingState { + inner: Mutex, +} + +struct BindingInner { writers: HashMap>, writer_for_conn: HashMap, conns_for_writer: HashMap>, @@ -68,26 +68,22 @@ struct BindingState { writer_idle_since_epoch_secs: HashMap, } -impl RegistryInner { +impl BindingInner { fn new() -> Self { Self { - routing: RoutingTable { - map: HashMap::new(), - }, - binding: BindingState { - writers: HashMap::new(), - writer_for_conn: HashMap::new(), - conns_for_writer: HashMap::new(), - meta: HashMap::new(), - last_meta_for_writer: HashMap::new(), - writer_idle_since_epoch_secs: HashMap::new(), - }, + writers: HashMap::new(), + writer_for_conn: HashMap::new(), + conns_for_writer: HashMap::new(), + meta: HashMap::new(), + last_meta_for_writer: HashMap::new(), + writer_idle_since_epoch_secs: HashMap::new(), } } } pub struct ConnRegistry { - inner: RwLock, + routing: RoutingTable, + binding: BindingState, next_id: AtomicU64, route_channel_capacity: usize, route_backpressure_base_timeout_ms: AtomicU64, @@ -106,7 +102,12 @@ impl ConnRegistry { pub fn with_route_channel_capacity(route_channel_capacity: usize) -> Self { let start = rand::random::() | 1; Self { - inner: RwLock::new(RegistryInner::new()), + routing: RoutingTable { + map: DashMap::new(), + }, + binding: BindingState { + inner: Mutex::new(BindingInner::new()), + }, next_id: AtomicU64::new(start), route_channel_capacity: route_channel_capacity.max(1), route_backpressure_base_timeout_ms: AtomicU64::new(ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS), @@ -142,15 +143,14 @@ impl ConnRegistry { pub async fn register(&self) -> (u64, mpsc::Receiver) { let id = self.next_id.fetch_add(1, Ordering::Relaxed); let (tx, rx) = mpsc::channel(self.route_channel_capacity); - self.inner.write().await.routing.map.insert(id, tx); + self.routing.map.insert(id, tx); (id, rx) } pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender) { - let mut inner = self.inner.write().await; - inner.binding.writers.insert(writer_id, tx); - inner - .binding + let mut binding = self.binding.inner.lock().await; + binding.writers.insert(writer_id, tx); + binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); @@ -158,20 +158,18 @@ impl ConnRegistry { /// Unregister connection, returning associated writer_id if any. pub async fn unregister(&self, id: u64) -> Option { - let mut inner = self.inner.write().await; - inner.routing.map.remove(&id); - inner.binding.meta.remove(&id); - if let Some(writer_id) = inner.binding.writer_for_conn.remove(&id) { - let became_empty = if let Some(set) = inner.binding.conns_for_writer.get_mut(&writer_id) - { + self.routing.map.remove(&id); + let mut binding = self.binding.inner.lock().await; + binding.meta.remove(&id); + if let Some(writer_id) = binding.writer_for_conn.remove(&id) { + let became_empty = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) { set.remove(&id); set.is_empty() } else { false }; if became_empty { - inner - .binding + binding .writer_idle_since_epoch_secs .insert(writer_id, Self::now_epoch_secs()); } @@ -182,10 +180,7 @@ impl ConnRegistry { #[allow(dead_code)] pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { - let tx = { - let inner = self.inner.read().await; - inner.routing.map.get(&id).cloned() - }; + let tx = self.routing.map.get(&id).map(|entry| entry.value().clone()); let Some(tx) = tx else { return RouteResult::NoConn; @@ -238,10 +233,7 @@ impl ConnRegistry { } pub async fn route_nowait(&self, id: u64, resp: MeResponse) -> RouteResult { - let tx = { - let inner = self.inner.read().await; - inner.routing.map.get(&id).cloned() - }; + let tx = self.routing.map.get(&id).map(|entry| entry.value().clone()); let Some(tx) = tx else { return RouteResult::NoConn; @@ -264,10 +256,7 @@ impl ConnRegistry { return self.route_nowait(id, resp).await; } - let tx = { - let inner = self.inner.read().await; - inner.routing.map.get(&id).cloned() - }; + let tx = self.routing.map.get(&id).map(|entry| entry.value().clone()); let Some(tx) = tx else { return RouteResult::NoConn; @@ -306,44 +295,39 @@ impl ConnRegistry { } pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool { - let mut inner = self.inner.write().await; + let mut binding = self.binding.inner.lock().await; // ROUTING IS THE SOURCE OF TRUTH: // never keep/attach writer binding for a connection that is already // absent from the routing table. - if !inner.routing.map.contains_key(&conn_id) { + if !self.routing.map.contains_key(&conn_id) { return false; } - if !inner.binding.writers.contains_key(&writer_id) { + if !binding.writers.contains_key(&writer_id) { return false; } - let previous_writer_id = inner.binding.writer_for_conn.insert(conn_id, writer_id); + let previous_writer_id = binding.writer_for_conn.insert(conn_id, writer_id); if let Some(previous_writer_id) = previous_writer_id && previous_writer_id != writer_id { let became_empty = - if let Some(set) = inner.binding.conns_for_writer.get_mut(&previous_writer_id) { + if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) { set.remove(&conn_id); set.is_empty() } else { false }; if became_empty { - inner - .binding + binding .writer_idle_since_epoch_secs .insert(previous_writer_id, Self::now_epoch_secs()); } } - inner.binding.meta.insert(conn_id, meta.clone()); - inner.binding.last_meta_for_writer.insert(writer_id, meta); - inner - .binding - .writer_idle_since_epoch_secs - .remove(&writer_id); - inner - .binding + binding.meta.insert(conn_id, meta.clone()); + binding.last_meta_for_writer.insert(writer_id, meta); + binding.writer_idle_since_epoch_secs.remove(&writer_id); + binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new) @@ -352,39 +336,32 @@ impl ConnRegistry { } pub async fn mark_writer_idle(&self, writer_id: u64) { - let mut inner = self.inner.write().await; - inner - .binding + let mut binding = self.binding.inner.lock().await; + binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); - inner - .binding + binding .writer_idle_since_epoch_secs .entry(writer_id) .or_insert(Self::now_epoch_secs()); } pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option { - let inner = self.inner.read().await; - inner.binding.last_meta_for_writer.get(&writer_id).cloned() + let binding = self.binding.inner.lock().await; + binding.last_meta_for_writer.get(&writer_id).cloned() } pub async fn writer_idle_since_snapshot(&self) -> HashMap { - let inner = self.inner.read().await; - inner.binding.writer_idle_since_epoch_secs.clone() + let binding = self.binding.inner.lock().await; + binding.writer_idle_since_epoch_secs.clone() } pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap { - let inner = self.inner.read().await; + let binding = self.binding.inner.lock().await; let mut out = HashMap::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(idle_since) = inner - .binding - .writer_idle_since_epoch_secs - .get(writer_id) - .copied() - { + if let Some(idle_since) = binding.writer_idle_since_epoch_secs.get(writer_id).copied() { out.insert(*writer_id, idle_since); } } @@ -392,14 +369,14 @@ impl ConnRegistry { } pub(super) async fn writer_activity_snapshot(&self) -> WriterActivitySnapshot { - let inner = self.inner.read().await; + let binding = self.binding.inner.lock().await; let mut bound_clients_by_writer = HashMap::::new(); let mut active_sessions_by_target_dc = HashMap::::new(); - for (writer_id, conn_ids) in &inner.binding.conns_for_writer { + for (writer_id, conn_ids) in &binding.conns_for_writer { bound_clients_by_writer.insert(*writer_id, conn_ids.len()); } - for conn_meta in inner.binding.meta.values() { + for conn_meta in binding.meta.values() { if conn_meta.target_dc == 0 { continue; } @@ -415,19 +392,18 @@ impl ConnRegistry { } pub async fn get_writer(&self, conn_id: u64) -> Option { - let mut inner = self.inner.write().await; + let mut binding = self.binding.inner.lock().await; // ROUTING IS THE SOURCE OF TRUTH: // stale bindings are ignored and lazily cleaned when routing no longer // contains the connection. - if !inner.routing.map.contains_key(&conn_id) { - inner.binding.meta.remove(&conn_id); - if let Some(stale_writer_id) = inner.binding.writer_for_conn.remove(&conn_id) - && let Some(conns) = inner.binding.conns_for_writer.get_mut(&stale_writer_id) + if !self.routing.map.contains_key(&conn_id) { + binding.meta.remove(&conn_id); + if let Some(stale_writer_id) = binding.writer_for_conn.remove(&conn_id) + && let Some(conns) = binding.conns_for_writer.get_mut(&stale_writer_id) { conns.remove(&conn_id); if conns.is_empty() { - inner - .binding + binding .writer_idle_since_epoch_secs .insert(stale_writer_id, Self::now_epoch_secs()); } @@ -435,15 +411,14 @@ impl ConnRegistry { return None; } - let writer_id = inner.binding.writer_for_conn.get(&conn_id).copied()?; - let Some(writer) = inner.binding.writers.get(&writer_id).cloned() else { - inner.binding.writer_for_conn.remove(&conn_id); - inner.binding.meta.remove(&conn_id); - if let Some(conns) = inner.binding.conns_for_writer.get_mut(&writer_id) { + let writer_id = binding.writer_for_conn.get(&conn_id).copied()?; + let Some(writer) = binding.writers.get(&writer_id).cloned() else { + binding.writer_for_conn.remove(&conn_id); + binding.meta.remove(&conn_id); + if let Some(conns) = binding.conns_for_writer.get_mut(&writer_id) { conns.remove(&conn_id); if conns.is_empty() { - inner - .binding + binding .writer_idle_since_epoch_secs .insert(writer_id, Self::now_epoch_secs()); } @@ -457,20 +432,16 @@ impl ConnRegistry { } pub async fn active_conn_ids(&self) -> Vec { - let inner = self.inner.read().await; - inner.binding.writer_for_conn.keys().copied().collect() + let binding = self.binding.inner.lock().await; + binding.writer_for_conn.keys().copied().collect() } pub async fn writer_lost(&self, writer_id: u64) -> Vec { - let mut inner = self.inner.write().await; - inner.binding.writers.remove(&writer_id); - inner.binding.last_meta_for_writer.remove(&writer_id); - inner - .binding - .writer_idle_since_epoch_secs - .remove(&writer_id); - let conns = inner - .binding + let mut binding = self.binding.inner.lock().await; + binding.writers.remove(&writer_id); + binding.last_meta_for_writer.remove(&writer_id); + binding.writer_idle_since_epoch_secs.remove(&writer_id); + let conns = binding .conns_for_writer .remove(&writer_id) .unwrap_or_default() @@ -479,11 +450,11 @@ impl ConnRegistry { let mut out = Vec::new(); for conn_id in conns { - if inner.binding.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { + if binding.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { continue; } - inner.binding.writer_for_conn.remove(&conn_id); - if let Some(m) = inner.binding.meta.get(&conn_id) { + binding.writer_for_conn.remove(&conn_id); + if let Some(m) = binding.meta.get(&conn_id) { out.push(BoundConn { conn_id, meta: m.clone(), @@ -495,14 +466,13 @@ impl ConnRegistry { #[allow(dead_code)] pub async fn get_meta(&self, conn_id: u64) -> Option { - let inner = self.inner.read().await; - inner.binding.meta.get(&conn_id).cloned() + let binding = self.binding.inner.lock().await; + binding.meta.get(&conn_id).cloned() } pub async fn is_writer_empty(&self, writer_id: u64) -> bool { - let inner = self.inner.read().await; - inner - .binding + let binding = self.binding.inner.lock().await; + binding .conns_for_writer .get(&writer_id) .map(|s| s.is_empty()) @@ -511,8 +481,8 @@ impl ConnRegistry { #[allow(dead_code)] pub async fn unregister_writer_if_empty(&self, writer_id: u64) -> bool { - let mut inner = self.inner.write().await; - let Some(conn_ids) = inner.binding.conns_for_writer.get(&writer_id) else { + let mut binding = self.binding.inner.lock().await; + let Some(conn_ids) = binding.conns_for_writer.get(&writer_id) else { // Writer is already absent from the registry. return true; }; @@ -520,22 +490,19 @@ impl ConnRegistry { return false; } - inner.binding.writers.remove(&writer_id); - inner.binding.last_meta_for_writer.remove(&writer_id); - inner - .binding - .writer_idle_since_epoch_secs - .remove(&writer_id); - inner.binding.conns_for_writer.remove(&writer_id); + binding.writers.remove(&writer_id); + binding.last_meta_for_writer.remove(&writer_id); + binding.writer_idle_since_epoch_secs.remove(&writer_id); + binding.conns_for_writer.remove(&writer_id); true } #[allow(dead_code)] pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet { - let inner = self.inner.read().await; + let binding = self.binding.inner.lock().await; let mut out = HashSet::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(conns) = inner.binding.conns_for_writer.get(writer_id) + if let Some(conns) = binding.conns_for_writer.get(writer_id) && !conns.is_empty() { out.insert(*writer_id);