Phase 2 implemented with additional guards

This commit is contained in:
David Osipov
2026-04-03 02:08:59 +04:00
parent a9f695623d
commit 6ea867ce36
27 changed files with 2513 additions and 1131 deletions

View File

@@ -1,14 +1,16 @@
use std::collections::hash_map::RandomState;
#[cfg(test)]
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashMap};
#[cfg(test)]
use std::future::Future;
use std::hash::{BuildHasher, Hash};
#[cfg(test)]
use std::hash::Hasher;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::timeout;
@@ -19,6 +21,7 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::shared_state::ProxySharedState;
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
@@ -51,19 +54,9 @@ const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2;
const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
const QUOTA_RESERVE_SPIN_RETRIES: usize = 32;
static DESYNC_DEDUP: OnceLock<DashMap<u64, Instant>> = OnceLock::new();
static DESYNC_DEDUP_PREVIOUS: OnceLock<DashMap<u64, Instant>> = OnceLock::new();
static DESYNC_HASHER: OnceLock<RandomState> = OnceLock::new();
static DESYNC_FULL_CACHE_LAST_EMIT_AT: OnceLock<Mutex<Option<Instant>>> = OnceLock::new();
static DESYNC_DEDUP_ROTATION_STATE: OnceLock<Mutex<DesyncDedupRotationState>> = OnceLock::new();
// Invariant for async callers:
// this std::sync::Mutex is allowed only because critical sections are short,
// synchronous, and MUST never cross an `.await`.
static RELAY_IDLE_CANDIDATE_REGISTRY: OnceLock<Mutex<RelayIdleCandidateRegistry>> = OnceLock::new();
static RELAY_IDLE_MARK_SEQ: AtomicU64 = AtomicU64::new(0);
#[derive(Default)]
struct DesyncDedupRotationState {
pub(crate) struct DesyncDedupRotationState {
current_started_at: Option<Instant>,
}
@@ -80,7 +73,7 @@ struct RelayForensicsState {
}
#[derive(Default)]
struct RelayIdleCandidateRegistry {
pub(crate) struct RelayIdleCandidateRegistry {
by_conn_id: HashMap<u64, RelayIdleCandidateMeta>,
ordered: BTreeSet<(u64, u64)>,
pressure_event_seq: u64,
@@ -93,20 +86,14 @@ struct RelayIdleCandidateMeta {
mark_pressure_seq: u64,
}
fn relay_idle_candidate_registry() -> &'static Mutex<RelayIdleCandidateRegistry> {
RELAY_IDLE_CANDIDATE_REGISTRY.get_or_init(|| Mutex::new(RelayIdleCandidateRegistry::default()))
}
fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry>
{
// Keep lock scope narrow and synchronous: callers must drop guard before any `.await`.
let registry = relay_idle_candidate_registry();
fn relay_idle_candidate_registry_lock_in(
shared: &ProxySharedState,
) -> std::sync::MutexGuard<'_, RelayIdleCandidateRegistry> {
let registry = &shared.middle_relay.relay_idle_registry;
match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
// Fail closed after panic while holding registry lock: drop all
// candidates and pressure cursors to avoid stale cross-session state.
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
@@ -114,14 +101,16 @@ fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayI
}
}
fn mark_relay_idle_candidate(conn_id: u64) -> bool {
let mut guard = relay_idle_candidate_registry_lock();
fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool {
let mut guard = relay_idle_candidate_registry_lock_in(shared);
if guard.by_conn_id.contains_key(&conn_id) {
return false;
}
let mark_order_seq = RELAY_IDLE_MARK_SEQ
let mark_order_seq = shared
.middle_relay
.relay_idle_mark_seq
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
let meta = RelayIdleCandidateMeta {
@@ -133,36 +122,31 @@ fn mark_relay_idle_candidate(conn_id: u64) -> bool {
true
}
fn clear_relay_idle_candidate(conn_id: u64) {
let mut guard = relay_idle_candidate_registry_lock();
fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) {
let mut guard = relay_idle_candidate_registry_lock_in(shared);
if let Some(meta) = guard.by_conn_id.remove(&conn_id) {
guard.ordered.remove(&(meta.mark_order_seq, conn_id));
}
}
#[cfg(test)]
fn oldest_relay_idle_candidate() -> Option<u64> {
let guard = relay_idle_candidate_registry_lock();
guard.ordered.iter().next().map(|(_, conn_id)| *conn_id)
}
fn note_relay_pressure_event() {
let mut guard = relay_idle_candidate_registry_lock();
fn note_relay_pressure_event_in(shared: &ProxySharedState) {
let mut guard = relay_idle_candidate_registry_lock_in(shared);
guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1);
}
fn relay_pressure_event_seq() -> u64 {
let guard = relay_idle_candidate_registry_lock();
fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
let guard = relay_idle_candidate_registry_lock_in(shared);
guard.pressure_event_seq
}
fn maybe_evict_idle_candidate_on_pressure(
fn maybe_evict_idle_candidate_on_pressure_in(
shared: &ProxySharedState,
conn_id: u64,
seen_pressure_seq: &mut u64,
stats: &Stats,
) -> bool {
let mut guard = relay_idle_candidate_registry_lock();
let mut guard = relay_idle_candidate_registry_lock_in(shared);
let latest_pressure_seq = guard.pressure_event_seq;
if latest_pressure_seq == *seen_pressure_seq {
@@ -192,7 +176,6 @@ fn maybe_evict_idle_candidate_on_pressure(
return false;
};
// Pressure events that happened before candidate soft-mark are stale for this candidate.
if latest_pressure_seq == candidate_meta.mark_pressure_seq {
return false;
}
@@ -205,15 +188,6 @@ fn maybe_evict_idle_candidate_on_pressure(
true
}
#[cfg(test)]
fn clear_relay_idle_pressure_state_for_testing() {
if RELAY_IDLE_CANDIDATE_REGISTRY.get().is_some() {
let mut guard = relay_idle_candidate_registry_lock();
*guard = RelayIdleCandidateRegistry::default();
}
RELAY_IDLE_MARK_SEQ.store(0, Ordering::Relaxed);
}
#[derive(Clone, Copy)]
struct MeD2cFlushPolicy {
max_frames: usize,
@@ -235,31 +209,41 @@ struct RelayClientIdlePolicy {
impl RelayClientIdlePolicy {
fn from_config(config: &ProxyConfig) -> Self {
let frame_read_timeout =
Duration::from_secs(config.timeouts.relay_client_idle_hard_secs.max(1));
if !config.timeouts.relay_idle_policy_v2_enabled {
return Self::disabled(frame_read_timeout);
}
let soft_idle = Duration::from_secs(config.timeouts.relay_client_idle_soft_secs.max(1));
let hard_idle = Duration::from_secs(config.timeouts.relay_client_idle_hard_secs.max(1));
let grace_after_downstream_activity = Duration::from_secs(
config
.timeouts
.relay_idle_grace_after_downstream_activity_secs,
);
Self {
enabled: config.timeouts.relay_idle_policy_v2_enabled,
soft_idle: Duration::from_secs(config.timeouts.relay_client_idle_soft_secs.max(1)),
hard_idle: Duration::from_secs(config.timeouts.relay_client_idle_hard_secs.max(1)),
grace_after_downstream_activity: Duration::from_secs(
config
.timeouts
.relay_idle_grace_after_downstream_activity_secs,
),
legacy_frame_read_timeout: Duration::from_secs(config.timeouts.client_handshake.max(1)),
enabled: true,
soft_idle,
hard_idle,
grace_after_downstream_activity,
legacy_frame_read_timeout: frame_read_timeout,
}
}
#[cfg(test)]
fn disabled(frame_read_timeout: Duration) -> Self {
Self {
enabled: false,
soft_idle: Duration::from_secs(0),
hard_idle: Duration::from_secs(0),
grace_after_downstream_activity: Duration::from_secs(0),
soft_idle: frame_read_timeout,
hard_idle: frame_read_timeout,
grace_after_downstream_activity: Duration::ZERO,
legacy_frame_read_timeout: frame_read_timeout,
}
}
}
#[derive(Clone, Copy)]
struct RelayClientIdleState {
last_client_frame_at: Instant,
soft_idle_marked: bool,
@@ -303,24 +287,39 @@ impl MeD2cFlushPolicy {
}
}
#[cfg(test)]
fn hash_value<T: Hash>(value: &T) -> u64 {
let state = DESYNC_HASHER.get_or_init(RandomState::new);
state.hash_one(value)
let mut hasher = DefaultHasher::new();
value.hash(&mut hasher);
hasher.finish()
}
fn hash_value_in<T: Hash>(shared: &ProxySharedState, value: &T) -> u64 {
shared.middle_relay.desync_hasher.hash_one(value)
}
#[cfg(test)]
fn hash_ip(ip: IpAddr) -> u64 {
hash_value(&ip)
}
fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
fn hash_ip_in(shared: &ProxySharedState, ip: IpAddr) -> u64 {
hash_value_in(shared, &ip)
}
fn should_emit_full_desync_in(
shared: &ProxySharedState,
key: u64,
all_full: bool,
now: Instant,
) -> bool {
if all_full {
return true;
}
let dedup_current = DESYNC_DEDUP.get_or_init(DashMap::new);
let dedup_previous = DESYNC_DEDUP_PREVIOUS.get_or_init(DashMap::new);
let rotation_state =
DESYNC_DEDUP_ROTATION_STATE.get_or_init(|| Mutex::new(DesyncDedupRotationState::default()));
let dedup_current = &shared.middle_relay.desync_dedup;
let dedup_previous = &shared.middle_relay.desync_dedup_previous;
let rotation_state = &shared.middle_relay.desync_dedup_rotation_state;
let mut state = match rotation_state.lock() {
Ok(guard) => guard,
@@ -366,8 +365,6 @@ fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
None => true,
};
if within_window {
// Keep the original timestamp when promoting from previous bucket,
// so dedup expiry remains tied to first-seen time.
dedup_current.insert(key, seen_at);
return false;
}
@@ -375,8 +372,6 @@ fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
}
if dedup_current.len() >= DESYNC_DEDUP_MAX_ENTRIES {
// Bounded eviction path: rotate buckets instead of scanning/evicting
// arbitrary entries from a saturated single map.
dedup_previous.clear();
for entry in dedup_current.iter() {
dedup_previous.insert(*entry.key(), *entry.value());
@@ -384,15 +379,15 @@ fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
dedup_current.clear();
state.current_started_at = Some(now);
dedup_current.insert(key, now);
should_emit_full_desync_full_cache(now)
should_emit_full_desync_full_cache_in(shared, now)
} else {
dedup_current.insert(key, now);
true
}
}
fn should_emit_full_desync_full_cache(now: Instant) -> bool {
let gate = DESYNC_FULL_CACHE_LAST_EMIT_AT.get_or_init(|| Mutex::new(None));
fn should_emit_full_desync_full_cache_in(shared: &ProxySharedState, now: Instant) -> bool {
let gate = &shared.middle_relay.desync_full_cache_last_emit_at;
let Ok(mut last_emit_at) = gate.lock() else {
return false;
};
@@ -417,46 +412,6 @@ fn should_emit_full_desync_full_cache(now: Instant) -> bool {
}
}
#[cfg(test)]
fn clear_desync_dedup_for_testing() {
if let Some(dedup) = DESYNC_DEDUP.get() {
dedup.clear();
}
if let Some(dedup_previous) = DESYNC_DEDUP_PREVIOUS.get() {
dedup_previous.clear();
}
if let Some(rotation_state) = DESYNC_DEDUP_ROTATION_STATE.get() {
match rotation_state.lock() {
Ok(mut guard) => {
*guard = DesyncDedupRotationState::default();
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = DesyncDedupRotationState::default();
rotation_state.clear_poison();
}
}
}
if let Some(last_emit_at) = DESYNC_FULL_CACHE_LAST_EMIT_AT.get() {
match last_emit_at.lock() {
Ok(mut guard) => {
*guard = None;
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = None;
last_emit_at.clear_poison();
}
}
}
}
#[cfg(test)]
fn desync_dedup_test_lock() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
fn desync_forensics_len_bytes(len: usize) -> ([u8; 4], bool) {
match u32::try_from(len) {
Ok(value) => (value.to_le_bytes(), false),
@@ -464,7 +419,8 @@ fn desync_forensics_len_bytes(len: usize) -> ([u8; 4], bool) {
}
}
fn report_desync_frame_too_large(
fn report_desync_frame_too_large_in(
shared: &ProxySharedState,
state: &RelayForensicsState,
proto_tag: ProtoTag,
frame_counter: u64,
@@ -482,13 +438,13 @@ fn report_desync_frame_too_large(
.map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
.unwrap_or(false);
let now = Instant::now();
let dedup_key = hash_value(&(
let dedup_key = hash_value_in(shared, &(
state.user.as_str(),
state.peer_hash,
proto_tag,
DESYNC_ERROR_CLASS,
));
let emit_full = should_emit_full_desync(dedup_key, state.desync_all_full, now);
let emit_full = should_emit_full_desync_in(shared, dedup_key, state.desync_all_full, now);
let duration_ms = state.started_at.elapsed().as_millis() as u64;
let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed);
@@ -557,6 +513,29 @@ fn report_desync_frame_too_large(
))
}
#[cfg(test)]
fn report_desync_frame_too_large(
state: &RelayForensicsState,
proto_tag: ProtoTag,
frame_counter: u64,
max_frame: usize,
len: usize,
raw_len_bytes: Option<[u8; 4]>,
stats: &Stats,
) -> ProxyError {
let shared = ProxySharedState::new();
report_desync_frame_too_large_in(
shared.as_ref(),
state,
proto_tag,
frame_counter,
max_frame,
len,
raw_len_bytes,
stats,
)
}
fn should_yield_c2me_sender(sent_since_yield: usize, has_backlog: bool) -> bool {
has_backlog && sent_since_yield >= C2ME_SENDER_FAIRNESS_BUDGET
}
@@ -629,19 +608,263 @@ fn observe_me_d2c_flush_event(
}
#[cfg(test)]
fn relay_idle_pressure_test_guard() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
TEST_LOCK.get_or_init(|| Mutex::new(()))
pub(crate) fn mark_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) -> bool {
let registry = &shared.middle_relay.relay_idle_registry;
let mut guard = match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
};
if guard.by_conn_id.contains_key(&conn_id) {
return false;
}
let mark_order_seq = shared
.middle_relay
.relay_idle_mark_seq
.fetch_add(1, Ordering::Relaxed);
let mark_pressure_seq = guard.pressure_event_seq;
let meta = RelayIdleCandidateMeta {
mark_order_seq,
mark_pressure_seq,
};
guard.by_conn_id.insert(conn_id, meta);
guard.ordered.insert((mark_order_seq, conn_id));
true
}
#[cfg(test)]
pub(crate) fn relay_idle_pressure_test_scope() -> std::sync::MutexGuard<'static, ()> {
relay_idle_pressure_test_guard()
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option<u64> {
let registry = &shared.middle_relay.relay_idle_registry;
let guard = match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
};
guard.ordered.iter().next().map(|(_, conn_id)| *conn_id)
}
async fn enqueue_c2me_command(
#[cfg(test)]
pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) {
let registry = &shared.middle_relay.relay_idle_registry;
let mut guard = match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
};
if let Some(meta) = guard.by_conn_id.remove(&conn_id) {
guard.ordered.remove(&(meta.mark_order_seq, conn_id));
}
}
#[cfg(test)]
pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) {
if let Ok(mut guard) = shared.middle_relay.relay_idle_registry.lock() {
*guard = RelayIdleCandidateRegistry::default();
}
shared
.middle_relay
.relay_idle_mark_seq
.store(0, Ordering::Relaxed);
}
#[cfg(test)]
pub(crate) fn note_relay_pressure_event_for_testing(shared: &ProxySharedState) {
note_relay_pressure_event_in(shared);
}
#[cfg(test)]
pub(crate) fn relay_pressure_event_seq_for_testing(shared: &ProxySharedState) -> u64 {
relay_pressure_event_seq_in(shared)
}
#[cfg(test)]
pub(crate) fn relay_idle_mark_seq_for_testing(shared: &ProxySharedState) -> u64 {
shared.middle_relay.relay_idle_mark_seq.load(Ordering::Relaxed)
}
#[cfg(test)]
pub(crate) fn maybe_evict_idle_candidate_on_pressure_for_testing(
shared: &ProxySharedState,
conn_id: u64,
seen_pressure_seq: &mut u64,
stats: &Stats,
) -> bool {
maybe_evict_idle_candidate_on_pressure_in(shared, conn_id, seen_pressure_seq, stats)
}
#[cfg(test)]
pub(crate) fn set_relay_pressure_state_for_testing(
shared: &ProxySharedState,
pressure_event_seq: u64,
pressure_consumed_seq: u64,
) {
let registry = &shared.middle_relay.relay_idle_registry;
let mut guard = match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
};
guard.pressure_event_seq = pressure_event_seq;
guard.pressure_consumed_seq = pressure_consumed_seq;
}
#[cfg(test)]
pub(crate) fn should_emit_full_desync_for_testing(
shared: &ProxySharedState,
key: u64,
all_full: bool,
now: Instant,
) -> bool {
if all_full {
return true;
}
let dedup_current = &shared.middle_relay.desync_dedup;
let dedup_previous = &shared.middle_relay.desync_dedup_previous;
let Ok(mut state) = shared.middle_relay.desync_dedup_rotation_state.lock() else {
return false;
};
let rotate_now = match state.current_started_at {
Some(current_started_at) => match now.checked_duration_since(current_started_at) {
Some(elapsed) => elapsed >= DESYNC_DEDUP_WINDOW,
None => true,
},
None => true,
};
if rotate_now {
dedup_previous.clear();
for entry in dedup_current.iter() {
dedup_previous.insert(*entry.key(), *entry.value());
}
dedup_current.clear();
state.current_started_at = Some(now);
}
if let Some(seen_at) = dedup_current.get(&key).map(|entry| *entry.value()) {
let within_window = match now.checked_duration_since(seen_at) {
Some(elapsed) => elapsed < DESYNC_DEDUP_WINDOW,
None => true,
};
if within_window {
return false;
}
dedup_current.insert(key, now);
return true;
}
if let Some(seen_at) = dedup_previous.get(&key).map(|entry| *entry.value()) {
let within_window = match now.checked_duration_since(seen_at) {
Some(elapsed) => elapsed < DESYNC_DEDUP_WINDOW,
None => true,
};
if within_window {
dedup_current.insert(key, seen_at);
return false;
}
dedup_previous.remove(&key);
}
if dedup_current.len() >= DESYNC_DEDUP_MAX_ENTRIES {
dedup_previous.clear();
for entry in dedup_current.iter() {
dedup_previous.insert(*entry.key(), *entry.value());
}
dedup_current.clear();
state.current_started_at = Some(now);
dedup_current.insert(key, now);
let Ok(mut last_emit_at) = shared.middle_relay.desync_full_cache_last_emit_at.lock() else {
return false;
};
return match *last_emit_at {
None => {
*last_emit_at = Some(now);
true
}
Some(last) => {
let Some(elapsed) = now.checked_duration_since(last) else {
*last_emit_at = Some(now);
return true;
};
if elapsed >= DESYNC_FULL_CACHE_EMIT_MIN_INTERVAL {
*last_emit_at = Some(now);
true
} else {
false
}
}
};
}
dedup_current.insert(key, now);
true
}
#[cfg(test)]
pub(crate) fn clear_desync_dedup_for_testing_in_shared(shared: &ProxySharedState) {
shared.middle_relay.desync_dedup.clear();
shared.middle_relay.desync_dedup_previous.clear();
if let Ok(mut rotation_state) = shared.middle_relay.desync_dedup_rotation_state.lock() {
*rotation_state = DesyncDedupRotationState::default();
}
if let Ok(mut last_emit_at) = shared.middle_relay.desync_full_cache_last_emit_at.lock() {
*last_emit_at = None;
}
}
#[cfg(test)]
pub(crate) fn desync_dedup_len_for_testing(shared: &ProxySharedState) -> usize {
shared.middle_relay.desync_dedup.len()
}
#[cfg(test)]
pub(crate) fn desync_dedup_insert_for_testing(shared: &ProxySharedState, key: u64, at: Instant) {
shared.middle_relay.desync_dedup.insert(key, at);
}
#[cfg(test)]
pub(crate) fn desync_dedup_get_for_testing(
shared: &ProxySharedState,
key: u64,
) -> Option<Instant> {
shared
.middle_relay
.desync_dedup
.get(&key)
.map(|entry| *entry.value())
}
#[cfg(test)]
pub(crate) fn desync_dedup_keys_for_testing(shared: &ProxySharedState) -> std::collections::HashSet<u64> {
shared
.middle_relay
.desync_dedup
.iter()
.map(|entry| *entry.key())
.collect()
}
async fn enqueue_c2me_command_in(
shared: &ProxySharedState,
tx: &mpsc::Sender<C2MeCommand>,
cmd: C2MeCommand,
send_timeout: Option<Duration>,
@@ -653,7 +876,7 @@ async fn enqueue_c2me_command(
Err(mpsc::error::TrySendError::Full(cmd)) => {
stats.increment_me_c2me_send_full_total();
stats.increment_me_c2me_send_high_water_total();
note_relay_pressure_event();
note_relay_pressure_event_in(shared);
// Cooperative yield reduces burst catch-up when the per-conn queue is near saturation.
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
tokio::task::yield_now().await;
@@ -682,6 +905,17 @@ async fn enqueue_c2me_command(
}
}
#[cfg(test)]
async fn enqueue_c2me_command(
tx: &mpsc::Sender<C2MeCommand>,
cmd: C2MeCommand,
send_timeout: Option<Duration>,
stats: &Stats,
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
let shared = ProxySharedState::new();
enqueue_c2me_command_in(shared.as_ref(), tx, cmd, send_timeout, stats).await
}
#[cfg(test)]
async fn run_relay_test_step_timeout<F, T>(context: &'static str, fut: F) -> T
where
@@ -705,6 +939,7 @@ pub(crate) async fn handle_via_middle_proxy<R, W>(
mut route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
shared: Arc<ProxySharedState>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@@ -735,7 +970,7 @@ where
conn_id,
user: user.clone(),
peer,
peer_hash: hash_ip(peer.ip()),
peer_hash: hash_ip_in(shared.as_ref(), peer.ip()),
started_at: Instant::now(),
bytes_c2me: 0,
bytes_me2c: bytes_me2c.clone(),
@@ -1184,10 +1419,11 @@ where
let mut client_closed = false;
let mut frame_counter: u64 = 0;
let mut route_watch_open = true;
let mut seen_pressure_seq = relay_pressure_event_seq();
let mut seen_pressure_seq = relay_pressure_event_seq_in(shared.as_ref());
loop {
if relay_idle_policy.enabled
&& maybe_evict_idle_candidate_on_pressure(
&& maybe_evict_idle_candidate_on_pressure_in(
shared.as_ref(),
conn_id,
&mut seen_pressure_seq,
stats.as_ref(),
@@ -1199,7 +1435,8 @@ where
user = %user,
"Middle-relay pressure eviction for idle-candidate session"
);
let _ = enqueue_c2me_command(
let _ = enqueue_c2me_command_in(
shared.as_ref(),
&c2me_tx,
C2MeCommand::Close,
c2me_send_timeout,
@@ -1224,7 +1461,8 @@ where
"Cutover affected middle session, closing client connection"
);
tokio::time::sleep(delay).await;
let _ = enqueue_c2me_command(
let _ = enqueue_c2me_command_in(
shared.as_ref(),
&c2me_tx,
C2MeCommand::Close,
c2me_send_timeout,
@@ -1241,7 +1479,7 @@ where
route_watch_open = false;
}
}
payload_result = read_client_payload_with_idle_policy(
payload_result = read_client_payload_with_idle_policy_in(
&mut crypto_reader,
proto_tag,
frame_limit,
@@ -1249,6 +1487,7 @@ where
&forensics,
&mut frame_counter,
&stats,
shared.as_ref(),
&relay_idle_policy,
&mut relay_idle_state,
last_downstream_activity_ms.as_ref(),
@@ -1288,7 +1527,8 @@ where
flags |= RPC_FLAG_NOT_ENCRYPTED;
}
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
if enqueue_c2me_command(
if enqueue_c2me_command_in(
shared.as_ref(),
&c2me_tx,
C2MeCommand::Data { payload, flags },
c2me_send_timeout,
@@ -1304,7 +1544,8 @@ where
Ok(None) => {
debug!(conn_id, "Client EOF");
client_closed = true;
let _ = enqueue_c2me_command(
let _ = enqueue_c2me_command_in(
shared.as_ref(),
&c2me_tx,
C2MeCommand::Close,
c2me_send_timeout,
@@ -1359,7 +1600,7 @@ where
frames_ok = frame_counter,
"ME relay cleanup"
);
clear_relay_idle_candidate(conn_id);
clear_relay_idle_candidate_in(shared.as_ref(), conn_id);
me_pool.registry().unregister(conn_id).await;
buffer_pool.trim_to(buffer_pool.max_buffers().min(64));
let pool_snapshot = buffer_pool.stats();
@@ -1371,7 +1612,7 @@ where
result
}
async fn read_client_payload_with_idle_policy<R>(
async fn read_client_payload_with_idle_policy_in<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
max_frame: usize,
@@ -1379,6 +1620,7 @@ async fn read_client_payload_with_idle_policy<R>(
forensics: &RelayForensicsState,
frame_counter: &mut u64,
stats: &Stats,
shared: &ProxySharedState,
idle_policy: &RelayClientIdlePolicy,
idle_state: &mut RelayClientIdleState,
last_downstream_activity_ms: &AtomicU64,
@@ -1398,6 +1640,7 @@ where
session_started_at: Instant,
forensics: &RelayForensicsState,
stats: &Stats,
shared: &ProxySharedState,
read_label: &'static str,
) -> Result<()>
where
@@ -1433,7 +1676,7 @@ where
let hard_deadline =
hard_deadline(idle_policy, idle_state, session_started_at, downstream_ms);
if now >= hard_deadline {
clear_relay_idle_candidate(forensics.conn_id);
clear_relay_idle_candidate_in(shared, forensics.conn_id);
stats.increment_relay_idle_hard_close_total();
let client_idle_secs = now
.saturating_duration_since(idle_state.last_client_frame_at)
@@ -1471,7 +1714,7 @@ where
>= idle_policy.soft_idle
{
idle_state.soft_idle_marked = true;
if mark_relay_idle_candidate(forensics.conn_id) {
if mark_relay_idle_candidate_in(shared, forensics.conn_id) {
stats.increment_relay_idle_soft_mark_total();
}
info!(
@@ -1541,6 +1784,7 @@ where
session_started_at,
forensics,
stats,
shared,
"abridged.first_len_byte",
)
.await
@@ -1564,6 +1808,7 @@ where
session_started_at,
forensics,
stats,
shared,
"abridged.extended_len",
)
.await?;
@@ -1588,6 +1833,7 @@ where
session_started_at,
forensics,
stats,
shared,
"len_prefix",
)
.await
@@ -1644,7 +1890,8 @@ where
}
if len > max_frame {
return Err(report_desync_frame_too_large(
return Err(report_desync_frame_too_large_in(
shared,
forensics,
proto_tag,
*frame_counter,
@@ -1686,6 +1933,7 @@ where
session_started_at,
forensics,
stats,
shared,
"payload",
)
.await?;
@@ -1697,11 +1945,46 @@ where
*frame_counter += 1;
idle_state.on_client_frame(Instant::now());
idle_state.tiny_frame_debt = idle_state.tiny_frame_debt.saturating_sub(1);
clear_relay_idle_candidate(forensics.conn_id);
clear_relay_idle_candidate_in(shared, forensics.conn_id);
return Ok(Some((payload, quickack)));
}
}
#[cfg(test)]
async fn read_client_payload_with_idle_policy<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
max_frame: usize,
buffer_pool: &Arc<BufferPool>,
forensics: &RelayForensicsState,
frame_counter: &mut u64,
stats: &Stats,
idle_policy: &RelayClientIdlePolicy,
idle_state: &mut RelayClientIdleState,
last_downstream_activity_ms: &AtomicU64,
session_started_at: Instant,
) -> Result<Option<(PooledBuffer, bool)>>
where
R: AsyncRead + Unpin + Send + 'static,
{
let shared = ProxySharedState::new();
read_client_payload_with_idle_policy_in(
client_reader,
proto_tag,
max_frame,
buffer_pool,
forensics,
frame_counter,
stats,
shared.as_ref(),
idle_policy,
idle_state,
last_downstream_activity_ms,
session_started_at,
)
.await
}
#[cfg(test)]
async fn read_client_payload_legacy<R>(
client_reader: &mut CryptoReader<R>,
@@ -1717,10 +2000,11 @@ where
R: AsyncRead + Unpin + Send + 'static,
{
let now = Instant::now();
let shared = ProxySharedState::new();
let mut idle_state = RelayClientIdleState::new(now);
let last_downstream_activity_ms = AtomicU64::new(0);
let idle_policy = RelayClientIdlePolicy::disabled(frame_read_timeout);
read_client_payload_with_idle_policy(
read_client_payload_with_idle_policy_in(
client_reader,
proto_tag,
max_frame,
@@ -1728,6 +2012,7 @@ where
forensics,
frame_counter,
stats,
shared.as_ref(),
&idle_policy,
&mut idle_state,
&last_downstream_activity_ms,