mirror of https://github.com/telemt/telemt.git
ME Writer Rebinding - Lifecycle and Consistency fixes: merge pull request #422 from telemt/flow
ME Writer Rebinding - Lifecycle and Consistency fixes
This commit is contained in:
commit
1294da586f
|
|
@ -236,6 +236,8 @@ pub(super) struct MeWritersSummary {
|
|||
pub(super) required_writers: usize,
|
||||
pub(super) alive_writers: usize,
|
||||
pub(super) coverage_pct: f64,
|
||||
pub(super) fresh_alive_writers: usize,
|
||||
pub(super) fresh_coverage_pct: f64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
|
|
@ -250,6 +252,12 @@ pub(super) struct MeWriterStatus {
|
|||
pub(super) bound_clients: usize,
|
||||
pub(super) idle_for_secs: Option<u64>,
|
||||
pub(super) rtt_ema_ms: Option<f64>,
|
||||
pub(super) matches_active_generation: bool,
|
||||
pub(super) in_desired_map: bool,
|
||||
pub(super) allow_drain_fallback: bool,
|
||||
pub(super) drain_started_at_epoch_secs: Option<u64>,
|
||||
pub(super) drain_deadline_epoch_secs: Option<u64>,
|
||||
pub(super) drain_over_ttl: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
|
|
@ -276,6 +284,8 @@ pub(super) struct DcStatus {
|
|||
pub(super) floor_capped: bool,
|
||||
pub(super) alive_writers: usize,
|
||||
pub(super) coverage_pct: f64,
|
||||
pub(super) fresh_alive_writers: usize,
|
||||
pub(super) fresh_coverage_pct: f64,
|
||||
pub(super) rtt_ms: Option<f64>,
|
||||
pub(super) load: usize,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -314,6 +314,8 @@ async fn get_minimal_payload_cached(
|
|||
required_writers: status.required_writers,
|
||||
alive_writers: status.alive_writers,
|
||||
coverage_pct: status.coverage_pct,
|
||||
fresh_alive_writers: status.fresh_alive_writers,
|
||||
fresh_coverage_pct: status.fresh_coverage_pct,
|
||||
},
|
||||
writers: status
|
||||
.writers
|
||||
|
|
@ -329,6 +331,12 @@ async fn get_minimal_payload_cached(
|
|||
bound_clients: entry.bound_clients,
|
||||
idle_for_secs: entry.idle_for_secs,
|
||||
rtt_ema_ms: entry.rtt_ema_ms,
|
||||
matches_active_generation: entry.matches_active_generation,
|
||||
in_desired_map: entry.in_desired_map,
|
||||
allow_drain_fallback: entry.allow_drain_fallback,
|
||||
drain_started_at_epoch_secs: entry.drain_started_at_epoch_secs,
|
||||
drain_deadline_epoch_secs: entry.drain_deadline_epoch_secs,
|
||||
drain_over_ttl: entry.drain_over_ttl,
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
|
@ -363,6 +371,8 @@ async fn get_minimal_payload_cached(
|
|||
floor_capped: entry.floor_capped,
|
||||
alive_writers: entry.alive_writers,
|
||||
coverage_pct: entry.coverage_pct,
|
||||
fresh_alive_writers: entry.fresh_alive_writers,
|
||||
fresh_coverage_pct: entry.fresh_coverage_pct,
|
||||
rtt_ms: entry.rtt_ms,
|
||||
load: entry.load,
|
||||
})
|
||||
|
|
@ -486,6 +496,8 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa
|
|||
required_writers: 0,
|
||||
alive_writers: 0,
|
||||
coverage_pct: 0.0,
|
||||
fresh_alive_writers: 0,
|
||||
fresh_coverage_pct: 0.0,
|
||||
},
|
||||
writers: Vec::new(),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
|||
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
|
||||
let mut degraded_interval = true;
|
||||
loop {
|
||||
let interval = if degraded_interval {
|
||||
|
|
@ -71,7 +72,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
|||
};
|
||||
tokio::time::sleep(interval).await;
|
||||
pool.prune_closed_writers().await;
|
||||
reap_draining_writers(&pool).await;
|
||||
reap_draining_writers(&pool, &mut drain_warn_next_allowed).await;
|
||||
let v4_degraded = check_family(
|
||||
IpFamily::V4,
|
||||
&pool,
|
||||
|
|
@ -110,17 +111,47 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
|||
}
|
||||
}
|
||||
|
||||
async fn reap_draining_writers(pool: &Arc<MePool>) {
|
||||
async fn reap_draining_writers(
|
||||
pool: &Arc<MePool>,
|
||||
warn_next_allowed: &mut HashMap<u64, Instant>,
|
||||
) {
|
||||
let now_epoch_secs = MePool::now_epoch_secs();
|
||||
let now = Instant::now();
|
||||
let drain_ttl_secs = pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let writers = pool.writers.read().await.clone();
|
||||
for writer in writers {
|
||||
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
if pool.registry.is_writer_empty(writer.id).await {
|
||||
let is_empty = pool.registry.is_writer_empty(writer.id).await;
|
||||
if is_empty {
|
||||
pool.remove_writer_and_close_clients(writer.id).await;
|
||||
continue;
|
||||
}
|
||||
let drain_started_at_epoch_secs = writer
|
||||
.draining_started_at_epoch_secs
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if drain_ttl_secs > 0
|
||||
&& drain_started_at_epoch_secs != 0
|
||||
&& now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
|
||||
&& should_emit_writer_warn(
|
||||
warn_next_allowed,
|
||||
writer.id,
|
||||
now,
|
||||
pool.warn_rate_limit_duration(),
|
||||
)
|
||||
{
|
||||
warn!(
|
||||
writer_id = writer.id,
|
||||
writer_dc = writer.writer_dc,
|
||||
endpoint = %writer.addr,
|
||||
generation = writer.generation,
|
||||
drain_ttl_secs,
|
||||
force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed),
|
||||
allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed),
|
||||
"ME draining writer remains non-empty past drain TTL"
|
||||
);
|
||||
}
|
||||
let deadline_epoch_secs = writer
|
||||
.drain_deadline_epoch_secs
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
|
|
@ -132,6 +163,23 @@ async fn reap_draining_writers(pool: &Arc<MePool>) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Per-writer rate limiter for warning logs.
///
/// Returns `true` when a warning for `writer_id` may be emitted at `now`, and in
/// that case records `now + cooldown` as the next instant at which another
/// warning for the same writer is allowed. Returns `false` (leaving the stored
/// deadline untouched) while the writer is still inside its cooldown window.
///
/// * `next_allowed` - map from writer id to the earliest instant of the next
///   permitted warning; updated in place.
/// * `writer_id` - writer the warning is about.
/// * `now` - current monotonic time as seen by the caller.
/// * `cooldown` - minimum spacing between two warnings for the same writer.
///
/// # Panics
///
/// `now + cooldown` uses `Instant`'s `Add`, which may panic if the resulting
/// instant cannot be represented.
fn should_emit_writer_warn(
    next_allowed: &mut HashMap<u64, Instant>,
    writer_id: u64,
    now: Instant,
    cooldown: Duration,
) -> bool {
    use std::collections::hash_map::Entry;

    // A single entry lookup covers both "never warned" and "cooldown elapsed".
    match next_allowed.entry(writer_id) {
        Entry::Vacant(slot) => {
            // First warning for this writer: always allowed.
            slot.insert(now + cooldown);
            true
        }
        Entry::Occupied(mut slot) => {
            if now >= *slot.get() {
                // Cooldown elapsed (boundary inclusive): allow and re-arm.
                slot.insert(now + cooldown);
                true
            } else {
                false
            }
        }
    }
}
|
||||
|
||||
async fn check_family(
|
||||
family: IpFamily,
|
||||
pool: &Arc<MePool>,
|
||||
|
|
|
|||
|
|
@ -19,6 +19,12 @@ pub(crate) struct MeApiWriterStatusSnapshot {
|
|||
pub bound_clients: usize,
|
||||
pub idle_for_secs: Option<u64>,
|
||||
pub rtt_ema_ms: Option<f64>,
|
||||
pub matches_active_generation: bool,
|
||||
pub in_desired_map: bool,
|
||||
pub allow_drain_fallback: bool,
|
||||
pub drain_started_at_epoch_secs: Option<u64>,
|
||||
pub drain_deadline_epoch_secs: Option<u64>,
|
||||
pub drain_over_ttl: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
@ -35,6 +41,8 @@ pub(crate) struct MeApiDcStatusSnapshot {
|
|||
pub floor_capped: bool,
|
||||
pub alive_writers: usize,
|
||||
pub coverage_pct: f64,
|
||||
pub fresh_alive_writers: usize,
|
||||
pub fresh_coverage_pct: f64,
|
||||
pub rtt_ms: Option<f64>,
|
||||
pub load: usize,
|
||||
}
|
||||
|
|
@ -55,6 +63,8 @@ pub(crate) struct MeApiStatusSnapshot {
|
|||
pub required_writers: usize,
|
||||
pub alive_writers: usize,
|
||||
pub coverage_pct: f64,
|
||||
pub fresh_alive_writers: usize,
|
||||
pub fresh_coverage_pct: f64,
|
||||
pub writers: Vec<MeApiWriterStatusSnapshot>,
|
||||
pub dcs: Vec<MeApiDcStatusSnapshot>,
|
||||
}
|
||||
|
|
@ -213,6 +223,8 @@ impl MePool {
|
|||
|
||||
pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot {
|
||||
let now_epoch_secs = Self::now_epoch_secs();
|
||||
let active_generation = self.current_generation();
|
||||
let drain_ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed);
|
||||
|
||||
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
|
||||
if self.decision.ipv4_me {
|
||||
|
|
@ -239,6 +251,7 @@ impl MePool {
|
|||
|
||||
let mut live_writers_by_dc_endpoint = HashMap::<(i16, SocketAddr), usize>::new();
|
||||
let mut live_writers_by_dc = HashMap::<i16, usize>::new();
|
||||
let mut fresh_writers_by_dc = HashMap::<i16, usize>::new();
|
||||
let mut dc_rtt_agg = HashMap::<i16, (f64, u64)>::new();
|
||||
let mut writer_rows = Vec::<MeApiWriterStatusSnapshot>::with_capacity(writers.len());
|
||||
|
||||
|
|
@ -247,6 +260,10 @@ impl MePool {
|
|||
let dc = i16::try_from(writer.writer_dc).ok();
|
||||
let draining = writer.draining.load(Ordering::Relaxed);
|
||||
let degraded = writer.degraded.load(Ordering::Relaxed);
|
||||
let matches_active_generation = writer.generation == active_generation;
|
||||
let in_desired_map = dc
|
||||
.and_then(|dc_idx| endpoints_by_dc.get(&dc_idx))
|
||||
.is_some_and(|endpoints| endpoints.contains(&endpoint));
|
||||
let bound_clients = activity
|
||||
.bound_clients_by_writer
|
||||
.get(&writer.id)
|
||||
|
|
@ -256,6 +273,21 @@ impl MePool {
|
|||
.get(&writer.id)
|
||||
.map(|idle_ts| now_epoch_secs.saturating_sub(*idle_ts));
|
||||
let rtt_ema_ms = rtt.get(&writer.id).map(|(_, ema)| *ema);
|
||||
let allow_drain_fallback = writer.allow_drain_fallback.load(Ordering::Relaxed);
|
||||
let drain_started_at_epoch_secs = writer
|
||||
.draining_started_at_epoch_secs
|
||||
.load(Ordering::Relaxed);
|
||||
let drain_deadline_epoch_secs = writer
|
||||
.drain_deadline_epoch_secs
|
||||
.load(Ordering::Relaxed);
|
||||
let drain_started_at_epoch_secs =
|
||||
(drain_started_at_epoch_secs != 0).then_some(drain_started_at_epoch_secs);
|
||||
let drain_deadline_epoch_secs =
|
||||
(drain_deadline_epoch_secs != 0).then_some(drain_deadline_epoch_secs);
|
||||
let drain_over_ttl = draining
|
||||
&& drain_ttl_secs > 0
|
||||
&& drain_started_at_epoch_secs
|
||||
.is_some_and(|started| now_epoch_secs.saturating_sub(started) > drain_ttl_secs);
|
||||
let state = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
|
||||
WriterContour::Warm => "warm",
|
||||
WriterContour::Active => "active",
|
||||
|
|
@ -273,6 +305,9 @@ impl MePool {
|
|||
entry.0 += ema_ms;
|
||||
entry.1 += 1;
|
||||
}
|
||||
if matches_active_generation && in_desired_map {
|
||||
*fresh_writers_by_dc.entry(dc_idx).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -287,6 +322,12 @@ impl MePool {
|
|||
bound_clients,
|
||||
idle_for_secs,
|
||||
rtt_ema_ms,
|
||||
matches_active_generation,
|
||||
in_desired_map,
|
||||
allow_drain_fallback,
|
||||
drain_started_at_epoch_secs,
|
||||
drain_deadline_epoch_secs,
|
||||
drain_over_ttl,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -295,6 +336,7 @@ impl MePool {
|
|||
let mut dcs = Vec::<MeApiDcStatusSnapshot>::with_capacity(endpoints_by_dc.len());
|
||||
let mut available_endpoints = 0usize;
|
||||
let mut alive_writers = 0usize;
|
||||
let mut fresh_alive_writers = 0usize;
|
||||
let floor_mode = self.floor_mode();
|
||||
let adaptive_cpu_cores = (self
|
||||
.me_adaptive_floor_cpu_cores_effective
|
||||
|
|
@ -333,6 +375,7 @@ impl MePool {
|
|||
let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive)
|
||||
&& dc_required_writers < base_required;
|
||||
let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0);
|
||||
let dc_fresh_alive_writers = fresh_writers_by_dc.get(&dc).copied().unwrap_or(0);
|
||||
let dc_load = activity
|
||||
.active_sessions_by_target_dc
|
||||
.get(&dc)
|
||||
|
|
@ -344,6 +387,7 @@ impl MePool {
|
|||
|
||||
available_endpoints += dc_available_endpoints;
|
||||
alive_writers += dc_alive_writers;
|
||||
fresh_alive_writers += dc_fresh_alive_writers;
|
||||
|
||||
dcs.push(MeApiDcStatusSnapshot {
|
||||
dc,
|
||||
|
|
@ -367,6 +411,8 @@ impl MePool {
|
|||
floor_capped,
|
||||
alive_writers: dc_alive_writers,
|
||||
coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
|
||||
fresh_alive_writers: dc_fresh_alive_writers,
|
||||
fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers),
|
||||
rtt_ms: dc_rtt_ms,
|
||||
load: dc_load,
|
||||
});
|
||||
|
|
@ -381,6 +427,8 @@ impl MePool {
|
|||
required_writers,
|
||||
alive_writers,
|
||||
coverage_pct: ratio_pct(alive_writers, required_writers),
|
||||
fresh_alive_writers,
|
||||
fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers),
|
||||
writers: writer_rows,
|
||||
dcs,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -178,6 +178,7 @@ impl MePool {
|
|||
allow_drain_fallback: allow_drain_fallback.clone(),
|
||||
};
|
||||
self.writers.write().await.push(writer.clone());
|
||||
self.registry.register_writer(writer_id, tx.clone()).await;
|
||||
self.registry.mark_writer_idle(writer_id).await;
|
||||
self.conn_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.writer_available.notify_one();
|
||||
|
|
@ -414,9 +415,15 @@ impl MePool {
|
|||
};
|
||||
|
||||
let (conn_id, mut service_rx) = pool.registry.register().await;
|
||||
pool.registry
|
||||
.bind_writer(conn_id, writer_id, tx_signal.clone(), meta.clone())
|
||||
.await;
|
||||
if !pool
|
||||
.registry
|
||||
.bind_writer(conn_id, writer_id, meta.clone())
|
||||
.await
|
||||
{
|
||||
let _ = pool.registry.unregister(conn_id).await;
|
||||
stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total();
|
||||
continue;
|
||||
}
|
||||
|
||||
let payload = build_proxy_req_payload(
|
||||
conn_id,
|
||||
|
|
@ -521,6 +528,12 @@ impl MePool {
|
|||
self.conn_count.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
let conns = self.registry.writer_lost(writer_id).await;
|
||||
{
|
||||
let mut tracker = self.ping_tracker.lock().await;
|
||||
tracker.retain(|_, (_, wid)| *wid != writer_id);
|
||||
}
|
||||
self.rtt_stats.lock().await.remove(&writer_id);
|
||||
if let Some(tx) = close_tx {
|
||||
let _ = tx.send(WriterCommand::Close).await;
|
||||
}
|
||||
|
|
@ -533,8 +546,7 @@ impl MePool {
|
|||
}
|
||||
self.trigger_immediate_refill_for_dc(addr, writer_dc);
|
||||
}
|
||||
self.rtt_stats.lock().await.remove(&writer_id);
|
||||
self.registry.writer_lost(writer_id).await
|
||||
conns
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_writer_draining_with_timeout(
|
||||
|
|
|
|||
|
|
@ -138,6 +138,15 @@ impl ConnRegistry {
|
|||
(id, rx)
|
||||
}
|
||||
|
||||
/// Register a writer with the registry.
///
/// Takes the write lock on `self.inner`, stores `tx` in `writers` under
/// `writer_id`, and makes sure `conns_for_writer` has an entry for the writer
/// (an empty set is created only if none exists yet; an existing set is left
/// as is).
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.writers.insert(writer_id, tx);
|
||||
// Ensure the writer has a (possibly empty) connection set.
inner
|
||||
.conns_for_writer
|
||||
.entry(writer_id)
|
||||
.or_insert_with(HashSet::new);
|
||||
}
|
||||
|
||||
/// Unregister connection, returning associated writer_id if any.
|
||||
pub async fn unregister(&self, id: u64) -> Option<u64> {
|
||||
let mut inner = self.inner.write().await;
|
||||
|
|
@ -282,24 +291,39 @@ impl ConnRegistry {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn bind_writer(
|
||||
&self,
|
||||
conn_id: u64,
|
||||
writer_id: u64,
|
||||
tx: mpsc::Sender<WriterCommand>,
|
||||
meta: ConnMeta,
|
||||
) {
|
||||
pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool {
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.meta.entry(conn_id).or_insert(meta.clone());
|
||||
inner.writer_for_conn.insert(conn_id, writer_id);
|
||||
if !inner.writers.contains_key(&writer_id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
let previous_writer_id = inner.writer_for_conn.insert(conn_id, writer_id);
|
||||
if let Some(previous_writer_id) = previous_writer_id
|
||||
&& previous_writer_id != writer_id
|
||||
{
|
||||
let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id)
|
||||
{
|
||||
set.remove(&conn_id);
|
||||
set.is_empty()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if became_empty {
|
||||
inner
|
||||
.writer_idle_since_epoch_secs
|
||||
.insert(previous_writer_id, Self::now_epoch_secs());
|
||||
}
|
||||
}
|
||||
|
||||
inner.meta.insert(conn_id, meta.clone());
|
||||
inner.last_meta_for_writer.insert(writer_id, meta);
|
||||
inner.writer_idle_since_epoch_secs.remove(&writer_id);
|
||||
inner.writers.entry(writer_id).or_insert_with(|| tx.clone());
|
||||
inner
|
||||
.conns_for_writer
|
||||
.entry(writer_id)
|
||||
.or_insert_with(HashSet::new)
|
||||
.insert(conn_id);
|
||||
true
|
||||
}
|
||||
|
||||
pub async fn mark_writer_idle(&self, writer_id: u64) {
|
||||
|
|
@ -384,6 +408,9 @@ impl ConnRegistry {
|
|||
|
||||
let mut out = Vec::new();
|
||||
for conn_id in conns {
|
||||
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
|
||||
continue;
|
||||
}
|
||||
inner.writer_for_conn.remove(&conn_id);
|
||||
if let Some(m) = inner.meta.get(&conn_id) {
|
||||
out.push(BoundConn {
|
||||
|
|
@ -427,47 +454,52 @@ mod tests {
|
|||
let (conn_c, _rx_c) = registry.register().await;
|
||||
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
|
||||
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
|
||||
registry.register_writer(10, writer_tx_a.clone()).await;
|
||||
registry.register_writer(20, writer_tx_b.clone()).await;
|
||||
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_a,
|
||||
10,
|
||||
writer_tx_a.clone(),
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_b,
|
||||
10,
|
||||
writer_tx_a,
|
||||
ConnMeta {
|
||||
target_dc: -2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_c,
|
||||
20,
|
||||
writer_tx_b,
|
||||
ConnMeta {
|
||||
target_dc: 4,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_a,
|
||||
10,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_b,
|
||||
10,
|
||||
ConnMeta {
|
||||
target_dc: -2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_c,
|
||||
20,
|
||||
ConnMeta {
|
||||
target_dc: 4,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
let snapshot = registry.writer_activity_snapshot().await;
|
||||
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2));
|
||||
|
|
@ -476,4 +508,130 @@ mod tests {
|
|||
assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1));
|
||||
assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1));
|
||||
}
|
||||
|
||||
// Binds one connection to writer 10, then to writer 20, and checks that the
// registry reflects only the second binding: the writer lookup, the stored
// meta, the per-writer bound-client counts, and the idle marker for writer 10.
#[tokio::test]
|
||||
async fn bind_writer_rebinds_conn_atomically() {
|
||||
let registry = ConnRegistry::new();
|
||||
let (conn_id, _rx) = registry.register().await;
|
||||
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
|
||||
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
|
||||
// Both writers must be registered before bind_writer is called on them.
registry.register_writer(10, writer_tx_a).await;
|
||||
registry.register_writer(20, writer_tx_b).await;
|
||||
|
||||
let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
|
||||
// Distinct our_addr / proto_flags per bind so the test can tell which meta is stored.
let first_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 443);
|
||||
let second_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)), 443);
|
||||
|
||||
// First bind: conn -> writer 10.
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_id,
|
||||
10,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr,
|
||||
our_addr: first_our_addr,
|
||||
proto_flags: 1,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
// Rebind the same conn -> writer 20 with different meta.
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_id,
|
||||
20,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr,
|
||||
our_addr: second_our_addr,
|
||||
proto_flags: 2,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
// The connection now resolves to writer 20.
let writer = registry.get_writer(conn_id).await.expect("writer binding");
|
||||
assert_eq!(writer.writer_id, 20);
|
||||
|
||||
// The stored meta is the one from the second bind.
let meta = registry.get_meta(conn_id).await.expect("conn meta");
|
||||
assert_eq!(meta.our_addr, second_our_addr);
|
||||
assert_eq!(meta.proto_flags, 2);
|
||||
|
||||
// Writer 10 has no bound clients and carries an idle marker; writer 20 has one client.
let snapshot = registry.writer_activity_snapshot().await;
|
||||
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&0));
|
||||
assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1));
|
||||
assert!(registry.writer_idle_since_snapshot().await.contains_key(&10));
|
||||
}
|
||||
|
||||
// Binds one connection to writer 10, rebinds it to writer 20, then reports
// writer 10 as lost. Checks that writer_lost(10) returns nothing, the
// connection stays bound to writer 20, and unregistering it empties writer 20.
#[tokio::test]
|
||||
async fn writer_lost_does_not_drop_rebound_conn() {
|
||||
let registry = ConnRegistry::new();
|
||||
let (conn_id, _rx) = registry.register().await;
|
||||
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
|
||||
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
|
||||
registry.register_writer(10, writer_tx_a).await;
|
||||
registry.register_writer(20, writer_tx_b).await;
|
||||
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
|
||||
// First bind: conn -> writer 10.
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_id,
|
||||
10,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
// Rebind the same conn -> writer 20.
assert!(
|
||||
registry
|
||||
.bind_writer(
|
||||
conn_id,
|
||||
20,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 1,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
// Losing the old writer must not report or unbind the rebound connection.
let lost = registry.writer_lost(10).await;
|
||||
assert!(lost.is_empty());
|
||||
assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20);
|
||||
|
||||
// Unregistering reports writer 20 as the owner and leaves it with no connections.
let removed_writer = registry.unregister(conn_id).await;
|
||||
assert_eq!(removed_writer, Some(20));
|
||||
assert!(registry.is_writer_empty(20).await);
|
||||
}
|
||||
|
||||
// Calls bind_writer for writer id 10 without a prior register_writer call and
// checks that it returns false and leaves the connection without a writer.
#[tokio::test]
|
||||
async fn bind_writer_rejects_unregistered_writer() {
|
||||
let registry = ConnRegistry::new();
|
||||
let (conn_id, _rx) = registry.register().await;
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
|
||||
|
||||
// No register_writer(10, ..) was issued, so the bind is expected to fail.
assert!(
|
||||
!registry
|
||||
.bind_writer(
|
||||
conn_id,
|
||||
10,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: addr,
|
||||
our_addr: addr,
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
// A rejected bind must not leave a writer association behind.
assert!(registry.get_writer(conn_id).await.is_none());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -375,9 +375,14 @@ impl MePool {
|
|||
match w.tx.try_send(WriterCommand::Data(payload.clone())) {
|
||||
Ok(()) => {
|
||||
self.stats.increment_me_writer_pick_success_try_total(pick_mode);
|
||||
self.registry
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta)
|
||||
.await;
|
||||
if !self.registry.bind_writer(conn_id, w.id, meta).await {
|
||||
debug!(
|
||||
conn_id,
|
||||
writer_id = w.id,
|
||||
"ME writer disappeared before bind commit, retrying"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if w.generation < self.current_generation() {
|
||||
self.stats.increment_pool_stale_pick_total();
|
||||
debug!(
|
||||
|
|
@ -421,9 +426,14 @@ impl MePool {
|
|||
Ok(()) => {
|
||||
self.stats
|
||||
.increment_me_writer_pick_success_fallback_total(pick_mode);
|
||||
self.registry
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta)
|
||||
.await;
|
||||
if !self.registry.bind_writer(conn_id, w.id, meta).await {
|
||||
debug!(
|
||||
conn_id,
|
||||
writer_id = w.id,
|
||||
"ME writer disappeared before fallback bind commit, retrying"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if w.generation < self.current_generation() {
|
||||
self.stats.increment_pool_stale_pick_total();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue