From c02c7fbe43a77a72d6ffaf7a8aa4e9d3c26f1f80 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 20 May 2026 17:07:54 +0300 Subject: [PATCH] Reducing hot-path allocs + duplicate telemetry touchs Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com> --- src/proxy/relay.rs | 8 ++--- src/stats/mod.rs | 24 +++++++++++++ src/transport/middle_proxy/codec.rs | 44 +++++++++++++++++------ src/transport/middle_proxy/pool_writer.rs | 1 + src/transport/middle_proxy/registry.rs | 23 ++++++++++++ src/transport/middle_proxy/send.rs | 23 ++++-------- 6 files changed, 90 insertions(+), 33 deletions(-) diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 965dd66..eb5fceb 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -550,9 +550,7 @@ impl AsyncRead for StatsIo { this.counters.touch(Instant::now(), this.epoch); this.stats - .add_user_octets_from_handle(this.user_stats.as_ref(), n_to_charge); - this.stats - .increment_user_msgs_from_handle(this.user_stats.as_ref()); + .add_user_traffic_from_handle(this.user_stats.as_ref(), n_to_charge); if this.traffic_lease.is_some() { this.c2s_rate_debt_bytes = this.c2s_rate_debt_bytes.saturating_add(n_to_charge); @@ -718,9 +716,7 @@ impl AsyncWrite for StatsIo { this.counters.touch(Instant::now(), this.epoch); this.stats - .add_user_octets_to_handle(this.user_stats.as_ref(), n_to_charge); - this.stats - .increment_user_msgs_to_handle(this.user_stats.as_ref()); + .add_user_traffic_to_handle(this.user_stats.as_ref(), n_to_charge); if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) { if should_immediate_quota_check(remaining, n_to_charge) { diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 76d464d..50bc4e2 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -474,6 +474,30 @@ impl Stats { .fetch_add(bytes, Ordering::Relaxed); } + #[inline] + pub(crate) fn add_user_traffic_from_handle(&self, user_stats: &UserStats, bytes: u64) { + if !self.telemetry_user_enabled() { + return; + } + self.touch_user_stats(user_stats); + user_stats + .octets_from_client + .fetch_add(bytes, Ordering::Relaxed); + user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub(crate) fn add_user_traffic_to_handle(&self, user_stats: &UserStats, bytes: u64) { + if !self.telemetry_user_enabled() { + return; + } + self.touch_user_stats(user_stats); + user_stats + .octets_to_client + .fetch_add(bytes, Ordering::Relaxed); + user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed); + } + #[inline] pub(crate) fn increment_user_msgs_from_handle(&self, user_stats: &UserStats) { if !self.telemetry_user_enabled() { diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs index d820854..3037c52 100644 --- a/src/transport/middle_proxy/codec.rs +++ b/src/transport/middle_proxy/codec.rs @@ -5,6 +5,9 @@ use crate::crypto::{AesCbc, crc32, crc32c}; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; +const RPC_WRITER_FRAME_BUF_SHRINK_THRESHOLD: usize = 256 * 1024; +const RPC_WRITER_FRAME_BUF_RETAIN: usize = 64 * 1024; + /// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes. pub(crate) enum WriterCommand { Data(Bytes), @@ -43,6 +46,7 @@ pub(crate) fn rpc_crc(mode: RpcChecksumMode, data: &[u8]) -> u32 { } } +/// Builds a fixed-size control payload without heap allocation. pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] { let mut payload = [0u8; 12]; payload[..4].copy_from_slice(&tag.to_le_bytes()); @@ -51,14 +55,26 @@ pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] { } pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec { - let total_len = (4 + 4 + payload.len() + 4) as u32; - let mut frame = Vec::with_capacity(total_len as usize); + let mut frame = Vec::new(); + build_rpc_frame_into(&mut frame, seq_no, payload, crc_mode); + frame +} + +fn build_rpc_frame_into( + frame: &mut Vec, + seq_no: i32, + payload: &[u8], + crc_mode: RpcChecksumMode, +) { + let total_len = 4 + 4 + payload.len() + 4; + frame.clear(); + frame.reserve(total_len + 15); + let total_len = total_len as u32; frame.extend_from_slice(&total_len.to_le_bytes()); frame.extend_from_slice(&seq_no.to_le_bytes()); frame.extend_from_slice(payload); let c = rpc_crc(crc_mode, &frame); frame.extend_from_slice(&c.to_le_bytes()); - frame } pub(crate) async fn read_rpc_frame_plaintext( @@ -226,29 +242,35 @@ pub(crate) struct RpcWriter { pub(crate) iv: [u8; 16], pub(crate) seq_no: i32, pub(crate) crc_mode: RpcChecksumMode, + pub(crate) frame_buf: Vec, } impl RpcWriter { pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> { - let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode); + build_rpc_frame_into(&mut self.frame_buf, self.seq_no, payload, self.crc_mode); self.seq_no = self.seq_no.wrapping_add(1); - let pad = (16 - (frame.len() % 16)) % 16; - let mut buf = frame; + let pad = (16 - (self.frame_buf.len() % 16)) % 16; let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00]; for i in 0..pad { - buf.push(pad_pattern[i % 4]); + self.frame_buf.push(pad_pattern[i % 4]); } let cipher = AesCbc::new(self.key, self.iv); cipher - .encrypt_in_place(&mut buf) + .encrypt_in_place(&mut self.frame_buf) .map_err(|e| ProxyError::Crypto(format!("{e}")))?; - if buf.len() >= 16 { - self.iv.copy_from_slice(&buf[buf.len() - 16..]); + if self.frame_buf.len() >= 16 { + self.iv + .copy_from_slice(&self.frame_buf[self.frame_buf.len() - 16..]); } - self.writer.write_all(&buf).await.map_err(ProxyError::Io) + let write_result = self.writer.write_all(&self.frame_buf).await; + if self.frame_buf.capacity() > RPC_WRITER_FRAME_BUF_SHRINK_THRESHOLD { + self.frame_buf.clear(); + self.frame_buf.shrink_to(RPC_WRITER_FRAME_BUF_RETAIN); + } + write_result.map_err(ProxyError::Io) } pub(crate) async fn send_and_flush(&mut self, payload: &[u8]) -> Result<()> { diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 218be30..52ffa58 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -378,6 +378,7 @@ impl MePool { iv: hs.write_iv, seq_no: 0, crc_mode: hs.crc_mode, + frame_buf: Vec::new(), }; let writer = MeWriter { id: writer_id, diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index aca6f9c..8095da8 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -567,6 +567,29 @@ impl ConnRegistry { }) } + /// Returns the active writer and routing metadata from one hot-binding lookup. + pub async fn get_writer_with_meta(&self, conn_id: u64) -> Option<(ConnWriter, ConnMeta)> { + if !self.routing.map.contains_key(&conn_id) { + return None; + } + + let hot = self.hot_binding.map.get(&conn_id)?; + let writer_id = hot.writer_id; + let meta = hot.meta.clone(); + let writer = self + .writers + .map + .get(&writer_id) + .map(|entry| entry.value().clone())?; + Some(( + ConnWriter { + writer_id, + tx: writer, + }, + meta, + )) + } + pub async fn active_conn_ids(&self) -> Vec { let binding = self.binding.inner.lock().await; binding.writer_for_conn.keys().copied().collect() diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index a637948..8217588 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -46,12 +46,6 @@ impl MePool { tag_override: Option<&[u8]>, ) -> Result<()> { let tag = tag_override.or(self.proxy_tag.as_deref()); - let fallback_meta = ConnMeta { - target_dc, - client_addr, - our_addr, - proto_flags, - }; let build_routed_payload = |effective_our_addr: SocketAddr| { ( build_proxy_req_payload( @@ -90,16 +84,13 @@ impl MePool { let mut hybrid_wait_current = hybrid_wait_step; loop { - let current_meta = self - .registry - .get_meta(conn_id) - .await - .unwrap_or_else(|| fallback_meta.clone()); - let (current_payload, _) = build_routed_payload(current_meta.our_addr); - if let Some(current) = self.registry.get_writer(conn_id).await { + if let Some((current, current_meta)) = + self.registry.get_writer_with_meta(conn_id).await + { + let (current_payload, _) = build_routed_payload(current_meta.our_addr); match current .tx - .try_send(WriterCommand::Data(current_payload.clone())) + .try_send(WriterCommand::Data(current_payload)) { Ok(()) => { self.note_hybrid_route_success(); @@ -451,7 +442,7 @@ impl MePool { self.remove_writer_and_close_clients(w.id).await; continue; } - permit.send(WriterCommand::Data(payload.clone())); + permit.send(WriterCommand::Data(payload)); self.stats .increment_me_writer_pick_success_try_total(pick_mode); if w.generation < self.current_generation() { @@ -519,7 +510,7 @@ impl MePool { self.remove_writer_and_close_clients(w.id).await; continue; } - permit.send(WriterCommand::Data(payload.clone())); + permit.send(WriterCommand::Data(payload)); self.stats .increment_me_writer_pick_success_fallback_total(pick_mode); if w.generation < self.current_generation() {