diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index ef5a766..6f19789 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -425,15 +425,8 @@ impl MePool { }; let (conn_id, mut service_rx) = pool.registry.register().await; - if !pool - .registry - .bind_writer(conn_id, writer_id, meta.clone()) - .await - { - let _ = pool.registry.unregister(conn_id).await; - stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total(); - continue; - } + // Service RPC_PROXY_REQ signal path is intentionally route-only: + // do not bind synthetic conn_id into regular writer/client accounting. let payload = build_proxy_req_payload( conn_id, diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index becd8c4..8277e7f 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -51,7 +51,15 @@ pub(super) struct WriterActivitySnapshot { } struct RegistryInner { + routing: RoutingTable, + binding: BindingState, +} + +struct RoutingTable { map: HashMap>, +} + +struct BindingState { writers: HashMap>, writer_for_conn: HashMap, conns_for_writer: HashMap>, @@ -63,13 +71,17 @@ struct RegistryInner { impl RegistryInner { fn new() -> Self { Self { - map: HashMap::new(), - writers: HashMap::new(), - writer_for_conn: HashMap::new(), - conns_for_writer: HashMap::new(), - meta: HashMap::new(), - last_meta_for_writer: HashMap::new(), - writer_idle_since_epoch_secs: HashMap::new(), + routing: RoutingTable { + map: HashMap::new(), + }, + binding: BindingState { + writers: HashMap::new(), + writer_for_conn: HashMap::new(), + conns_for_writer: HashMap::new(), + meta: HashMap::new(), + last_meta_for_writer: HashMap::new(), + writer_idle_since_epoch_secs: HashMap::new(), + }, } } } @@ -130,14 +142,15 @@ impl ConnRegistry { pub async fn register(&self) -> (u64, mpsc::Receiver) { let id = self.next_id.fetch_add(1, Ordering::Relaxed); let (tx, rx) = mpsc::channel(self.route_channel_capacity); - self.inner.write().await.map.insert(id, tx); + self.inner.write().await.routing.map.insert(id, tx); (id, rx) } pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender) { let mut inner = self.inner.write().await; - inner.writers.insert(writer_id, tx); + inner.binding.writers.insert(writer_id, tx); inner + .binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); @@ -146,10 +159,10 @@ impl ConnRegistry { /// Unregister connection, returning associated writer_id if any. pub async fn unregister(&self, id: u64) -> Option { let mut inner = self.inner.write().await; - inner.map.remove(&id); - inner.meta.remove(&id); - if let Some(writer_id) = inner.writer_for_conn.remove(&id) { - let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) { + inner.routing.map.remove(&id); + inner.binding.meta.remove(&id); + if let Some(writer_id) = inner.binding.writer_for_conn.remove(&id) { + let became_empty = if let Some(set) = inner.binding.conns_for_writer.get_mut(&writer_id) { set.remove(&id); set.is_empty() } else { @@ -157,6 +170,7 @@ impl ConnRegistry { }; if became_empty { inner + .binding .writer_idle_since_epoch_secs .insert(writer_id, Self::now_epoch_secs()); } @@ -169,7 +183,7 @@ impl ConnRegistry { pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { let tx = { let inner = self.inner.read().await; - inner.map.get(&id).cloned() + inner.routing.map.get(&id).cloned() }; let Some(tx) = tx else { @@ -225,7 +239,7 @@ impl ConnRegistry { pub async fn route_nowait(&self, id: u64, resp: MeResponse) -> RouteResult { let tx = { let inner = self.inner.read().await; - inner.map.get(&id).cloned() + inner.routing.map.get(&id).cloned() }; let Some(tx) = tx else { @@ -251,7 +265,7 @@ impl ConnRegistry { let tx = { let inner = self.inner.read().await; - inner.map.get(&id).cloned() + inner.routing.map.get(&id).cloned() }; let Some(tx) = tx else { @@ -295,19 +309,19 @@ impl ConnRegistry { // ROUTING IS THE SOURCE OF TRUTH: // never keep/attach writer binding for a connection that is already // absent from the routing table. - if !inner.map.contains_key(&conn_id) { + if !inner.routing.map.contains_key(&conn_id) { return false; } - if !inner.writers.contains_key(&writer_id) { + if !inner.binding.writers.contains_key(&writer_id) { return false; } - let previous_writer_id = inner.writer_for_conn.insert(conn_id, writer_id); + let previous_writer_id = inner.binding.writer_for_conn.insert(conn_id, writer_id); if let Some(previous_writer_id) = previous_writer_id && previous_writer_id != writer_id { let became_empty = - if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id) { + if let Some(set) = inner.binding.conns_for_writer.get_mut(&previous_writer_id) { set.remove(&conn_id); set.is_empty() } else { @@ -315,15 +329,17 @@ impl ConnRegistry { }; if became_empty { inner + .binding .writer_idle_since_epoch_secs .insert(previous_writer_id, Self::now_epoch_secs()); } } - inner.meta.insert(conn_id, meta.clone()); - inner.last_meta_for_writer.insert(writer_id, meta); - inner.writer_idle_since_epoch_secs.remove(&writer_id); + inner.binding.meta.insert(conn_id, meta.clone()); + inner.binding.last_meta_for_writer.insert(writer_id, meta); + inner.binding.writer_idle_since_epoch_secs.remove(&writer_id); inner + .binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new) @@ -334,10 +350,12 @@ impl ConnRegistry { pub async fn mark_writer_idle(&self, writer_id: u64) { let mut inner = self.inner.write().await; inner + .binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); inner + .binding .writer_idle_since_epoch_secs .entry(writer_id) .or_insert(Self::now_epoch_secs()); @@ -345,19 +363,19 @@ impl ConnRegistry { pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option { let inner = self.inner.read().await; - inner.last_meta_for_writer.get(&writer_id).cloned() + inner.binding.last_meta_for_writer.get(&writer_id).cloned() } pub async fn writer_idle_since_snapshot(&self) -> HashMap { let inner = self.inner.read().await; - inner.writer_idle_since_epoch_secs.clone() + inner.binding.writer_idle_since_epoch_secs.clone() } pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap { let inner = self.inner.read().await; let mut out = HashMap::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(idle_since) = inner.writer_idle_since_epoch_secs.get(writer_id).copied() { + if let Some(idle_since) = inner.binding.writer_idle_since_epoch_secs.get(writer_id).copied() { out.insert(*writer_id, idle_since); } } @@ -369,10 +387,10 @@ impl ConnRegistry { let mut bound_clients_by_writer = HashMap::::new(); let mut active_sessions_by_target_dc = HashMap::::new(); - for (writer_id, conn_ids) in &inner.conns_for_writer { + for (writer_id, conn_ids) in &inner.binding.conns_for_writer { bound_clients_by_writer.insert(*writer_id, conn_ids.len()); } - for conn_meta in inner.meta.values() { + for conn_meta in inner.binding.meta.values() { if conn_meta.target_dc == 0 { continue; } @@ -392,14 +410,15 @@ impl ConnRegistry { // ROUTING IS THE SOURCE OF TRUTH: // stale bindings are ignored and lazily cleaned when routing no longer // contains the connection. - if !inner.map.contains_key(&conn_id) { - inner.meta.remove(&conn_id); - if let Some(stale_writer_id) = inner.writer_for_conn.remove(&conn_id) - && let Some(conns) = inner.conns_for_writer.get_mut(&stale_writer_id) + if !inner.routing.map.contains_key(&conn_id) { + inner.binding.meta.remove(&conn_id); + if let Some(stale_writer_id) = inner.binding.writer_for_conn.remove(&conn_id) + && let Some(conns) = inner.binding.conns_for_writer.get_mut(&stale_writer_id) { conns.remove(&conn_id); if conns.is_empty() { inner + .binding .writer_idle_since_epoch_secs .insert(stale_writer_id, Self::now_epoch_secs()); } @@ -407,14 +426,15 @@ impl ConnRegistry { return None; } - let writer_id = inner.writer_for_conn.get(&conn_id).copied()?; - let Some(writer) = inner.writers.get(&writer_id).cloned() else { - inner.writer_for_conn.remove(&conn_id); - inner.meta.remove(&conn_id); - if let Some(conns) = inner.conns_for_writer.get_mut(&writer_id) { + let writer_id = inner.binding.writer_for_conn.get(&conn_id).copied()?; + let Some(writer) = inner.binding.writers.get(&writer_id).cloned() else { + inner.binding.writer_for_conn.remove(&conn_id); + inner.binding.meta.remove(&conn_id); + if let Some(conns) = inner.binding.conns_for_writer.get_mut(&writer_id) { conns.remove(&conn_id); if conns.is_empty() { inner + .binding .writer_idle_since_epoch_secs .insert(writer_id, Self::now_epoch_secs()); } @@ -429,15 +449,16 @@ impl ConnRegistry { pub async fn active_conn_ids(&self) -> Vec { let inner = self.inner.read().await; - inner.writer_for_conn.keys().copied().collect() + inner.binding.writer_for_conn.keys().copied().collect() } pub async fn writer_lost(&self, writer_id: u64) -> Vec { let mut inner = self.inner.write().await; - inner.writers.remove(&writer_id); - inner.last_meta_for_writer.remove(&writer_id); - inner.writer_idle_since_epoch_secs.remove(&writer_id); + inner.binding.writers.remove(&writer_id); + inner.binding.last_meta_for_writer.remove(&writer_id); + inner.binding.writer_idle_since_epoch_secs.remove(&writer_id); let conns = inner + .binding .conns_for_writer .remove(&writer_id) .unwrap_or_default() @@ -446,11 +467,11 @@ impl ConnRegistry { let mut out = Vec::new(); for conn_id in conns { - if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { + if inner.binding.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { continue; } - inner.writer_for_conn.remove(&conn_id); - if let Some(m) = inner.meta.get(&conn_id) { + inner.binding.writer_for_conn.remove(&conn_id); + if let Some(m) = inner.binding.meta.get(&conn_id) { out.push(BoundConn { conn_id, meta: m.clone(), @@ -463,12 +484,13 @@ impl ConnRegistry { #[allow(dead_code)] pub async fn get_meta(&self, conn_id: u64) -> Option { let inner = self.inner.read().await; - inner.meta.get(&conn_id).cloned() + inner.binding.meta.get(&conn_id).cloned() } pub async fn is_writer_empty(&self, writer_id: u64) -> bool { let inner = self.inner.read().await; inner + .binding .conns_for_writer .get(&writer_id) .map(|s| s.is_empty()) @@ -478,7 +500,7 @@ impl ConnRegistry { #[allow(dead_code)] pub async fn unregister_writer_if_empty(&self, writer_id: u64) -> bool { let mut inner = self.inner.write().await; - let Some(conn_ids) = inner.conns_for_writer.get(&writer_id) else { + let Some(conn_ids) = inner.binding.conns_for_writer.get(&writer_id) else { // Writer is already absent from the registry. return true; }; @@ -486,10 +508,10 @@ impl ConnRegistry { return false; } - inner.writers.remove(&writer_id); - inner.last_meta_for_writer.remove(&writer_id); - inner.writer_idle_since_epoch_secs.remove(&writer_id); - inner.conns_for_writer.remove(&writer_id); + inner.binding.writers.remove(&writer_id); + inner.binding.last_meta_for_writer.remove(&writer_id); + inner.binding.writer_idle_since_epoch_secs.remove(&writer_id); + inner.binding.conns_for_writer.remove(&writer_id); true } @@ -498,7 +520,7 @@ impl ConnRegistry { let inner = self.inner.read().await; let mut out = HashSet::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(conns) = inner.conns_for_writer.get(writer_id) + if let Some(conns) = inner.binding.conns_for_writer.get(writer_id) && !conns.is_empty() { out.insert(*writer_id);