mirror of
https://github.com/telemt/telemt.git
synced 2026-05-22 19:51:43 +03:00
Fix hot-path replay bounds and ME control allocations
Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com>
This commit is contained in:
@@ -392,12 +392,18 @@ impl UserIpTracker {
|
|||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
|
let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
|
||||||
let user_active = active_ips
|
if !active_ips.contains_key(username) {
|
||||||
.entry(username.to_string())
|
active_ips.insert(username.to_string(), HashMap::new());
|
||||||
.or_insert_with(HashMap::new);
|
}
|
||||||
let user_recent = recent_ips
|
if !recent_ips.contains_key(username) {
|
||||||
.entry(username.to_string())
|
recent_ips.insert(username.to_string(), HashMap::new());
|
||||||
.or_insert_with(HashMap::new);
|
}
|
||||||
|
let Some(user_active) = active_ips.get_mut(username) else {
|
||||||
|
return Err(format!("IP tracker active entry unavailable for user '{username}'"));
|
||||||
|
};
|
||||||
|
let Some(user_recent) = recent_ips.get_mut(username) else {
|
||||||
|
return Err(format!("IP tracker recent entry unavailable for user '{username}'"));
|
||||||
|
};
|
||||||
let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
|
let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
|
||||||
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
|
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
|
||||||
let recent_contains_ip = user_recent.contains_key(&ip);
|
let recent_contains_ip = user_recent.contains_key(&ip);
|
||||||
|
|||||||
@@ -515,8 +515,13 @@ fn exclusive_mask_target_for_sni<'a>(
|
|||||||
config: &'a ProxyConfig,
|
config: &'a ProxyConfig,
|
||||||
sni: &str,
|
sni: &str,
|
||||||
) -> Option<MaskTcpTarget<'a>> {
|
) -> Option<MaskTcpTarget<'a>> {
|
||||||
for (domain, target) in &config.censorship.exclusive_mask {
|
if let Some(target) = config.censorship.exclusive_mask.get(sni) {
|
||||||
if domain.eq_ignore_ascii_case(sni) {
|
return parse_exclusive_mask_target(target);
|
||||||
|
}
|
||||||
|
|
||||||
|
if sni.bytes().any(|byte| byte.is_ascii_uppercase()) {
|
||||||
|
let normalized_sni = sni.to_ascii_lowercase();
|
||||||
|
if let Some(target) = config.censorship.exclusive_mask.get(&normalized_sni) {
|
||||||
return parse_exclusive_mask_target(target);
|
return parse_exclusive_mask_target(target);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -529,17 +534,27 @@ fn mask_host_for_initial_data<'a>(config: &'a ProxyConfig, initial_data: &[u8])
|
|||||||
mask_tcp_target_for_initial_data(config, initial_data).host
|
mask_tcp_target_for_initial_data(config, initial_data).host
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
fn mask_tcp_target_for_initial_data<'a>(
|
fn mask_tcp_target_for_initial_data<'a>(
|
||||||
config: &'a ProxyConfig,
|
config: &'a ProxyConfig,
|
||||||
initial_data: &[u8],
|
initial_data: &[u8],
|
||||||
) -> MaskTcpTarget<'a> {
|
) -> MaskTcpTarget<'a> {
|
||||||
if let Some(target) = tls::extract_sni_from_client_hello(initial_data)
|
let sni = tls::extract_sni_from_client_hello(initial_data);
|
||||||
|
if let Some(target) = sni
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.and_then(|sni| exclusive_mask_target_for_sni(config, sni))
|
.and_then(|sni| exclusive_mask_target_for_sni(config, sni))
|
||||||
{
|
{
|
||||||
return target;
|
return target;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default_mask_tcp_target_for_initial_data(config, initial_data, sni.as_deref())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_mask_tcp_target_for_initial_data<'a>(
|
||||||
|
config: &'a ProxyConfig,
|
||||||
|
initial_data: &[u8],
|
||||||
|
sni: Option<&str>,
|
||||||
|
) -> MaskTcpTarget<'a> {
|
||||||
let configured_mask_host = config
|
let configured_mask_host = config
|
||||||
.censorship
|
.censorship
|
||||||
.mask_host
|
.mask_host
|
||||||
@@ -553,8 +568,13 @@ fn mask_tcp_target_for_initial_data<'a>(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let host = tls::extract_sni_from_client_hello(initial_data)
|
let extracted_sni = if sni.is_none() {
|
||||||
.as_deref()
|
tls::extract_sni_from_client_hello(initial_data)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
let host = sni
|
||||||
|
.or(extracted_sni.as_deref())
|
||||||
.and_then(|sni| matching_tls_domain_for_sni(config, sni))
|
.and_then(|sni| matching_tls_domain_for_sni(config, sni))
|
||||||
.unwrap_or(configured_mask_host);
|
.unwrap_or(configured_mask_host);
|
||||||
MaskTcpTarget {
|
MaskTcpTarget {
|
||||||
@@ -858,7 +878,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let exclusive_tcp_target = tls::extract_sni_from_client_hello(initial_data)
|
let client_sni = tls::extract_sni_from_client_hello(initial_data);
|
||||||
|
let exclusive_tcp_target = client_sni
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.and_then(|sni| exclusive_mask_target_for_sni(config, sni));
|
.and_then(|sni| exclusive_mask_target_for_sni(config, sni));
|
||||||
|
|
||||||
@@ -943,8 +964,9 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mask_target = exclusive_tcp_target
|
let mask_target = exclusive_tcp_target.unwrap_or_else(|| {
|
||||||
.unwrap_or_else(|| mask_tcp_target_for_initial_data(config, initial_data));
|
default_mask_tcp_target_for_initial_data(config, initial_data, client_sni.as_deref())
|
||||||
|
});
|
||||||
let mask_host = mask_target.host;
|
let mask_host = mask_target.host;
|
||||||
let mask_port = mask_target.port;
|
let mask_port = mask_target.port;
|
||||||
|
|
||||||
|
|||||||
@@ -2678,9 +2678,10 @@ struct ReplayEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ReplayShard {
|
struct ReplayShard {
|
||||||
cache: LruCache<Box<[u8]>, ReplayEntry>,
|
cache: LruCache<Arc<[u8]>, ReplayEntry>,
|
||||||
queue: VecDeque<(Instant, Box<[u8]>, u64)>,
|
queue: VecDeque<(Instant, Arc<[u8]>, u64)>,
|
||||||
seq_counter: u64,
|
seq_counter: u64,
|
||||||
|
capacity: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReplayShard {
|
impl ReplayShard {
|
||||||
@@ -2689,6 +2690,7 @@ impl ReplayShard {
|
|||||||
cache: LruCache::new(cap),
|
cache: LruCache::new(cap),
|
||||||
queue: VecDeque::with_capacity(cap.get()),
|
queue: VecDeque::with_capacity(cap.get()),
|
||||||
seq_counter: 0,
|
seq_counter: 0,
|
||||||
|
capacity: cap.get(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2709,15 +2711,19 @@ impl ReplayShard {
|
|||||||
if *ts >= cutoff {
|
if *ts >= cutoff {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let (_, key, queue_seq) = self.queue.pop_front().unwrap();
|
self.evict_queue_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Use key.as_ref() to get &[u8] — avoids Borrow<Q> ambiguity
|
fn evict_queue_front(&mut self) {
|
||||||
// between Borrow<[u8]> and Borrow<Box<[u8]>>
|
let Some((_, key, queue_seq)) = self.queue.pop_front() else {
|
||||||
if let Some(entry) = self.cache.peek(key.as_ref())
|
return;
|
||||||
&& entry.seq == queue_seq
|
};
|
||||||
{
|
|
||||||
self.cache.pop(key.as_ref());
|
if let Some(entry) = self.cache.peek(key.as_ref())
|
||||||
}
|
&& entry.seq == queue_seq
|
||||||
|
{
|
||||||
|
self.cache.pop(key.as_ref());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2738,13 +2744,16 @@ impl ReplayShard {
|
|||||||
if self.cache.peek(key).is_some() {
|
if self.cache.peek(key).is_some() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
while self.queue.len() >= self.capacity {
|
||||||
|
self.evict_queue_front();
|
||||||
|
}
|
||||||
|
|
||||||
let seq = self.next_seq();
|
let seq = self.next_seq();
|
||||||
let boxed_key: Box<[u8]> = key.into();
|
let shared_key: Arc<[u8]> = Arc::from(key);
|
||||||
|
|
||||||
self.cache
|
self.cache
|
||||||
.put(boxed_key.clone(), ReplayEntry { seen_at: now, seq });
|
.put(Arc::clone(&shared_key), ReplayEntry { seen_at: now, seq });
|
||||||
self.queue.push_back((now, boxed_key, seq));
|
self.queue.push_back((now, shared_key, seq));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn len(&self) -> usize {
|
fn len(&self) -> usize {
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use crate::protocol::constants::*;
|
|||||||
pub(crate) enum WriterCommand {
|
pub(crate) enum WriterCommand {
|
||||||
Data(Bytes),
|
Data(Bytes),
|
||||||
DataAndFlush(Bytes),
|
DataAndFlush(Bytes),
|
||||||
|
ControlAndFlush([u8; 12]),
|
||||||
Close,
|
Close,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -42,6 +43,13 @@ pub(crate) fn rpc_crc(mode: RpcChecksumMode, data: &[u8]) -> u32 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] {
|
||||||
|
let mut payload = [0u8; 12];
|
||||||
|
payload[..4].copy_from_slice(&tag.to_le_bytes());
|
||||||
|
payload[4..].copy_from_slice(&value.to_le_bytes());
|
||||||
|
payload
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> {
|
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> {
|
||||||
let total_len = (4 + 4 + payload.len() + 4) as u32;
|
let total_len = (4 + 4 + payload.len() + 4) as u32;
|
||||||
let mut frame = Vec::with_capacity(total_len as usize);
|
let mut frame = Vec::with_capacity(total_len as usize);
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ use std::sync::Arc;
|
|||||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use rand::RngExt;
|
use rand::RngExt;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
@@ -17,7 +16,7 @@ use crate::crypto::SecureRandom;
|
|||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
|
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
|
||||||
|
|
||||||
use super::codec::{RpcWriter, WriterCommand};
|
use super::codec::{RpcWriter, WriterCommand, build_control_payload};
|
||||||
use super::pool::{MePool, MeWriter, WriterContour};
|
use super::pool::{MePool, MeWriter, WriterContour};
|
||||||
use super::reader::reader_loop;
|
use super::reader::reader_loop;
|
||||||
use super::wire::build_proxy_req_payload;
|
use super::wire::build_proxy_req_payload;
|
||||||
@@ -61,6 +60,9 @@ async fn writer_command_loop(
|
|||||||
Some(WriterCommand::DataAndFlush(payload)) => {
|
Some(WriterCommand::DataAndFlush(payload)) => {
|
||||||
rpc_writer.send_and_flush(&payload).await?;
|
rpc_writer.send_and_flush(&payload).await?;
|
||||||
}
|
}
|
||||||
|
Some(WriterCommand::ControlAndFlush(payload)) => {
|
||||||
|
rpc_writer.send_and_flush(&payload).await?;
|
||||||
|
}
|
||||||
Some(WriterCommand::Close) | None => return Ok(()),
|
Some(WriterCommand::Close) | None => return Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -130,9 +132,7 @@ async fn ping_loop(
|
|||||||
_ = tokio::time::sleep(wait) => {}
|
_ = tokio::time::sleep(wait) => {}
|
||||||
}
|
}
|
||||||
let sent_id = ping_id;
|
let sent_id = ping_id;
|
||||||
let mut p = Vec::with_capacity(12);
|
let payload = build_control_payload(RPC_PING_U32, sent_id as u64);
|
||||||
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
|
|
||||||
p.extend_from_slice(&sent_id.to_le_bytes());
|
|
||||||
{
|
{
|
||||||
let mut tracker = ping_tracker_ping.lock().await;
|
let mut tracker = ping_tracker_ping.lock().await;
|
||||||
cleanup_tick = cleanup_tick.wrapping_add(1);
|
cleanup_tick = cleanup_tick.wrapping_add(1);
|
||||||
@@ -149,7 +149,7 @@ async fn ping_loop(
|
|||||||
ping_id = ping_id.wrapping_add(1);
|
ping_id = ping_id.wrapping_add(1);
|
||||||
stats_ping.increment_me_keepalive_sent();
|
stats_ping.increment_me_keepalive_sent();
|
||||||
if tx_ping
|
if tx_ping
|
||||||
.send(WriterCommand::DataAndFlush(Bytes::from(p)))
|
.send(WriterCommand::ControlAndFlush(payload))
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
@@ -253,12 +253,10 @@ async fn rpc_proxy_req_signal_loop(
|
|||||||
stats_signal.increment_me_rpc_proxy_req_signal_response_total();
|
stats_signal.increment_me_rpc_proxy_req_signal_response_total();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut close_payload = Vec::with_capacity(12);
|
let close_payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id);
|
||||||
close_payload.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
|
||||||
close_payload.extend_from_slice(&conn_id.to_le_bytes());
|
|
||||||
|
|
||||||
if tx_signal
|
if tx_signal
|
||||||
.send(WriterCommand::DataAndFlush(Bytes::from(close_payload)))
|
.send(WriterCommand::ControlAndFlush(close_payload))
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ use crate::error::{ProxyError, Result};
|
|||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
use crate::stats::Stats;
|
use crate::stats::Stats;
|
||||||
|
|
||||||
use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
|
use super::codec::{RpcChecksumMode, WriterCommand, build_control_payload, rpc_crc};
|
||||||
use super::fairness::{
|
use super::fairness::{
|
||||||
AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
|
AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
|
||||||
WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState,
|
WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState,
|
||||||
@@ -464,10 +464,8 @@ pub(crate) async fn reader_loop(
|
|||||||
} else if pt == RPC_PING_U32 && body.len() >= 8 {
|
} else if pt == RPC_PING_U32 && body.len() >= 8 {
|
||||||
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||||
trace!(ping_id, "RPC_PING -> RPC_PONG");
|
trace!(ping_id, "RPC_PING -> RPC_PONG");
|
||||||
let mut pong = Vec::with_capacity(12);
|
let pong = build_control_payload(RPC_PONG_U32, ping_id as u64);
|
||||||
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
|
match tx.try_send(WriterCommand::ControlAndFlush(pong)) {
|
||||||
pong.extend_from_slice(&ping_id.to_le_bytes());
|
|
||||||
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
|
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(TrySendError::Full(_)) => {
|
Err(TrySendError::Full(_)) => {
|
||||||
debug!(ping_id, "PONG dropped: writer command channel is full");
|
debug!(ping_id, "PONG dropped: writer command channel is full");
|
||||||
@@ -667,10 +665,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
|
async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
|
||||||
let mut p = Vec::with_capacity(12);
|
let payload = build_control_payload(RPC_CLOSE_CONN_U32, conn_id);
|
||||||
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
match tx.try_send(WriterCommand::ControlAndFlush(payload)) {
|
||||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
|
||||||
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(TrySendError::Full(_)) => {
|
Err(TrySendError::Full(_)) => {
|
||||||
debug!(
|
debug!(
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ use std::sync::Arc;
|
|||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
@@ -17,7 +16,7 @@ use crate::network::IpFamily;
|
|||||||
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
|
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
|
||||||
|
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
use super::codec::WriterCommand;
|
use super::codec::{WriterCommand, build_control_payload};
|
||||||
use super::pool::WriterContour;
|
use super::pool::WriterContour;
|
||||||
use super::registry::ConnMeta;
|
use super::registry::ConnMeta;
|
||||||
use super::wire::build_proxy_req_payload;
|
use super::wire::build_proxy_req_payload;
|
||||||
@@ -735,11 +734,9 @@ impl MePool {
|
|||||||
|
|
||||||
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||||
let mut p = Vec::with_capacity(12);
|
let payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id);
|
||||||
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
|
||||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
|
||||||
if w.tx
|
if w.tx
|
||||||
.send(WriterCommand::DataAndFlush(Bytes::from(p)))
|
.send(WriterCommand::ControlAndFlush(payload))
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
@@ -756,10 +753,8 @@ impl MePool {
|
|||||||
|
|
||||||
pub async fn send_close_conn(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
pub async fn send_close_conn(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||||
let mut p = Vec::with_capacity(12);
|
let payload = build_control_payload(RPC_CLOSE_CONN_U32, conn_id);
|
||||||
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
match w.tx.try_send(WriterCommand::ControlAndFlush(payload)) {
|
||||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
|
||||||
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(TrySendError::Full(cmd)) => {
|
Err(TrySendError::Full(cmd)) => {
|
||||||
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
|
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
|
||||||
|
|||||||
@@ -165,6 +165,7 @@ async fn recv_data_count(rx: &mut mpsc::Receiver<WriterCommand>, budget: Duratio
|
|||||||
match tokio::time::timeout(remaining.min(Duration::from_millis(10)), rx.recv()).await {
|
match tokio::time::timeout(remaining.min(Duration::from_millis(10)), rx.recv()).await {
|
||||||
Ok(Some(WriterCommand::Data(_))) => data_count += 1,
|
Ok(Some(WriterCommand::Data(_))) => data_count += 1,
|
||||||
Ok(Some(WriterCommand::DataAndFlush(_))) => data_count += 1,
|
Ok(Some(WriterCommand::DataAndFlush(_))) => data_count += 1,
|
||||||
|
Ok(Some(WriterCommand::ControlAndFlush(_))) => data_count += 1,
|
||||||
Ok(Some(WriterCommand::Close)) => {}
|
Ok(Some(WriterCommand::Close)) => {}
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
|
|||||||
Reference in New Issue
Block a user