This commit is contained in:
AndreyAkifev 2026-02-16 11:12:30 +07:00
parent b9b8ec529c
commit a1d200d1a1
1 changed files with 6 additions and 4 deletions

View File

@ -9,7 +9,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
pub struct ConnRegistry {
map: RwLock<HashMap<u64, mpsc::Sender<MeResponse>>>,
map: RwLock<HashMap<u64, mpsc::UnboundedSender<MeResponse>>>,
writers: RwLock<HashMap<u64, Arc<Mutex<RpcWriter>>>>,
next_id: AtomicU64,
}
@ -25,9 +25,11 @@ impl ConnRegistry {
}
}
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
pub async fn register(&self) -> (u64, mpsc::UnboundedReceiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(256);
// Unbounded per-connection queue prevents reader-loop HOL blocking on
// slow clients: routing stays non-blocking and preserves message order.
let (tx, rx) = mpsc::unbounded_channel();
self.map.write().await.insert(id, tx);
(id, rx)
}
@ -40,7 +42,7 @@ impl ConnRegistry {
pub async fn route(&self, id: u64, resp: MeResponse) -> bool {
let m = self.map.read().await;
if let Some(tx) = m.get(&id) {
tx.send(resp).await.is_ok()
tx.send(resp).is_ok()
} else {
false
}