ME Routing Core

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-25 18:18:06 +03:00
parent 28d318d724
commit 0b78583cf5
No known key found for this signature in database
3 changed files with 29 additions and 10 deletions

View File

@ -60,6 +60,9 @@ static DESYNC_DEDUP_PREVIOUS: OnceLock<DashMap<u64, Instant>> = OnceLock::new();
static DESYNC_HASHER: OnceLock<RandomState> = OnceLock::new(); static DESYNC_HASHER: OnceLock<RandomState> = OnceLock::new();
static DESYNC_FULL_CACHE_LAST_EMIT_AT: OnceLock<Mutex<Option<Instant>>> = OnceLock::new(); static DESYNC_FULL_CACHE_LAST_EMIT_AT: OnceLock<Mutex<Option<Instant>>> = OnceLock::new();
static DESYNC_DEDUP_ROTATION_STATE: OnceLock<Mutex<DesyncDedupRotationState>> = OnceLock::new(); static DESYNC_DEDUP_ROTATION_STATE: OnceLock<Mutex<DesyncDedupRotationState>> = OnceLock::new();
// Invariant for async callers:
// this std::sync::Mutex is allowed only because critical sections are short,
// synchronous, and MUST never cross an `.await`.
static RELAY_IDLE_CANDIDATE_REGISTRY: OnceLock<Mutex<RelayIdleCandidateRegistry>> = OnceLock::new(); static RELAY_IDLE_CANDIDATE_REGISTRY: OnceLock<Mutex<RelayIdleCandidateRegistry>> = OnceLock::new();
static RELAY_IDLE_MARK_SEQ: AtomicU64 = AtomicU64::new(0); static RELAY_IDLE_MARK_SEQ: AtomicU64 = AtomicU64::new(0);
@ -100,6 +103,7 @@ fn relay_idle_candidate_registry() -> &'static Mutex<RelayIdleCandidateRegistry>
fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry> fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry>
{ {
// Keep lock scope narrow and synchronous: callers must drop guard before any `.await`.
let registry = relay_idle_candidate_registry(); let registry = relay_idle_candidate_registry();
match registry.lock() { match registry.lock() {
Ok(guard) => guard, Ok(guard) => guard,

View File

@ -257,11 +257,17 @@ pub struct SecretSnapshot {
pub secret: Vec<u8>, pub secret: Vec<u8>,
} }
#[allow(dead_code)] pub struct RoutingCore {
pub struct MePool {
pub(super) registry: Arc<ConnRegistry>, pub(super) registry: Arc<ConnRegistry>,
pub(super) writers: Arc<WritersState>, pub(super) writers: Arc<WritersState>,
pub(super) rr: AtomicU64, pub(super) rr: AtomicU64,
pub(super) writer_epoch: watch::Sender<u64>,
pub(super) preferred_endpoints_by_dc: ArcSwap<HashMap<i32, Vec<SocketAddr>>>,
}
#[allow(dead_code)]
pub struct MePool {
pub(super) routing: Arc<RoutingCore>,
pub(super) decision: NetworkDecision, pub(super) decision: NetworkDecision,
pub(super) upstream: Option<Arc<UpstreamManager>>, pub(super) upstream: Option<Arc<UpstreamManager>>,
pub(super) rng: Arc<SecureRandom>, pub(super) rng: Arc<SecureRandom>,
@ -332,7 +338,6 @@ pub struct MePool {
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>, pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) nat_reflection_singleflight_v4: Arc<Mutex<()>>, pub(super) nat_reflection_singleflight_v4: Arc<Mutex<()>>,
pub(super) nat_reflection_singleflight_v6: Arc<Mutex<()>>, pub(super) nat_reflection_singleflight_v6: Arc<Mutex<()>>,
pub(super) writer_epoch: watch::Sender<u64>,
pub(super) refill_inflight: Arc<Mutex<HashSet<RefillEndpointKey>>>, pub(super) refill_inflight: Arc<Mutex<HashSet<RefillEndpointKey>>>,
pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>, pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>,
pub(super) conn_count: AtomicUsize, pub(super) conn_count: AtomicUsize,
@ -389,7 +394,14 @@ pub struct MePool {
pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64, pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
pub(super) runtime_ready: AtomicBool, pub(super) runtime_ready: AtomicBool,
pool_size: usize, pool_size: usize,
pub(super) preferred_endpoints_by_dc: ArcSwap<HashMap<i32, Vec<SocketAddr>>>, }
impl Deref for MePool {
type Target = RoutingCore;
fn deref(&self) -> &Self::Target {
self.routing.as_ref()
}
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -524,9 +536,13 @@ impl MePool {
let (writer_epoch, _) = watch::channel(0u64); let (writer_epoch, _) = watch::channel(0u64);
let now_epoch_secs = Self::now_epoch_secs(); let now_epoch_secs = Self::now_epoch_secs();
Arc::new(Self { Arc::new(Self {
registry, routing: Arc::new(RoutingCore {
writers: Arc::new(WritersState::new()), registry,
rr: AtomicU64::new(0), writers: Arc::new(WritersState::new()),
rr: AtomicU64::new(0),
writer_epoch,
preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc),
}),
decision, decision,
upstream, upstream,
rng, rng,
@ -644,7 +660,6 @@ impl MePool {
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), nat_reflection_singleflight_v4: Arc::new(Mutex::new(())),
nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), nat_reflection_singleflight_v6: Arc::new(Mutex::new(())),
writer_epoch,
refill_inflight: Arc::new(Mutex::new(HashSet::new())), refill_inflight: Arc::new(Mutex::new(HashSet::new())),
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),
conn_count: AtomicUsize::new(0), conn_count: AtomicUsize::new(0),
@ -731,7 +746,6 @@ impl MePool {
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8), me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs), me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs),
runtime_ready: AtomicBool::new(false), runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc),
}) })
} }

View File

@ -596,7 +596,8 @@ impl MePool {
let _ = self.registry.writer_lost(writer_id).await; let _ = self.registry.writer_lost(writer_id).await;
self.rtt_stats.lock().await.remove(&writer_id); self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx { if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await; // Keep teardown critical path non-blocking: close is best-effort only.
let _ = tx.try_send(WriterCommand::Close);
} }
if let Some(addr) = removed_addr { if let Some(addr) = removed_addr {
if let Some(uptime) = removed_uptime { if let Some(uptime) = removed_uptime {