This commit is contained in:
Alexey
2026-04-25 14:35:35 +03:00
parent 2f2fe9d5d3
commit 37c916056a
2 changed files with 12 additions and 14 deletions
+10 -7
View File
@@ -8,8 +8,8 @@ use dashmap::DashMap;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{Mutex, Semaphore, mpsc};
use super::{MeResponse, RouteBytePermit};
use super::codec::WriterCommand;
use super::{MeResponse, RouteBytePermit};
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
@@ -129,7 +129,10 @@ impl ConnRegistry {
)
}
fn with_route_limits(route_channel_capacity: usize, route_byte_permits_per_conn: usize) -> Self {
fn with_route_limits(
route_channel_capacity: usize,
route_byte_permits_per_conn: usize,
) -> Self {
let start = rand::random::<u64>() | 1;
let route_channel_capacity = route_channel_capacity.max(1);
Self {
@@ -209,9 +212,10 @@ impl ConnRegistry {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(self.route_channel_capacity);
self.routing.map.insert(id, tx);
self.routing
.byte_budget
.insert(id, Arc::new(Semaphore::new(self.route_byte_permits_per_conn)));
self.routing.byte_budget.insert(
id,
Arc::new(Semaphore::new(self.route_byte_permits_per_conn)),
);
(id, rx)
}
@@ -287,8 +291,7 @@ impl ConnRegistry {
.map_err(|_| RouteResult::QueueFullHigh)?,
Some(timeout_ms) => {
let acquire = semaphore.acquire_many_owned(permits);
match tokio::time::timeout(Duration::from_millis(timeout_ms.max(1)), acquire)
.await
match tokio::time::timeout(Duration::from_millis(timeout_ms.max(1)), acquire).await
{
Ok(Ok(permit)) => permit,
Ok(Err(_)) => return Err(RouteResult::ChannelClosed),