DashMap-Driven Routing + Parallel Health Reconnects + Family Runtime State

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
Alexey 2026-03-25 21:26:20 +03:00
parent ceae1564af
commit b94746a6e0
5 changed files with 310 additions and 228 deletions

View File

@ -115,15 +115,13 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
);
std::process::exit(1);
}
-    } else {
-        if let Err(e) = std::fs::create_dir_all(data_path) {
-            eprintln!(
-                "[telemt] Can't create data_path {}: {}",
-                data_path.display(),
-                e
-            );
-            std::process::exit(1);
-        }
+    } else if let Err(e) = std::fs::create_dir_all(data_path) {
+        eprintln!(
+            "[telemt] Can't create data_path {}: {}",
+            data_path.display(),
+            e
+        );
+        std::process::exit(1);
    }
if let Err(e) = std::env::set_current_dir(data_path) {
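The first file's change is a pure `collapsible_else_if` cleanup. A standalone sketch of the idiom (hypothetical `ensure_dir` helper, not the project's code):

```rust
use std::path::Path;

// An `else` whose entire body is another `if` can be flattened into
// `else if`, removing one brace level without changing control flow.
fn ensure_dir(path: &Path) {
    if path.exists() {
        println!("[telemt] using existing {}", path.display());
    } else if let Err(e) = std::fs::create_dir_all(path) {
        eprintln!("[telemt] Can't create data_path {}: {}", path.display(), e);
        std::process::exit(1);
    }
}

fn main() {
    ensure_dir(Path::new("./data"));
}
```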

View File

@ -244,10 +244,9 @@ fn order_profiles(
if let Some(pos) = ordered
.iter()
.position(|profile| *profile == cached.profile)
+        && pos != 0
    {
-        if pos != 0 {
-            ordered.swap(0, pos);
-        }
+        ordered.swap(0, pos);
    }
}
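The `order_profiles` rewrite folds the inner guard into a let-chain (`if let … && cond`), which needs the Rust 2024 edition. A self-contained sketch of the same move-to-front idiom, with a hypothetical `promote` helper:

```rust
// Bind `pos` and guard `pos != 0` in one chained condition instead of
// nesting a second `if` inside the `if let`.
fn promote(ordered: &mut Vec<&str>, cached: &str) {
    if let Some(pos) = ordered.iter().position(|p| *p == cached)
        && pos != 0
    {
        ordered.swap(0, pos);
    }
}

fn main() {
    let mut profiles = vec!["a", "b", "c"];
    promote(&mut profiles, "c");
    assert_eq!(profiles, ["c", "b", "a"]);
}
```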

View File

@ -8,6 +8,7 @@ use std::time::{Duration, Instant};
use rand::RngExt;
use tokio::sync::Semaphore;
+use tokio::task::JoinSet;
use tracing::{debug, info, warn};
use crate::config::MeFloorMode;
@ -15,6 +16,7 @@ use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use super::MePool;
+use super::pool::MeFamilyRuntimeState;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)]
@ -28,6 +30,9 @@ const HEALTH_RECONNECT_BUDGET_PER_CORE: usize = 2;
const HEALTH_RECONNECT_BUDGET_PER_DC: usize = 1;
const HEALTH_RECONNECT_BUDGET_MIN: usize = 4;
const HEALTH_RECONNECT_BUDGET_MAX: usize = 128;
+const FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD: u32 = 5;
+const FAMILY_SUPPRESS_DURATION_SECS: u64 = 60;
+const FAMILY_RECOVER_SUCCESS_STREAK_TARGET: u32 = 2;
const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
@ -57,6 +62,17 @@ struct FamilyFloorPlan {
target_writers_total: usize,
}
+#[derive(Debug)]
+struct FamilyReconnectOutcome {
+    key: (i32, IpFamily),
+    dc: i32,
+    family: IpFamily,
+    alive: usize,
+    required: usize,
+    endpoint_count: usize,
+    restored: usize,
+}
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
@ -115,6 +131,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut floor_warn_next_allowed,
)
.await;
+            update_family_runtime_state(&pool, IpFamily::V4, v4_degraded);
+            update_family_runtime_state(&pool, IpFamily::V6, v6_degraded);
degraded_interval = v4_degraded || v6_degraded;
}
}
@ -430,6 +448,10 @@ async fn check_family(
floor_plan.active_writers_current,
floor_plan.warm_writers_current,
);
+    let live_writer_ids_by_addr = Arc::new(live_writer_ids_by_addr);
+    let writer_idle_since = Arc::new(writer_idle_since);
+    let bound_clients_by_writer = Arc::new(bound_clients_by_writer);
+    let mut reconnect_set = JoinSet::<FamilyReconnectOutcome>::new();
for (dc, endpoints) in dc_endpoints {
if endpoints.is_empty() {
@ -503,9 +525,9 @@ async fn check_family(
&endpoints,
alive,
required,
-            &live_writer_ids_by_addr,
-            &writer_idle_since,
-            &bound_clients_by_writer,
+            live_writer_ids_by_addr.as_ref(),
+            writer_idle_since.as_ref(),
+            bound_clients_by_writer.as_ref(),
idle_refresh_next_attempt,
)
.await;
@ -518,8 +540,8 @@ async fn check_family(
&endpoints,
alive,
required,
-            &live_writer_ids_by_addr,
-            &bound_clients_by_writer,
+            live_writer_ids_by_addr.as_ref(),
+            bound_clients_by_writer.as_ref(),
shadow_rotate_deadline,
)
.await;
@ -575,121 +597,165 @@ async fn check_family(
            continue;
        }
        *inflight.entry(key).or_insert(0) += 1;
-        let mut restored = 0usize;
-        for _ in 0..missing {
-            let Ok(reconnect_permit) = reconnect_sem.clone().try_acquire_owned() else {
-                break;
-            };
-            if pool.active_contour_writer_count_total().await
-                >= floor_plan.active_cap_effective_total
-            {
-                let swapped = maybe_swap_idle_writer_for_cap(
-                    pool,
-                    rng,
-                    dc,
-                    family,
-                    &endpoints,
-                    &live_writer_ids_by_addr,
-                    &writer_idle_since,
-                    &bound_clients_by_writer,
-                )
-                .await;
-                if swapped {
-                    pool.stats.increment_me_floor_swap_idle_total();
-                    restored += 1;
-                    continue;
-                }
-                pool.stats.increment_me_floor_cap_block_total();
-                pool.stats.increment_me_floor_swap_idle_failed_total();
-                debug!(
-                    dc = %dc,
-                    ?family,
-                    alive,
-                    required,
-                    active_cap_effective_total = floor_plan.active_cap_effective_total,
-                    "Adaptive floor cap reached, reconnect attempt blocked"
-                );
-                break;
-            }
-            let res = tokio::time::timeout(
-                pool.reconnect_runtime.me_one_timeout,
-                pool.connect_endpoints_round_robin(dc, &endpoints, rng.as_ref()),
-            )
-            .await;
-            match res {
-                Ok(true) => {
-                    restored += 1;
-                    pool.stats.increment_me_reconnect_success();
-                }
-                Ok(false) => {
-                    pool.stats.increment_me_reconnect_attempt();
-                    debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
-                }
-                Err(_) => {
-                    pool.stats.increment_me_reconnect_attempt();
-                    debug!(dc = %dc, ?family, "ME reconnect timed out");
-                }
-            }
-            drop(reconnect_permit);
-        }
-        let now_alive = alive + restored;
-        if now_alive >= required {
-            info!(
-                dc = %dc,
-                ?family,
-                alive = now_alive,
-                required,
-                endpoint_count = endpoints.len(),
-                "ME writer floor restored for DC"
-            );
-            backoff.insert(
-                key,
-                pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64,
-            );
-            let jitter = pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64
-                / JITTER_FRAC_NUM;
-            let wait = pool.reconnect_runtime.me_reconnect_backoff_base
-                + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
-            next_attempt.insert(key, now + wait);
-        } else {
-            let curr = *backoff
-                .get(&key)
-                .unwrap_or(&(pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64));
-            let next_ms = (curr.saturating_mul(2))
-                .min(pool.reconnect_runtime.me_reconnect_backoff_cap.as_millis() as u64);
-            backoff.insert(key, next_ms);
-            let jitter = next_ms / JITTER_FRAC_NUM;
-            let wait = Duration::from_millis(next_ms)
-                + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
-            next_attempt.insert(key, now + wait);
-            if pool.is_runtime_ready() {
-                let warn_cooldown = pool.warn_rate_limit_duration();
-                if should_emit_rate_limited_warn(floor_warn_next_allowed, key, now, warn_cooldown) {
-                    warn!(
-                        dc = %dc,
-                        ?family,
-                        alive = now_alive,
-                        required,
-                        endpoint_count = endpoints.len(),
-                        backoff_ms = next_ms,
-                        "DC writer floor is below required level, scheduled reconnect"
-                    );
-                }
-            } else {
-                info!(
-                    dc = %dc,
-                    ?family,
-                    alive = now_alive,
-                    required,
-                    endpoint_count = endpoints.len(),
-                    backoff_ms = next_ms,
-                    "DC writer floor is below required level during startup, scheduled reconnect"
-                );
-            }
-        }
-        if let Some(v) = inflight.get_mut(&key) {
-            *v = v.saturating_sub(1);
-        }
+        let pool_for_reconnect = pool.clone();
+        let rng_for_reconnect = rng.clone();
+        let reconnect_sem_for_dc = reconnect_sem.clone();
+        let endpoints_for_dc = endpoints.clone();
+        let live_writer_ids_by_addr_for_dc = live_writer_ids_by_addr.clone();
+        let writer_idle_since_for_dc = writer_idle_since.clone();
+        let bound_clients_by_writer_for_dc = bound_clients_by_writer.clone();
+        let active_cap_effective_total = floor_plan.active_cap_effective_total;
+        reconnect_set.spawn(async move {
+            let mut restored = 0usize;
+            for _ in 0..missing {
+                let Ok(reconnect_permit) = reconnect_sem_for_dc.clone().try_acquire_owned() else {
+                    break;
+                };
+                if pool_for_reconnect.active_contour_writer_count_total().await
+                    >= active_cap_effective_total
+                {
+                    let swapped = maybe_swap_idle_writer_for_cap(
+                        &pool_for_reconnect,
+                        &rng_for_reconnect,
+                        dc,
+                        family,
+                        &endpoints_for_dc,
+                        live_writer_ids_by_addr_for_dc.as_ref(),
+                        writer_idle_since_for_dc.as_ref(),
+                        bound_clients_by_writer_for_dc.as_ref(),
+                    )
+                    .await;
+                    if swapped {
+                        pool_for_reconnect
+                            .stats
+                            .increment_me_floor_swap_idle_total();
+                        restored += 1;
+                        continue;
+                    }
+                    pool_for_reconnect
+                        .stats
+                        .increment_me_floor_cap_block_total();
+                    pool_for_reconnect
+                        .stats
+                        .increment_me_floor_swap_idle_failed_total();
+                    debug!(
+                        dc = %dc,
+                        ?family,
+                        alive,
+                        required,
+                        active_cap_effective_total,
+                        "Adaptive floor cap reached, reconnect attempt blocked"
+                    );
+                    break;
+                }
+                let res = tokio::time::timeout(
+                    pool_for_reconnect.reconnect_runtime.me_one_timeout,
+                    pool_for_reconnect.connect_endpoints_round_robin(
+                        dc,
+                        &endpoints_for_dc,
+                        rng_for_reconnect.as_ref(),
+                    ),
+                )
+                .await;
+                match res {
+                    Ok(true) => {
+                        restored += 1;
+                        pool_for_reconnect.stats.increment_me_reconnect_success();
+                    }
+                    Ok(false) => {
+                        pool_for_reconnect.stats.increment_me_reconnect_attempt();
+                        debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
+                    }
+                    Err(_) => {
+                        pool_for_reconnect.stats.increment_me_reconnect_attempt();
+                        debug!(dc = %dc, ?family, "ME reconnect timed out");
+                    }
+                }
+                drop(reconnect_permit);
+            }
+            FamilyReconnectOutcome {
+                key,
+                dc,
+                family,
+                alive,
+                required,
+                endpoint_count: endpoints_for_dc.len(),
+                restored,
+            }
+        });
    }
+    while let Some(joined) = reconnect_set.join_next().await {
+        let outcome = match joined {
+            Ok(outcome) => outcome,
+            Err(join_error) => {
+                debug!(error = %join_error, "Health reconnect task failed");
+                continue;
+            }
+        };
+        let now = Instant::now();
+        let now_alive = outcome.alive + outcome.restored;
+        if now_alive >= outcome.required {
+            info!(
+                dc = %outcome.dc,
+                family = ?outcome.family,
+                alive = now_alive,
+                required = outcome.required,
+                endpoint_count = outcome.endpoint_count,
+                "ME writer floor restored for DC"
+            );
+            backoff.insert(
+                outcome.key,
+                pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64,
+            );
+            let jitter = pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64
+                / JITTER_FRAC_NUM;
+            let wait = pool.reconnect_runtime.me_reconnect_backoff_base
+                + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
+            next_attempt.insert(outcome.key, now + wait);
+        } else {
+            let curr = *backoff
+                .get(&outcome.key)
+                .unwrap_or(&(pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64));
+            let next_ms = (curr.saturating_mul(2))
+                .min(pool.reconnect_runtime.me_reconnect_backoff_cap.as_millis() as u64);
+            backoff.insert(outcome.key, next_ms);
+            let jitter = next_ms / JITTER_FRAC_NUM;
+            let wait = Duration::from_millis(next_ms)
+                + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
+            next_attempt.insert(outcome.key, now + wait);
+            if pool.is_runtime_ready() {
+                let warn_cooldown = pool.warn_rate_limit_duration();
+                if should_emit_rate_limited_warn(
+                    floor_warn_next_allowed,
+                    outcome.key,
+                    now,
+                    warn_cooldown,
+                ) {
+                    warn!(
+                        dc = %outcome.dc,
+                        family = ?outcome.family,
+                        alive = now_alive,
+                        required = outcome.required,
+                        endpoint_count = outcome.endpoint_count,
+                        backoff_ms = next_ms,
+                        "DC writer floor is below required level, scheduled reconnect"
+                    );
+                }
+            } else {
+                info!(
+                    dc = %outcome.dc,
+                    family = ?outcome.family,
+                    alive = now_alive,
+                    required = outcome.required,
+                    endpoint_count = outcome.endpoint_count,
+                    backoff_ms = next_ms,
+                    "DC writer floor is below required level during startup, scheduled reconnect"
+                );
+            }
+        }
+        if let Some(v) = inflight.get_mut(&outcome.key) {
+            *v = v.saturating_sub(1);
+        }
+    }
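This is the commit's parallel-health piece: the per-DC reconnect loop used to run inline, so one slow DC delayed every other DC's check. Now each DC's loop is spawned onto a `JoinSet`, the shared snapshots are `Arc`-wrapped so every task gets a cheap clone, and the `backoff`/`next_attempt`/`inflight` bookkeeping stays single-threaded in the `join_next` drain loop. A reduced sketch of that fan-out/fan-in shape, with hypothetical `Outcome` and `reconnect_one_dc` stand-ins:

```rust
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

struct Outcome {
    dc: i32,
    restored: usize,
}

// Each task does its own I/O under a shared permit budget and returns a
// plain value; the caller's maps need no locking because every update
// happens after join, on one task.
async fn reconnect_one_dc(dc: i32, permits: Arc<Semaphore>) -> Outcome {
    let mut restored = 0;
    while let Ok(_permit) = permits.clone().try_acquire_owned() {
        // ... one reconnect attempt per permit ...
        restored += 1;
        break; // sketch: stop after one attempt
    }
    Outcome { dc, restored }
}

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(8));
    let mut set = JoinSet::new();
    for dc in [1, 2, 4] {
        set.spawn(reconnect_one_dc(dc, permits.clone())); // fan-out
    }
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok(out) => println!("dc {} restored {}", out.dc, out.restored), // fan-in
            Err(e) => eprintln!("reconnect task failed: {e}"),
        }
    }
}
```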
@ -706,6 +772,68 @@ fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
.clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX)
}
+fn update_family_runtime_state(pool: &Arc<MePool>, family: IpFamily, degraded: bool) {
+    let now_epoch_secs = MePool::now_epoch_secs();
+    let previous_state = pool.family_runtime_state(family);
+    let mut state_since_epoch_secs = pool.family_runtime_state_since_epoch_secs(family);
+    let previous_suppressed_until_epoch_secs = pool.family_suppressed_until_epoch_secs(family);
+    let previous_fail_streak = pool.family_fail_streak(family);
+    let previous_recover_success_streak = pool.family_recover_success_streak(family);
+    let (next_state, suppressed_until_epoch_secs, fail_streak, recover_success_streak) =
+        if previous_suppressed_until_epoch_secs > now_epoch_secs {
+            let fail_streak = if degraded {
+                previous_fail_streak.saturating_add(1)
+            } else {
+                previous_fail_streak
+            };
+            (
+                MeFamilyRuntimeState::Suppressed,
+                previous_suppressed_until_epoch_secs,
+                fail_streak,
+                0,
+            )
+        } else if degraded {
+            let fail_streak = previous_fail_streak.saturating_add(1);
+            if fail_streak >= FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD {
+                (
+                    MeFamilyRuntimeState::Suppressed,
+                    now_epoch_secs.saturating_add(FAMILY_SUPPRESS_DURATION_SECS),
+                    fail_streak,
+                    0,
+                )
+            } else {
+                (MeFamilyRuntimeState::Degraded, 0, fail_streak, 0)
+            }
+        } else if matches!(previous_state, MeFamilyRuntimeState::Healthy) {
+            (MeFamilyRuntimeState::Healthy, 0, 0, 0)
+        } else {
+            let recover_success_streak = previous_recover_success_streak.saturating_add(1);
+            if recover_success_streak >= FAMILY_RECOVER_SUCCESS_STREAK_TARGET {
+                (MeFamilyRuntimeState::Healthy, 0, 0, 0)
+            } else {
+                (
+                    MeFamilyRuntimeState::Recovering,
+                    0,
+                    0,
+                    recover_success_streak,
+                )
+            }
+        };
+    if next_state != previous_state || state_since_epoch_secs == 0 {
+        state_since_epoch_secs = now_epoch_secs;
+    }
+    pool.set_family_runtime_state(
+        family,
+        next_state,
+        state_since_epoch_secs,
+        suppressed_until_epoch_secs,
+        fail_streak,
+        recover_success_streak,
+    );
+}
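`update_family_runtime_state` is a small per-family state machine: a degraded interval bumps the fail streak, five in a row suppress the family for 60 seconds (the constants above), and once the window expires the family must post two clean intervals through `Recovering` before it is `Healthy` again. A condensed model as a pure transition function (the real code reads and writes the pool's atomics and also tracks `state_since`, omitted here):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum State { Healthy, Degraded, Suppressed, Recovering }

#[derive(Clone, Copy)]
struct Fam { state: State, suppressed_until: u64, fails: u32, recoveries: u32 }

fn step(f: Fam, degraded: bool, now: u64) -> Fam {
    if f.suppressed_until > now {
        // Inside the suppression window only the fail streak moves.
        let fails = if degraded { f.fails + 1 } else { f.fails };
        Fam { state: State::Suppressed, fails, recoveries: 0, ..f }
    } else if degraded {
        let fails = f.fails + 1;
        if fails >= 5 {
            Fam { state: State::Suppressed, suppressed_until: now + 60, fails, recoveries: 0 }
        } else {
            Fam { state: State::Degraded, suppressed_until: 0, fails, recoveries: 0 }
        }
    } else if f.state == State::Healthy {
        Fam { state: State::Healthy, suppressed_until: 0, fails: 0, recoveries: 0 }
    } else {
        let recoveries = f.recoveries + 1;
        if recoveries >= 2 {
            Fam { state: State::Healthy, suppressed_until: 0, fails: 0, recoveries: 0 }
        } else {
            Fam { state: State::Recovering, suppressed_until: 0, fails: 0, recoveries }
        }
    }
}

fn main() {
    let mut f = Fam { state: State::Healthy, suppressed_until: 0, fails: 0, recoveries: 0 };
    for t in 0..5 { f = step(f, true, t); }  // five degraded intervals in a row
    assert_eq!(f.state, State::Suppressed);  // suppressed for the next 60 seconds
    f = step(f, false, 100);                 // window expired, first clean interval
    f = step(f, false, 101);                 // second clean interval
    assert_eq!(f.state, State::Healthy);
}
```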
fn should_emit_rate_limited_warn(
next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
key: (i32, IpFamily),

View File

@ -202,15 +202,6 @@ impl FamilyHealthSnapshot {
}
impl MeFamilyRuntimeState {
-    pub(crate) fn from_u8(value: u8) -> Self {
-        match value {
-            1 => Self::Degraded,
-            2 => Self::Suppressed,
-            3 => Self::Recovering,
-            _ => Self::Healthy,
-        }
-    }
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Healthy => "healthy",
@ -852,12 +843,11 @@ impl MePool {
}
pub(super) fn notify_writer_epoch(&self) {
-        let _ = self.writer_epoch.send_modify(|epoch| {
+        self.writer_epoch.send_modify(|epoch| {
*epoch = epoch.wrapping_add(1);
});
}
-    #[allow(dead_code)]
pub(super) fn set_family_runtime_state(
&self,
family: IpFamily,

View File

@ -3,8 +3,9 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use dashmap::DashMap;
use tokio::sync::mpsc::error::TrySendError;
-use tokio::sync::{RwLock, mpsc};
+use tokio::sync::{Mutex, mpsc};
use super::MeResponse;
use super::codec::WriterCommand;
@ -50,16 +51,15 @@ pub(super) struct WriterActivitySnapshot {
pub active_sessions_by_target_dc: HashMap<i16, usize>,
}
-struct RegistryInner {
-    routing: RoutingTable,
-    binding: BindingState,
-}
struct RoutingTable {
-    map: HashMap<u64, mpsc::Sender<MeResponse>>,
+    map: DashMap<u64, mpsc::Sender<MeResponse>>,
}
struct BindingState {
+    inner: Mutex<BindingInner>,
+}
+struct BindingInner {
writers: HashMap<u64, mpsc::Sender<WriterCommand>>,
writer_for_conn: HashMap<u64, u64>,
conns_for_writer: HashMap<u64, HashSet<u64>>,
@ -68,26 +68,22 @@ struct BindingState {
writer_idle_since_epoch_secs: HashMap<u64, u64>,
}
-impl RegistryInner {
+impl BindingInner {
    fn new() -> Self {
        Self {
-            routing: RoutingTable {
-                map: HashMap::new(),
-            },
-            binding: BindingState {
-                writers: HashMap::new(),
-                writer_for_conn: HashMap::new(),
-                conns_for_writer: HashMap::new(),
-                meta: HashMap::new(),
-                last_meta_for_writer: HashMap::new(),
-                writer_idle_since_epoch_secs: HashMap::new(),
-            },
+            writers: HashMap::new(),
+            writer_for_conn: HashMap::new(),
+            conns_for_writer: HashMap::new(),
+            meta: HashMap::new(),
+            last_meta_for_writer: HashMap::new(),
+            writer_idle_since_epoch_secs: HashMap::new(),
}
}
}
pub struct ConnRegistry {
-    inner: RwLock<RegistryInner>,
+    routing: RoutingTable,
+    binding: BindingState,
next_id: AtomicU64,
route_channel_capacity: usize,
route_backpressure_base_timeout_ms: AtomicU64,
@ -106,7 +102,12 @@ impl ConnRegistry {
pub fn with_route_channel_capacity(route_channel_capacity: usize) -> Self {
let start = rand::random::<u64>() | 1;
Self {
-            inner: RwLock::new(RegistryInner::new()),
+            routing: RoutingTable {
+                map: DashMap::new(),
+            },
+            binding: BindingState {
+                inner: Mutex::new(BindingInner::new()),
+            },
next_id: AtomicU64::new(start),
route_channel_capacity: route_channel_capacity.max(1),
route_backpressure_base_timeout_ms: AtomicU64::new(ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS),
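The structural change in the registry: the single `RwLock<RegistryInner>` that used to serialize routing lookups against binding mutations is split in two. Routing becomes a `DashMap` (sharded, so readers only contend on one shard), while the interlinked binding maps move into `BindingInner` behind one tokio `Mutex`. A reduced sketch of the resulting shape, with trimmed stand-in types:

```rust
use std::collections::{HashMap, HashSet};
use dashmap::DashMap;
use tokio::sync::{mpsc, Mutex};

struct BindingInner {
    writers: HashMap<u64, mpsc::Sender<String>>,
    writer_for_conn: HashMap<u64, u64>,
    conns_for_writer: HashMap<u64, HashSet<u64>>,
}

struct ConnRegistry {
    // Hot path: per-connection response routing, no registry-wide lock.
    routing: DashMap<u64, mpsc::Sender<String>>,
    // Cold path: multi-map binding state that must mutate atomically.
    binding: Mutex<BindingInner>,
}

fn main() {
    let _registry = ConnRegistry {
        routing: DashMap::new(),
        binding: Mutex::new(BindingInner {
            writers: HashMap::new(),
            writer_for_conn: HashMap::new(),
            conns_for_writer: HashMap::new(),
        }),
    };
}
```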
@ -142,15 +143,14 @@ impl ConnRegistry {
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(self.route_channel_capacity);
-        self.inner.write().await.routing.map.insert(id, tx);
+        self.routing.map.insert(id, tx);
(id, rx)
}
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
-        let mut inner = self.inner.write().await;
-        inner.binding.writers.insert(writer_id, tx);
-        inner
-            .binding
+        let mut binding = self.binding.inner.lock().await;
+        binding.writers.insert(writer_id, tx);
+        binding
.conns_for_writer
.entry(writer_id)
.or_insert_with(HashSet::new);
@ -158,20 +158,18 @@ impl ConnRegistry {
/// Unregister connection, returning associated writer_id if any.
pub async fn unregister(&self, id: u64) -> Option<u64> {
-        let mut inner = self.inner.write().await;
-        inner.routing.map.remove(&id);
-        inner.binding.meta.remove(&id);
-        if let Some(writer_id) = inner.binding.writer_for_conn.remove(&id) {
-            let became_empty = if let Some(set) = inner.binding.conns_for_writer.get_mut(&writer_id)
-            {
+        self.routing.map.remove(&id);
+        let mut binding = self.binding.inner.lock().await;
+        binding.meta.remove(&id);
+        if let Some(writer_id) = binding.writer_for_conn.remove(&id) {
+            let became_empty = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) {
set.remove(&id);
set.is_empty()
} else {
false
};
if became_empty {
-                inner
-                    .binding
+                binding
.writer_idle_since_epoch_secs
.insert(writer_id, Self::now_epoch_secs());
}
@ -182,10 +180,7 @@ impl ConnRegistry {
#[allow(dead_code)]
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
-        let tx = {
-            let inner = self.inner.read().await;
-            inner.routing.map.get(&id).cloned()
-        };
+        let tx = self.routing.map.get(&id).map(|entry| entry.value().clone());
let Some(tx) = tx else {
return RouteResult::NoConn;
@ -238,10 +233,7 @@ impl ConnRegistry {
}
pub async fn route_nowait(&self, id: u64, resp: MeResponse) -> RouteResult {
-        let tx = {
-            let inner = self.inner.read().await;
-            inner.routing.map.get(&id).cloned()
-        };
+        let tx = self.routing.map.get(&id).map(|entry| entry.value().clone());
let Some(tx) = tx else {
return RouteResult::NoConn;
@ -264,10 +256,7 @@ impl ConnRegistry {
return self.route_nowait(id, resp).await;
}
-        let tx = {
-            let inner = self.inner.read().await;
-            inner.routing.map.get(&id).cloned()
-        };
+        let tx = self.routing.map.get(&id).map(|entry| entry.value().clone());
let Some(tx) = tx else {
return RouteResult::NoConn;
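All three `route*` variants now share the same pattern: clone the `Sender` out of the `DashMap` guard first, so the shard lock is released before `.send().await`. Holding a `DashMap` `Ref` across an await point would pin the shard for the whole send and can deadlock against writers on the same shard. A sketch of the pattern in isolation, with a hypothetical `route` helper:

```rust
use dashmap::DashMap;
use tokio::sync::mpsc;

async fn route(map: &DashMap<u64, mpsc::Sender<String>>, id: u64, msg: String) -> bool {
    // The Ref guard lives only for this expression; the clone escapes it.
    let tx = map.get(&id).map(|entry| entry.value().clone());
    match tx {
        Some(tx) => tx.send(msg).await.is_ok(),
        None => false,
    }
}
```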
@ -306,44 +295,39 @@ impl ConnRegistry {
}
pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool {
-        let mut inner = self.inner.write().await;
+        let mut binding = self.binding.inner.lock().await;
// ROUTING IS THE SOURCE OF TRUTH:
// never keep/attach writer binding for a connection that is already
// absent from the routing table.
-        if !inner.routing.map.contains_key(&conn_id) {
+        if !self.routing.map.contains_key(&conn_id) {
return false;
}
-        if !inner.binding.writers.contains_key(&writer_id) {
+        if !binding.writers.contains_key(&writer_id) {
return false;
}
-        let previous_writer_id = inner.binding.writer_for_conn.insert(conn_id, writer_id);
+        let previous_writer_id = binding.writer_for_conn.insert(conn_id, writer_id);
if let Some(previous_writer_id) = previous_writer_id
&& previous_writer_id != writer_id
{
let became_empty =
-                if let Some(set) = inner.binding.conns_for_writer.get_mut(&previous_writer_id) {
+                if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) {
set.remove(&conn_id);
set.is_empty()
} else {
false
};
if became_empty {
-                inner
-                    .binding
+                binding
.writer_idle_since_epoch_secs
.insert(previous_writer_id, Self::now_epoch_secs());
}
}
-        inner.binding.meta.insert(conn_id, meta.clone());
-        inner.binding.last_meta_for_writer.insert(writer_id, meta);
-        inner
-            .binding
-            .writer_idle_since_epoch_secs
-            .remove(&writer_id);
-        inner
-            .binding
+        binding.meta.insert(conn_id, meta.clone());
+        binding.last_meta_for_writer.insert(writer_id, meta);
+        binding.writer_idle_since_epoch_secs.remove(&writer_id);
+        binding
.conns_for_writer
.entry(writer_id)
.or_insert_with(HashSet::new)
@ -352,39 +336,32 @@ impl ConnRegistry {
}
pub async fn mark_writer_idle(&self, writer_id: u64) {
-        let mut inner = self.inner.write().await;
-        inner
-            .binding
+        let mut binding = self.binding.inner.lock().await;
+        binding
.conns_for_writer
.entry(writer_id)
.or_insert_with(HashSet::new);
-        inner
-            .binding
+        binding
.writer_idle_since_epoch_secs
.entry(writer_id)
.or_insert(Self::now_epoch_secs());
}
pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option<ConnMeta> {
-        let inner = self.inner.read().await;
-        inner.binding.last_meta_for_writer.get(&writer_id).cloned()
+        let binding = self.binding.inner.lock().await;
+        binding.last_meta_for_writer.get(&writer_id).cloned()
}
pub async fn writer_idle_since_snapshot(&self) -> HashMap<u64, u64> {
-        let inner = self.inner.read().await;
-        inner.binding.writer_idle_since_epoch_secs.clone()
+        let binding = self.binding.inner.lock().await;
+        binding.writer_idle_since_epoch_secs.clone()
}
pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap<u64, u64> {
-        let inner = self.inner.read().await;
+        let binding = self.binding.inner.lock().await;
let mut out = HashMap::<u64, u64>::with_capacity(writer_ids.len());
for writer_id in writer_ids {
-            if let Some(idle_since) = inner
-                .binding
-                .writer_idle_since_epoch_secs
-                .get(writer_id)
-                .copied()
-            {
+            if let Some(idle_since) = binding.writer_idle_since_epoch_secs.get(writer_id).copied() {
out.insert(*writer_id, idle_since);
}
}
@ -392,14 +369,14 @@ impl ConnRegistry {
}
pub(super) async fn writer_activity_snapshot(&self) -> WriterActivitySnapshot {
-        let inner = self.inner.read().await;
+        let binding = self.binding.inner.lock().await;
let mut bound_clients_by_writer = HashMap::<u64, usize>::new();
let mut active_sessions_by_target_dc = HashMap::<i16, usize>::new();
-        for (writer_id, conn_ids) in &inner.binding.conns_for_writer {
+        for (writer_id, conn_ids) in &binding.conns_for_writer {
bound_clients_by_writer.insert(*writer_id, conn_ids.len());
}
-        for conn_meta in inner.binding.meta.values() {
+        for conn_meta in binding.meta.values() {
if conn_meta.target_dc == 0 {
continue;
}
@ -415,19 +392,18 @@ impl ConnRegistry {
}
pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> {
-        let mut inner = self.inner.write().await;
+        let mut binding = self.binding.inner.lock().await;
// ROUTING IS THE SOURCE OF TRUTH:
// stale bindings are ignored and lazily cleaned when routing no longer
// contains the connection.
-        if !inner.routing.map.contains_key(&conn_id) {
-            inner.binding.meta.remove(&conn_id);
-            if let Some(stale_writer_id) = inner.binding.writer_for_conn.remove(&conn_id)
-                && let Some(conns) = inner.binding.conns_for_writer.get_mut(&stale_writer_id)
+        if !self.routing.map.contains_key(&conn_id) {
+            binding.meta.remove(&conn_id);
+            if let Some(stale_writer_id) = binding.writer_for_conn.remove(&conn_id)
+                && let Some(conns) = binding.conns_for_writer.get_mut(&stale_writer_id)
{
conns.remove(&conn_id);
if conns.is_empty() {
-                    inner
-                        .binding
+                    binding
.writer_idle_since_epoch_secs
.insert(stale_writer_id, Self::now_epoch_secs());
}
@ -435,15 +411,14 @@ impl ConnRegistry {
return None;
}
-        let writer_id = inner.binding.writer_for_conn.get(&conn_id).copied()?;
-        let Some(writer) = inner.binding.writers.get(&writer_id).cloned() else {
-            inner.binding.writer_for_conn.remove(&conn_id);
-            inner.binding.meta.remove(&conn_id);
-            if let Some(conns) = inner.binding.conns_for_writer.get_mut(&writer_id) {
+        let writer_id = binding.writer_for_conn.get(&conn_id).copied()?;
+        let Some(writer) = binding.writers.get(&writer_id).cloned() else {
+            binding.writer_for_conn.remove(&conn_id);
+            binding.meta.remove(&conn_id);
+            if let Some(conns) = binding.conns_for_writer.get_mut(&writer_id) {
conns.remove(&conn_id);
if conns.is_empty() {
-                inner
-                    .binding
+                binding
.writer_idle_since_epoch_secs
.insert(writer_id, Self::now_epoch_secs());
}
@ -457,20 +432,16 @@ impl ConnRegistry {
}
pub async fn active_conn_ids(&self) -> Vec<u64> {
-        let inner = self.inner.read().await;
-        inner.binding.writer_for_conn.keys().copied().collect()
+        let binding = self.binding.inner.lock().await;
+        binding.writer_for_conn.keys().copied().collect()
}
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
-        let mut inner = self.inner.write().await;
-        inner.binding.writers.remove(&writer_id);
-        inner.binding.last_meta_for_writer.remove(&writer_id);
-        inner
-            .binding
-            .writer_idle_since_epoch_secs
-            .remove(&writer_id);
-        let conns = inner
-            .binding
+        let mut binding = self.binding.inner.lock().await;
+        binding.writers.remove(&writer_id);
+        binding.last_meta_for_writer.remove(&writer_id);
+        binding.writer_idle_since_epoch_secs.remove(&writer_id);
+        let conns = binding
.conns_for_writer
.remove(&writer_id)
.unwrap_or_default()
@ -479,11 +450,11 @@ impl ConnRegistry {
let mut out = Vec::new();
for conn_id in conns {
-            if inner.binding.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
+            if binding.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue;
}
-            inner.binding.writer_for_conn.remove(&conn_id);
-            if let Some(m) = inner.binding.meta.get(&conn_id) {
+            binding.writer_for_conn.remove(&conn_id);
+            if let Some(m) = binding.meta.get(&conn_id) {
out.push(BoundConn {
conn_id,
meta: m.clone(),
@ -495,14 +466,13 @@ impl ConnRegistry {
#[allow(dead_code)]
pub async fn get_meta(&self, conn_id: u64) -> Option<ConnMeta> {
-        let inner = self.inner.read().await;
-        inner.binding.meta.get(&conn_id).cloned()
+        let binding = self.binding.inner.lock().await;
+        binding.meta.get(&conn_id).cloned()
}
pub async fn is_writer_empty(&self, writer_id: u64) -> bool {
-        let inner = self.inner.read().await;
-        inner
-            .binding
+        let binding = self.binding.inner.lock().await;
+        binding
.conns_for_writer
.get(&writer_id)
.map(|s| s.is_empty())
@ -511,8 +481,8 @@ impl ConnRegistry {
#[allow(dead_code)]
pub async fn unregister_writer_if_empty(&self, writer_id: u64) -> bool {
-        let mut inner = self.inner.write().await;
-        let Some(conn_ids) = inner.binding.conns_for_writer.get(&writer_id) else {
+        let mut binding = self.binding.inner.lock().await;
+        let Some(conn_ids) = binding.conns_for_writer.get(&writer_id) else {
// Writer is already absent from the registry.
return true;
};
@ -520,22 +490,19 @@ impl ConnRegistry {
return false;
}
-        inner.binding.writers.remove(&writer_id);
-        inner.binding.last_meta_for_writer.remove(&writer_id);
-        inner
-            .binding
-            .writer_idle_since_epoch_secs
-            .remove(&writer_id);
-        inner.binding.conns_for_writer.remove(&writer_id);
+        binding.writers.remove(&writer_id);
+        binding.last_meta_for_writer.remove(&writer_id);
+        binding.writer_idle_since_epoch_secs.remove(&writer_id);
+        binding.conns_for_writer.remove(&writer_id);
true
}
#[allow(dead_code)]
pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet<u64> {
-        let inner = self.inner.read().await;
+        let binding = self.binding.inner.lock().await;
let mut out = HashSet::<u64>::with_capacity(writer_ids.len());
for writer_id in writer_ids {
-            if let Some(conns) = inner.binding.conns_for_writer.get(writer_id)
+            if let Some(conns) = binding.conns_for_writer.get(writer_id)
&& !conns.is_empty()
{
out.insert(*writer_id);
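Why the binding maps stay under one coarse `Mutex` instead of becoming `DashMap`s too: `writer_for_conn` and `conns_for_writer` mirror each other, and operations like `writer_lost` and `unregister` must update both sides (plus the idle timestamps) in one critical section, or a reader could observe a conn bound to a writer that no longer lists it. A minimal sketch of that invariant:

```rust
use std::collections::{HashMap, HashSet};
use tokio::sync::Mutex;

#[derive(Default)]
struct Binding {
    writer_for_conn: HashMap<u64, u64>,
    conns_for_writer: HashMap<u64, HashSet<u64>>,
}

async fn unbind(state: &Mutex<Binding>, conn_id: u64) {
    let mut b = state.lock().await; // one guard covers both maps
    if let Some(writer_id) = b.writer_for_conn.remove(&conn_id) {
        if let Some(set) = b.conns_for_writer.get_mut(&writer_id) {
            set.remove(&conn_id);
        }
    }
    // Both maps agree again before the guard drops.
}

#[tokio::main]
async fn main() {
    let state = Mutex::new(Binding::default());
    unbind(&state, 7).await;
}
```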