From 4c2bc2f41fe52b161cb21062de722f44fab26f40 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:42:24 +0300 Subject: [PATCH] Pool Status hooks in ME Registry Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/registry.rs | 96 ++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 4a66654..869030a 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -45,6 +45,12 @@ pub struct ConnWriter { pub tx: mpsc::Sender, } +#[derive(Clone, Debug, Default)] +pub(super) struct WriterActivitySnapshot { + pub bound_clients_by_writer: HashMap, + pub active_sessions_by_target_dc: HashMap, +} + struct RegistryInner { map: HashMap>, writers: HashMap>, @@ -241,6 +247,30 @@ impl ConnRegistry { inner.writer_idle_since_epoch_secs.clone() } + pub(super) async fn writer_activity_snapshot(&self) -> WriterActivitySnapshot { + let inner = self.inner.read().await; + let mut bound_clients_by_writer = HashMap::::new(); + let mut active_sessions_by_target_dc = HashMap::::new(); + + for (writer_id, conn_ids) in &inner.conns_for_writer { + bound_clients_by_writer.insert(*writer_id, conn_ids.len()); + } + for conn_meta in inner.meta.values() { + let dc_u16 = conn_meta.target_dc.unsigned_abs(); + if dc_u16 == 0 { + continue; + } + if let Ok(dc) = i16::try_from(dc_u16) { + *active_sessions_by_target_dc.entry(dc).or_insert(0) += 1; + } + } + + WriterActivitySnapshot { + bound_clients_by_writer, + active_sessions_by_target_dc, + } + } + pub async fn get_writer(&self, conn_id: u64) -> Option { let inner = self.inner.read().await; let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?; @@ -288,3 +318,69 @@ impl ConnRegistry { .unwrap_or(true) } } + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + use super::ConnMeta; + use super::ConnRegistry; + + #[tokio::test] + async fn writer_activity_snapshot_tracks_writer_and_dc_load() { + let registry = ConnRegistry::new(); + + let (conn_a, _rx_a) = registry.register().await; + let (conn_b, _rx_b) = registry.register().await; + let (conn_c, _rx_c) = registry.register().await; + let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); + let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + registry + .bind_writer( + conn_a, + 10, + writer_tx_a.clone(), + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await; + registry + .bind_writer( + conn_b, + 10, + writer_tx_a, + ConnMeta { + target_dc: -2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await; + registry + .bind_writer( + conn_c, + 20, + writer_tx_b, + ConnMeta { + target_dc: 4, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await; + + let snapshot = registry.writer_activity_snapshot().await; + assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); + assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1)); + assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&2)); + assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); + } +}