Reduce hot-path allocations and duplicate telemetry touches

Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com>
This commit is contained in:
Alexey
2026-05-20 17:07:54 +03:00
parent 8379b48f69
commit c02c7fbe43
6 changed files with 90 additions and 33 deletions

View File

@@ -550,9 +550,7 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
this.counters.touch(Instant::now(), this.epoch); this.counters.touch(Instant::now(), this.epoch);
this.stats this.stats
.add_user_octets_from_handle(this.user_stats.as_ref(), n_to_charge); .add_user_traffic_from_handle(this.user_stats.as_ref(), n_to_charge);
this.stats
.increment_user_msgs_from_handle(this.user_stats.as_ref());
if this.traffic_lease.is_some() { if this.traffic_lease.is_some() {
this.c2s_rate_debt_bytes = this.c2s_rate_debt_bytes =
this.c2s_rate_debt_bytes.saturating_add(n_to_charge); this.c2s_rate_debt_bytes.saturating_add(n_to_charge);
@@ -718,9 +716,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
this.counters.touch(Instant::now(), this.epoch); this.counters.touch(Instant::now(), this.epoch);
this.stats this.stats
.add_user_octets_to_handle(this.user_stats.as_ref(), n_to_charge); .add_user_traffic_to_handle(this.user_stats.as_ref(), n_to_charge);
this.stats
.increment_user_msgs_to_handle(this.user_stats.as_ref());
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) { if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) {
if should_immediate_quota_check(remaining, n_to_charge) { if should_immediate_quota_check(remaining, n_to_charge) {

View File

@@ -474,6 +474,30 @@ impl Stats {
.fetch_add(bytes, Ordering::Relaxed); .fetch_add(bytes, Ordering::Relaxed);
} }
/// Updates the `*_from_client` counters on an already-resolved user handle in one call.
///
/// Returns immediately when `telemetry_user_enabled()` is false. Otherwise it
/// calls `touch_user_stats` once, adds `bytes` to `octets_from_client`, and
/// increments `msgs_from_client` by one.
#[inline]
pub(crate) fn add_user_traffic_from_handle(&self, user_stats: &UserStats, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
// A single touch covers both counter updates below.
self.touch_user_stats(user_stats);
user_stats
.octets_from_client
.fetch_add(bytes, Ordering::Relaxed);
// Every call counts as exactly one message, independent of `bytes`.
user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
}
/// Updates the `*_to_client` counters on an already-resolved user handle in one call.
///
/// Returns immediately when `telemetry_user_enabled()` is false. Otherwise it
/// calls `touch_user_stats` once, adds `bytes` to `octets_to_client`, and
/// increments `msgs_to_client` by one.
#[inline]
pub(crate) fn add_user_traffic_to_handle(&self, user_stats: &UserStats, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
// A single touch covers both counter updates below.
self.touch_user_stats(user_stats);
user_stats
.octets_to_client
.fetch_add(bytes, Ordering::Relaxed);
// Every call counts as exactly one message, independent of `bytes`.
user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
}
#[inline] #[inline]
pub(crate) fn increment_user_msgs_from_handle(&self, user_stats: &UserStats) { pub(crate) fn increment_user_msgs_from_handle(&self, user_stats: &UserStats) {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {

View File

@@ -5,6 +5,9 @@ use crate::crypto::{AesCbc, crc32, crc32c};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
const RPC_WRITER_FRAME_BUF_SHRINK_THRESHOLD: usize = 256 * 1024;
const RPC_WRITER_FRAME_BUF_RETAIN: usize = 64 * 1024;
/// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes. /// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes.
pub(crate) enum WriterCommand { pub(crate) enum WriterCommand {
Data(Bytes), Data(Bytes),
@@ -43,6 +46,7 @@ pub(crate) fn rpc_crc(mode: RpcChecksumMode, data: &[u8]) -> u32 {
} }
} }
/// Builds a fixed-size control payload without heap allocation.
pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] { pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] {
let mut payload = [0u8; 12]; let mut payload = [0u8; 12];
payload[..4].copy_from_slice(&tag.to_le_bytes()); payload[..4].copy_from_slice(&tag.to_le_bytes());
@@ -51,14 +55,26 @@ pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] {
} }
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> { pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> {
let total_len = (4 + 4 + payload.len() + 4) as u32; let mut frame = Vec::new();
let mut frame = Vec::with_capacity(total_len as usize); build_rpc_frame_into(&mut frame, seq_no, payload, crc_mode);
frame
}
fn build_rpc_frame_into(
frame: &mut Vec<u8>,
seq_no: i32,
payload: &[u8],
crc_mode: RpcChecksumMode,
) {
let total_len = 4 + 4 + payload.len() + 4;
frame.clear();
frame.reserve(total_len + 15);
let total_len = total_len as u32;
frame.extend_from_slice(&total_len.to_le_bytes()); frame.extend_from_slice(&total_len.to_le_bytes());
frame.extend_from_slice(&seq_no.to_le_bytes()); frame.extend_from_slice(&seq_no.to_le_bytes());
frame.extend_from_slice(payload); frame.extend_from_slice(payload);
let c = rpc_crc(crc_mode, &frame); let c = rpc_crc(crc_mode, &frame);
frame.extend_from_slice(&c.to_le_bytes()); frame.extend_from_slice(&c.to_le_bytes());
frame
} }
pub(crate) async fn read_rpc_frame_plaintext( pub(crate) async fn read_rpc_frame_plaintext(
@@ -226,29 +242,35 @@ pub(crate) struct RpcWriter {
pub(crate) iv: [u8; 16], pub(crate) iv: [u8; 16],
pub(crate) seq_no: i32, pub(crate) seq_no: i32,
pub(crate) crc_mode: RpcChecksumMode, pub(crate) crc_mode: RpcChecksumMode,
pub(crate) frame_buf: Vec<u8>,
} }
impl RpcWriter { impl RpcWriter {
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> { pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode); build_rpc_frame_into(&mut self.frame_buf, self.seq_no, payload, self.crc_mode);
self.seq_no = self.seq_no.wrapping_add(1); self.seq_no = self.seq_no.wrapping_add(1);
let pad = (16 - (frame.len() % 16)) % 16; let pad = (16 - (self.frame_buf.len() % 16)) % 16;
let mut buf = frame;
let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00]; let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00];
for i in 0..pad { for i in 0..pad {
buf.push(pad_pattern[i % 4]); self.frame_buf.push(pad_pattern[i % 4]);
} }
let cipher = AesCbc::new(self.key, self.iv); let cipher = AesCbc::new(self.key, self.iv);
cipher cipher
.encrypt_in_place(&mut buf) .encrypt_in_place(&mut self.frame_buf)
.map_err(|e| ProxyError::Crypto(format!("{e}")))?; .map_err(|e| ProxyError::Crypto(format!("{e}")))?;
if buf.len() >= 16 { if self.frame_buf.len() >= 16 {
self.iv.copy_from_slice(&buf[buf.len() - 16..]); self.iv
.copy_from_slice(&self.frame_buf[self.frame_buf.len() - 16..]);
} }
self.writer.write_all(&buf).await.map_err(ProxyError::Io) let write_result = self.writer.write_all(&self.frame_buf).await;
if self.frame_buf.capacity() > RPC_WRITER_FRAME_BUF_SHRINK_THRESHOLD {
self.frame_buf.clear();
self.frame_buf.shrink_to(RPC_WRITER_FRAME_BUF_RETAIN);
}
write_result.map_err(ProxyError::Io)
} }
pub(crate) async fn send_and_flush(&mut self, payload: &[u8]) -> Result<()> { pub(crate) async fn send_and_flush(&mut self, payload: &[u8]) -> Result<()> {

View File

@@ -378,6 +378,7 @@ impl MePool {
iv: hs.write_iv, iv: hs.write_iv,
seq_no: 0, seq_no: 0,
crc_mode: hs.crc_mode, crc_mode: hs.crc_mode,
frame_buf: Vec::new(),
}; };
let writer = MeWriter { let writer = MeWriter {
id: writer_id, id: writer_id,

View File

@@ -567,6 +567,29 @@ impl ConnRegistry {
}) })
} }
/// Returns the active writer and routing metadata from one hot-binding lookup.
///
/// Yields `None` when `conn_id` is missing from the routing map, when it has
/// no hot-binding entry, or when the bound `writer_id` has no entry in the
/// writers map. On success the metadata and the writer value are cloned out
/// of their maps and returned by value.
// NOTE(review): the visible body contains no `.await`; `async` is presumably
// kept for call-site parity with the other registry lookups — confirm.
pub async fn get_writer_with_meta(&self, conn_id: u64) -> Option<(ConnWriter, ConnMeta)> {
// Connections absent from the routing map are not looked up any further.
if !self.routing.map.contains_key(&conn_id) {
return None;
}
// Both the writer id and the metadata come from the same hot-binding entry.
let hot = self.hot_binding.map.get(&conn_id)?;
let writer_id = hot.writer_id;
let meta = hot.meta.clone();
// Resolve the writer id to its stored value; a missing entry yields `None`.
let writer = self
.writers
.map
.get(&writer_id)
.map(|entry| entry.value().clone())?;
Some((
ConnWriter {
writer_id,
tx: writer,
},
meta,
))
}
pub async fn active_conn_ids(&self) -> Vec<u64> { pub async fn active_conn_ids(&self) -> Vec<u64> {
let binding = self.binding.inner.lock().await; let binding = self.binding.inner.lock().await;
binding.writer_for_conn.keys().copied().collect() binding.writer_for_conn.keys().copied().collect()

View File

@@ -46,12 +46,6 @@ impl MePool {
tag_override: Option<&[u8]>, tag_override: Option<&[u8]>,
) -> Result<()> { ) -> Result<()> {
let tag = tag_override.or(self.proxy_tag.as_deref()); let tag = tag_override.or(self.proxy_tag.as_deref());
let fallback_meta = ConnMeta {
target_dc,
client_addr,
our_addr,
proto_flags,
};
let build_routed_payload = |effective_our_addr: SocketAddr| { let build_routed_payload = |effective_our_addr: SocketAddr| {
( (
build_proxy_req_payload( build_proxy_req_payload(
@@ -90,16 +84,13 @@ impl MePool {
let mut hybrid_wait_current = hybrid_wait_step; let mut hybrid_wait_current = hybrid_wait_step;
loop { loop {
let current_meta = self if let Some((current, current_meta)) =
.registry self.registry.get_writer_with_meta(conn_id).await
.get_meta(conn_id) {
.await let (current_payload, _) = build_routed_payload(current_meta.our_addr);
.unwrap_or_else(|| fallback_meta.clone());
let (current_payload, _) = build_routed_payload(current_meta.our_addr);
if let Some(current) = self.registry.get_writer(conn_id).await {
match current match current
.tx .tx
.try_send(WriterCommand::Data(current_payload.clone())) .try_send(WriterCommand::Data(current_payload))
{ {
Ok(()) => { Ok(()) => {
self.note_hybrid_route_success(); self.note_hybrid_route_success();
@@ -451,7 +442,7 @@ impl MePool {
self.remove_writer_and_close_clients(w.id).await; self.remove_writer_and_close_clients(w.id).await;
continue; continue;
} }
permit.send(WriterCommand::Data(payload.clone())); permit.send(WriterCommand::Data(payload));
self.stats self.stats
.increment_me_writer_pick_success_try_total(pick_mode); .increment_me_writer_pick_success_try_total(pick_mode);
if w.generation < self.current_generation() { if w.generation < self.current_generation() {
@@ -519,7 +510,7 @@ impl MePool {
self.remove_writer_and_close_clients(w.id).await; self.remove_writer_and_close_clients(w.id).await;
continue; continue;
} }
permit.send(WriterCommand::Data(payload.clone())); permit.send(WriterCommand::Data(payload));
self.stats self.stats
.increment_me_writer_pick_success_fallback_total(pick_mode); .increment_me_writer_pick_success_fallback_total(pick_mode);
if w.generation < self.current_generation() { if w.generation < self.current_generation() {