From c4c91863f0f49b92c84be2bf81fde3a145f5e63a Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 03:20:13 +0300 Subject: [PATCH 1/4] Middle-End tuning Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool.rs | 2 +- src/transport/middle_proxy/send.rs | 12 +++++- src/transport/upstream.rs | 69 ++++++++++++++++++++++++++---- 3 files changed, 72 insertions(+), 11 deletions(-) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 062db67..851c0b7 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -452,7 +452,7 @@ impl MePool { }; self.writers.write().await.push(writer.clone()); self.conn_count.fetch_add(1, Ordering::Relaxed); - self.writer_available.notify_waiters(); + self.writer_available.notify_one(); let reg = self.registry.clone(); let writers_arc = self.writers_arc(); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 627906d..2ebafea 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -62,6 +62,8 @@ impl MePool { let mut writers_snapshot = { let ws = self.writers.read().await; if ws.is_empty() { + // Create waiter before recovery attempts so notify_one permits are not missed. + let waiter = self.writer_available.notified(); drop(ws); for family in self.family_order() { let map = match family { @@ -72,13 +74,19 @@ impl MePool { for (ip, port) in addrs { let addr = SocketAddr::new(*ip, *port); if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { - self.writer_available.notify_waiters(); + self.writer_available.notify_one(); break; } } } } - if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() { + if !self.writers.read().await.is_empty() { + continue; + } + if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() { + if !self.writers.read().await.is_empty() { + continue; + } return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into())); } continue; diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 0f458f2..6dcc36f 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -394,6 +394,7 @@ impl UpstreamManager { Ok(stream) }, UpstreamType::Socks4 { address, interface, user_id } => { + let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS); // Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port let mut stream = if let Ok(proxy_addr) = address.parse::() { // IP:port format - use socket with optional interface binding @@ -416,7 +417,15 @@ impl UpstreamManager { let std_stream: std::net::TcpStream = socket.into(); let stream = TcpStream::from_std(std_stream)?; - stream.writable().await?; + match tokio::time::timeout(connect_timeout, stream.writable()).await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(ProxyError::Io(e)), + Err(_) => { + return Err(ProxyError::ConnectionTimeout { + addr: proxy_addr.to_string(), + }); + } + } if let Some(e) = stream.take_error()? { return Err(ProxyError::Io(e)); } @@ -427,8 +436,15 @@ impl UpstreamManager { if interface.is_some() { warn!("SOCKS4 interface binding is not supported for hostname addresses, ignoring"); } - TcpStream::connect(address).await - .map_err(ProxyError::Io)? + match tokio::time::timeout(connect_timeout, TcpStream::connect(address)).await { + Ok(Ok(stream)) => stream, + Ok(Err(e)) => return Err(ProxyError::Io(e)), + Err(_) => { + return Err(ProxyError::ConnectionTimeout { + addr: address.clone(), + }); + } + } }; // replace socks user_id with config.selected_scope, if set @@ -436,10 +452,19 @@ impl UpstreamManager { .filter(|s| !s.is_empty()); let _user_id: Option<&str> = scope.or(user_id.as_deref()); - connect_socks4(&mut stream, target, _user_id).await?; + match tokio::time::timeout(connect_timeout, connect_socks4(&mut stream, target, _user_id)).await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(ProxyError::ConnectionTimeout { + addr: target.to_string(), + }); + } + } Ok(stream) }, UpstreamType::Socks5 { address, interface, username, password } => { + let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS); // Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port let mut stream = if let Ok(proxy_addr) = address.parse::() { // IP:port format - use socket with optional interface binding @@ -462,7 +487,15 @@ impl UpstreamManager { let std_stream: std::net::TcpStream = socket.into(); let stream = TcpStream::from_std(std_stream)?; - stream.writable().await?; + match tokio::time::timeout(connect_timeout, stream.writable()).await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(ProxyError::Io(e)), + Err(_) => { + return Err(ProxyError::ConnectionTimeout { + addr: proxy_addr.to_string(), + }); + } + } if let Some(e) = stream.take_error()? { return Err(ProxyError::Io(e)); } @@ -473,8 +506,15 @@ impl UpstreamManager { if interface.is_some() { warn!("SOCKS5 interface binding is not supported for hostname addresses, ignoring"); } - TcpStream::connect(address).await - .map_err(ProxyError::Io)? + match tokio::time::timeout(connect_timeout, TcpStream::connect(address)).await { + Ok(Ok(stream)) => stream, + Ok(Err(e)) => return Err(ProxyError::Io(e)), + Err(_) => { + return Err(ProxyError::ConnectionTimeout { + addr: address.clone(), + }); + } + } }; debug!(config = ?config, "Socks5 connection"); @@ -484,7 +524,20 @@ impl UpstreamManager { let _username: Option<&str> = scope.or(username.as_deref()); let _password: Option<&str> = scope.or(password.as_deref()); - connect_socks5(&mut stream, target, _username, _password).await?; + match tokio::time::timeout( + connect_timeout, + connect_socks5(&mut stream, target, _username, _password), + ) + .await + { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(ProxyError::ConnectionTimeout { + addr: target.to_string(), + }); + } + } Ok(stream) }, } From 489521782872ec6e53dfaac9d7f20fb7ce7078a6 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 03:32:06 +0300 Subject: [PATCH 2/4] Bounded backpressure + Semaphore Globalgate + Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/main.rs | 20 +++++++++++++++++- src/stats/mod.rs | 17 ++++++++++++++- src/transport/middle_proxy/registry.rs | 29 +++++++++++++++++++------- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0f3757b..61debb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -265,7 +265,7 @@ async fn main() -> std::result::Result<(), Box> { } // Connection concurrency limit - let _max_connections = Arc::new(Semaphore::new(10_000)); + let max_connections = Arc::new(Semaphore::new(10_000)); if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); @@ -844,6 +844,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let max_connections_unix = max_connections.clone(); tokio::spawn(async move { let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)); @@ -851,6 +852,13 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai loop { match unix_listener.accept().await { Ok((stream, _)) => { + let permit = match max_connections_unix.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + error!("Connection limiter is closed"); + break; + } + }; let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); @@ -866,6 +874,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai let proxy_protocol_enabled = config.server.proxy_protocol; tokio::spawn(async move { + let _permit = permit; if let Err(e) = crate::proxy::client::handle_client_stream( stream, fake_peer, config, stats, upstream_manager, replay_checker, buffer_pool, rng, @@ -933,11 +942,19 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let max_connections_tcp = max_connections.clone(); tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, peer_addr)) => { + let permit = match max_connections_tcp.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + error!("Connection limiter is closed"); + break; + } + }; let config = config_rx.borrow_and_update().clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); @@ -950,6 +967,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai let proxy_protocol_enabled = listener_proxy_protocol; tokio::spawn(async move { + let _permit = permit; if let Err(e) = ClientHandler::new( stream, peer_addr, diff --git a/src/stats/mod.rs b/src/stats/mod.rs index e480ec6..307da6d 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -109,7 +109,22 @@ impl Stats { pub fn decrement_user_curr_connects(&self, user: &str) { if let Some(stats) = self.user_stats.get(user) { - stats.curr_connects.fetch_sub(1, Ordering::Relaxed); + let counter = &stats.curr_connects; + let mut current = counter.load(Ordering::Relaxed); + loop { + if current == 0 { + break; + } + match counter.compare_exchange_weak( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(actual) => current = actual, + } + } } } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 4b25e00..6a9250d 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; use tokio::sync::{mpsc, RwLock}; use tokio::sync::mpsc::error::TrySendError; @@ -9,6 +10,7 @@ use super::codec::WriterCommand; use super::MeResponse; const ROUTE_CHANNEL_CAPACITY: usize = 4096; +const ROUTE_BACKPRESSURE_TIMEOUT: Duration = Duration::from_millis(25); #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RouteResult { @@ -94,15 +96,26 @@ impl ConnRegistry { } pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { - let inner = self.inner.read().await; - if let Some(tx) = inner.map.get(&id) { - match tx.try_send(resp) { - Ok(()) => RouteResult::Routed, - Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed, - Err(TrySendError::Full(_)) => RouteResult::QueueFull, + let tx = { + let inner = self.inner.read().await; + inner.map.get(&id).cloned() + }; + + let Some(tx) = tx else { + return RouteResult::NoConn; + }; + + match tx.try_send(resp) { + Ok(()) => RouteResult::Routed, + Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed, + Err(TrySendError::Full(resp)) => { + // Absorb short bursts without dropping/closing the session immediately. + match tokio::time::timeout(ROUTE_BACKPRESSURE_TIMEOUT, tx.send(resp)).await { + Ok(Ok(())) => RouteResult::Routed, + Ok(Err(_)) => RouteResult::ChannelClosed, + Err(_) => RouteResult::QueueFull, + } } - } else { - RouteResult::NoConn } } From ecad96374acd9159ff54ce4280d9b72a870247ad Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 03:41:51 +0300 Subject: [PATCH 3/4] ME Pool tuning Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool.rs | 82 +++++++++++++----------------- 1 file changed, 35 insertions(+), 47 deletions(-) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 851c0b7..8faeabf 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -415,7 +415,6 @@ impl MePool { let degraded = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false)); let (tx, mut rx) = mpsc::channel::(4096); - let tx_for_keepalive = tx.clone(); let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, @@ -461,7 +460,6 @@ impl MePool { let rtt_stats = self.rtt_stats.clone(); let stats_reader = self.stats.clone(); let stats_ping = self.stats.clone(); - let stats_keepalive = self.stats.clone(); let pool = Arc::downgrade(self); let cancel_ping = cancel.clone(); let tx_ping = tx.clone(); @@ -474,7 +472,6 @@ impl MePool { let keepalive_jitter = self.me_keepalive_jitter; let cancel_reader_token = cancel.clone(); let cancel_ping_token = cancel_ping.clone(); - let cancel_keepalive_token = cancel.clone(); tokio::spawn(async move { let res = reader_loop( @@ -513,15 +510,40 @@ impl MePool { let pool_ping = Arc::downgrade(self); tokio::spawn(async move { let mut ping_id: i64 = rand::random::(); - loop { + // Per-writer jittered start to avoid phase sync. + let startup_jitter = if keepalive_enabled { + let jitter_cap_ms = keepalive_interval.as_millis() / 2; + let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) + } else { let jitter = rand::rng() .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; + Duration::from_secs(wait) + }; + tokio::select! { + _ = cancel_ping_token.cancelled() => return, + _ = tokio::time::sleep(startup_jitter) => {} + } + loop { + let wait = if keepalive_enabled { + let jitter_cap_ms = keepalive_interval.as_millis() / 2; + let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); + keepalive_interval + + Duration::from_millis( + rand::rng().random_range(0..=effective_jitter_ms as u64) + ) + } else { + let jitter = rand::rng() + .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); + let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; + Duration::from_secs(secs) + }; tokio::select! { _ = cancel_ping_token.cancelled() => { break; } - _ = tokio::time::sleep(Duration::from_secs(wait)) => {} + _ = tokio::time::sleep(wait) => {} } let sent_id = ping_id; let mut p = Vec::with_capacity(12); @@ -538,8 +560,10 @@ impl MePool { tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } ping_id = ping_id.wrapping_add(1); + stats_ping.increment_me_keepalive_sent(); if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() { - debug!("Active ME ping failed, removing dead writer"); + stats_ping.increment_me_keepalive_failed(); + debug!("ME ping failed, removing dead writer"); cancel_ping.cancel(); if let Some(pool) = pool_ping.upgrade() { if cleanup_for_ping @@ -554,46 +578,6 @@ impl MePool { } }); - if keepalive_enabled { - let tx_keepalive = tx_for_keepalive; - let cancel_keepalive = cancel_keepalive_token; - let ping_tracker_keepalive = ping_tracker.clone(); - tokio::spawn(async move { - // Per-writer jittered start to avoid phase sync. - let jitter_cap_ms = keepalive_interval.as_millis() / 2; - let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); - let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64); - tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await; - let mut ping_id: i64 = rand::random::(); - loop { - tokio::select! { - _ = cancel_keepalive.cancelled() => break, - _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {} - } - let sent_id = ping_id; - ping_id = ping_id.wrapping_add(1); - let mut p = Vec::with_capacity(12); - p.extend_from_slice(&RPC_PING_U32.to_le_bytes()); - p.extend_from_slice(&sent_id.to_le_bytes()); - { - let mut tracker = ping_tracker_keepalive.lock().await; - let before = tracker.len(); - tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); - let expired = before.saturating_sub(tracker.len()); - if expired > 0 { - stats_keepalive.increment_me_keepalive_timeout_by(expired as u64); - } - tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); - } - stats_keepalive.increment_me_keepalive_sent(); - if tx_keepalive.send(WriterCommand::DataAndFlush(p)).await.is_err() { - stats_keepalive.increment_me_keepalive_failed(); - break; - } - } - }); - } - Ok(()) } @@ -630,15 +614,19 @@ impl MePool { } async fn remove_writer_only(&self, writer_id: u64) -> Vec { + let mut close_tx: Option> = None; { let mut ws = self.writers.write().await; if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { let w = ws.remove(pos); w.cancel.cancel(); - let _ = w.tx.send(WriterCommand::Close).await; + close_tx = Some(w.tx.clone()); self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + if let Some(tx) = close_tx { + let _ = tx.send(WriterCommand::Close).await; + } self.rtt_stats.lock().await.remove(&writer_id); self.registry.writer_lost(writer_id).await } From eaba926fe578b7465abbbfb93beffca047c6d07c Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 03:52:37 +0300 Subject: [PATCH 4/4] ME Buffer reuse + Bytes Len over Full + Seq-no over Wrap-add Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/crypto/random.rs | 33 ++++++++++++++++++++--------- src/proxy/middle_relay.rs | 26 ++++++++++++++++++----- src/transport/middle_proxy/codec.rs | 2 +- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/src/crypto/random.rs b/src/crypto/random.rs index 99aa5f3..f3432e0 100644 --- a/src/crypto/random.rs +++ b/src/crypto/random.rs @@ -49,19 +49,32 @@ impl SecureRandom { } } - /// Generate random bytes - pub fn bytes(&self, len: usize) -> Vec { + /// Fill a caller-provided buffer with random bytes. + pub fn fill(&self, out: &mut [u8]) { let mut inner = self.inner.lock(); const CHUNK_SIZE: usize = 512; - - while inner.buffer.len() < len { - let mut chunk = vec![0u8; CHUNK_SIZE]; - inner.rng.fill_bytes(&mut chunk); - inner.cipher.apply(&mut chunk); - inner.buffer.extend_from_slice(&chunk); + + let mut written = 0usize; + while written < out.len() { + if inner.buffer.is_empty() { + let mut chunk = vec![0u8; CHUNK_SIZE]; + inner.rng.fill_bytes(&mut chunk); + inner.cipher.apply(&mut chunk); + inner.buffer.extend_from_slice(&chunk); + } + + let take = (out.len() - written).min(inner.buffer.len()); + out[written..written + take].copy_from_slice(&inner.buffer[..take]); + inner.buffer.drain(..take); + written += take; } - - inner.buffer.drain(..len).collect() + } + + /// Generate random bytes + pub fn bytes(&self, len: usize) -> Vec { + let mut out = vec![0u8; len]; + self.fill(&mut out); + out } /// Generate random number in range [0, max) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 7b97049..3b98112 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -95,6 +95,7 @@ where let user_clone = user.clone(); let me_writer = tokio::spawn(async move { let mut writer = crypto_writer; + let mut frame_buf = Vec::with_capacity(16 * 1024); loop { tokio::select! { msg = me_rx_task.recv() => { @@ -102,7 +103,15 @@ where Some(MeResponse::Data { flags, data }) => { trace!(conn_id, bytes = data.len(), flags, "ME->C data"); stats_clone.add_user_octets_to(&user_clone, data.len() as u64); - write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?; + write_client_payload( + &mut writer, + proto_tag, + flags, + &data, + rng_clone.as_ref(), + &mut frame_buf, + ) + .await?; // Drain all immediately queued ME responses and flush once. while let Ok(next) = me_rx_task.try_recv() { @@ -116,6 +125,7 @@ where flags, &data, rng_clone.as_ref(), + &mut frame_buf, ).await?; } MeResponse::Ack(confirm) => { @@ -363,6 +373,7 @@ async fn write_client_payload( flags: u32, data: &[u8], rng: &SecureRandom, + frame_buf: &mut Vec, ) -> Result<()> where W: AsyncWrite + Unpin + Send + 'static, @@ -384,7 +395,8 @@ where if quickack { first |= 0x80; } - let mut frame_buf = Vec::with_capacity(1 + data.len()); + frame_buf.clear(); + frame_buf.reserve(1 + data.len()); frame_buf.push(first); frame_buf.extend_from_slice(data); client_writer @@ -397,7 +409,8 @@ where first |= 0x80; } let lw = (len_words as u32).to_le_bytes(); - let mut frame_buf = Vec::with_capacity(4 + data.len()); + frame_buf.clear(); + frame_buf.reserve(4 + data.len()); frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]); frame_buf.extend_from_slice(data); client_writer @@ -428,11 +441,14 @@ where len_val |= 0x8000_0000; } let total = 4 + data.len() + padding_len; - let mut frame_buf = Vec::with_capacity(total); + frame_buf.clear(); + frame_buf.reserve(total); frame_buf.extend_from_slice(&len_val.to_le_bytes()); frame_buf.extend_from_slice(data); if padding_len > 0 { - frame_buf.extend_from_slice(&rng.bytes(padding_len)); + let start = frame_buf.len(); + frame_buf.resize(start + padding_len, 0); + rng.fill(&mut frame_buf[start..]); } client_writer .write_all(&frame_buf) diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs index dd9589e..6d83761 100644 --- a/src/transport/middle_proxy/codec.rs +++ b/src/transport/middle_proxy/codec.rs @@ -223,7 +223,7 @@ pub(crate) struct RpcWriter { impl RpcWriter { pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> { let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode); - self.seq_no += 1; + self.seq_no = self.seq_no.wrapping_add(1); let pad = (16 - (frame.len() % 16)) % 16; let mut buf = frame;