mirror of
https://github.com/telemt/telemt.git
synced 2026-05-23 04:01:44 +03:00
Compare commits
6 Commits
4d9e835fa2
...
flow
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dc8951eae8 | ||
|
|
77a7f89075 | ||
|
|
9abaf9006c | ||
|
|
231f04a810 | ||
|
|
b32daf79bc | ||
|
|
f668759c05 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.4.11"
|
version = "3.4.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes",
|
"aes",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.4.11"
|
version = "3.4.12"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
use dashmap::DashMap;
|
||||||
|
|
||||||
mod read;
|
mod read;
|
||||||
|
|
||||||
@@ -10,10 +11,10 @@ pub(crate) use self::read::{
|
|||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub(crate) struct RelayIdleCandidateRegistry {
|
pub(crate) struct RelayIdleCandidateRegistry {
|
||||||
pub(in crate::proxy::middle_relay) by_conn_id: HashMap<u64, RelayIdleCandidateMeta>,
|
pub(in crate::proxy::middle_relay) by_conn_id: DashMap<u64, RelayIdleCandidateMeta>,
|
||||||
pub(in crate::proxy::middle_relay) ordered: BTreeSet<(u64, u64)>,
|
pub(in crate::proxy::middle_relay) ordered: parking_lot::Mutex<BTreeSet<(u64, u64)>>,
|
||||||
pressure_event_seq: u64,
|
pressure_event_seq: AtomicU64,
|
||||||
pressure_consumed_seq: u64,
|
pressure_consumed_seq: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Queue metadata used to preserve FIFO ordering for idle relay eviction.
|
/// Queue metadata used to preserve FIFO ordering for idle relay eviction.
|
||||||
@@ -23,25 +24,10 @@ pub(in crate::proxy::middle_relay) struct RelayIdleCandidateMeta {
|
|||||||
pub(in crate::proxy::middle_relay) mark_pressure_seq: u64,
|
pub(in crate::proxy::middle_relay) mark_pressure_seq: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn relay_idle_candidate_registry_lock_in(
|
|
||||||
shared: &ProxySharedState,
|
|
||||||
) -> std::sync::MutexGuard<'_, RelayIdleCandidateRegistry> {
|
|
||||||
let registry = &shared.middle_relay.relay_idle_registry;
|
|
||||||
match registry.lock() {
|
|
||||||
Ok(guard) => guard,
|
|
||||||
Err(poisoned) => {
|
|
||||||
let mut guard = poisoned.into_inner();
|
|
||||||
*guard = RelayIdleCandidateRegistry::default();
|
|
||||||
registry.clear_poison();
|
|
||||||
guard
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool {
|
pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool {
|
||||||
let mut guard = relay_idle_candidate_registry_lock_in(shared);
|
let registry = &shared.middle_relay.relay_idle_registry;
|
||||||
|
|
||||||
if guard.by_conn_id.contains_key(&conn_id) {
|
if registry.by_conn_id.contains_key(&conn_id) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,24 +38,35 @@ pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u
|
|||||||
.saturating_add(1);
|
.saturating_add(1);
|
||||||
let meta = RelayIdleCandidateMeta {
|
let meta = RelayIdleCandidateMeta {
|
||||||
mark_order_seq,
|
mark_order_seq,
|
||||||
mark_pressure_seq: guard.pressure_event_seq,
|
mark_pressure_seq: registry.pressure_event_seq.load(Ordering::Relaxed),
|
||||||
};
|
};
|
||||||
guard.by_conn_id.insert(conn_id, meta);
|
match registry.by_conn_id.entry(conn_id) {
|
||||||
guard.ordered.insert((meta.mark_order_seq, conn_id));
|
dashmap::mapref::entry::Entry::Occupied(_) => false,
|
||||||
|
dashmap::mapref::entry::Entry::Vacant(entry) => {
|
||||||
|
entry.insert(meta);
|
||||||
|
registry.ordered.lock().insert((meta.mark_order_seq, conn_id));
|
||||||
true
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) {
|
pub(super) fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) {
|
||||||
let mut guard = relay_idle_candidate_registry_lock_in(shared);
|
let registry = &shared.middle_relay.relay_idle_registry;
|
||||||
|
|
||||||
if let Some(meta) = guard.by_conn_id.remove(&conn_id) {
|
if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) {
|
||||||
guard.ordered.remove(&(meta.mark_order_seq, conn_id));
|
registry
|
||||||
|
.ordered
|
||||||
|
.lock()
|
||||||
|
.remove(&(meta.mark_order_seq, conn_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn note_relay_pressure_event_in(shared: &ProxySharedState) {
|
pub(super) fn note_relay_pressure_event_in(shared: &ProxySharedState) {
|
||||||
let mut guard = relay_idle_candidate_registry_lock_in(shared);
|
shared
|
||||||
guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1);
|
.middle_relay
|
||||||
|
.relay_idle_registry
|
||||||
|
.pressure_event_seq
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
|
pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
|
||||||
@@ -77,8 +74,11 @@ pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
|
pub(super) fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
|
||||||
let guard = relay_idle_candidate_registry_lock_in(shared);
|
shared
|
||||||
guard.pressure_event_seq
|
.middle_relay
|
||||||
|
.relay_idle_registry
|
||||||
|
.pressure_event_seq
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
||||||
@@ -87,33 +87,43 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
|||||||
seen_pressure_seq: &mut u64,
|
seen_pressure_seq: &mut u64,
|
||||||
stats: &Stats,
|
stats: &Stats,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let mut guard = relay_idle_candidate_registry_lock_in(shared);
|
let registry = &shared.middle_relay.relay_idle_registry;
|
||||||
|
|
||||||
let latest_pressure_seq = guard.pressure_event_seq;
|
let latest_pressure_seq = registry.pressure_event_seq.load(Ordering::Relaxed);
|
||||||
if latest_pressure_seq == *seen_pressure_seq {
|
if latest_pressure_seq == *seen_pressure_seq {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
*seen_pressure_seq = latest_pressure_seq;
|
*seen_pressure_seq = latest_pressure_seq;
|
||||||
|
|
||||||
if latest_pressure_seq == guard.pressure_consumed_seq {
|
if latest_pressure_seq == registry.pressure_consumed_seq.load(Ordering::Relaxed) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if guard.ordered.is_empty() {
|
let oldest = {
|
||||||
guard.pressure_consumed_seq = latest_pressure_seq;
|
let mut ordered = registry.ordered.lock();
|
||||||
|
loop {
|
||||||
|
let Some((mark_order_seq, candidate_conn_id)) = ordered.iter().next().copied() else {
|
||||||
|
registry
|
||||||
|
.pressure_consumed_seq
|
||||||
|
.store(latest_pressure_seq, Ordering::Relaxed);
|
||||||
return false;
|
return false;
|
||||||
|
};
|
||||||
|
let Some(candidate_meta) = registry.by_conn_id.get(&candidate_conn_id) else {
|
||||||
|
ordered.remove(&(mark_order_seq, candidate_conn_id));
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if candidate_meta.mark_order_seq != mark_order_seq {
|
||||||
|
ordered.remove(&(mark_order_seq, candidate_conn_id));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
break Some(candidate_conn_id);
|
||||||
let oldest = guard
|
}
|
||||||
.ordered
|
};
|
||||||
.iter()
|
|
||||||
.next()
|
|
||||||
.map(|(_, candidate_conn_id)| *candidate_conn_id);
|
|
||||||
if oldest != Some(conn_id) {
|
if oldest != Some(conn_id) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
let Some(candidate_meta) = guard.by_conn_id.get(&conn_id).copied() else {
|
let Some(candidate_meta) = registry.by_conn_id.get(&conn_id).map(|entry| *entry.value()) else {
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -121,10 +131,15 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(meta) = guard.by_conn_id.remove(&conn_id) {
|
if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) {
|
||||||
guard.ordered.remove(&(meta.mark_order_seq, conn_id));
|
registry
|
||||||
|
.ordered
|
||||||
|
.lock()
|
||||||
|
.remove(&(meta.mark_order_seq, conn_id));
|
||||||
}
|
}
|
||||||
guard.pressure_consumed_seq = latest_pressure_seq;
|
registry
|
||||||
|
.pressure_consumed_seq
|
||||||
|
.store(latest_pressure_seq, Ordering::Relaxed);
|
||||||
stats.increment_relay_pressure_evict_total();
|
stats.increment_relay_pressure_evict_total();
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
@@ -220,72 +235,32 @@ pub(crate) fn mark_relay_idle_candidate_for_testing(
|
|||||||
shared: &ProxySharedState,
|
shared: &ProxySharedState,
|
||||||
conn_id: u64,
|
conn_id: u64,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let registry = &shared.middle_relay.relay_idle_registry;
|
mark_relay_idle_candidate_in(shared, conn_id)
|
||||||
let mut guard = match registry.lock() {
|
|
||||||
Ok(guard) => guard,
|
|
||||||
Err(poisoned) => {
|
|
||||||
let mut guard = poisoned.into_inner();
|
|
||||||
*guard = RelayIdleCandidateRegistry::default();
|
|
||||||
registry.clear_poison();
|
|
||||||
guard
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if guard.by_conn_id.contains_key(&conn_id) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mark_order_seq = shared
|
|
||||||
.middle_relay
|
|
||||||
.relay_idle_mark_seq
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
let mark_pressure_seq = guard.pressure_event_seq;
|
|
||||||
let meta = RelayIdleCandidateMeta {
|
|
||||||
mark_order_seq,
|
|
||||||
mark_pressure_seq,
|
|
||||||
};
|
|
||||||
guard.by_conn_id.insert(conn_id, meta);
|
|
||||||
guard.ordered.insert((mark_order_seq, conn_id));
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option<u64> {
|
pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option<u64> {
|
||||||
let registry = &shared.middle_relay.relay_idle_registry;
|
let registry = &shared.middle_relay.relay_idle_registry;
|
||||||
let guard = match registry.lock() {
|
registry
|
||||||
Ok(guard) => guard,
|
.ordered
|
||||||
Err(poisoned) => {
|
.lock()
|
||||||
let mut guard = poisoned.into_inner();
|
.iter()
|
||||||
*guard = RelayIdleCandidateRegistry::default();
|
.next()
|
||||||
registry.clear_poison();
|
.map(|(_, conn_id)| *conn_id)
|
||||||
guard
|
|
||||||
}
|
|
||||||
};
|
|
||||||
guard.ordered.iter().next().map(|(_, conn_id)| *conn_id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) {
|
pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) {
|
||||||
let registry = &shared.middle_relay.relay_idle_registry;
|
clear_relay_idle_candidate_in(shared, conn_id);
|
||||||
let mut guard = match registry.lock() {
|
|
||||||
Ok(guard) => guard,
|
|
||||||
Err(poisoned) => {
|
|
||||||
let mut guard = poisoned.into_inner();
|
|
||||||
*guard = RelayIdleCandidateRegistry::default();
|
|
||||||
registry.clear_poison();
|
|
||||||
guard
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Some(meta) = guard.by_conn_id.remove(&conn_id) {
|
|
||||||
guard.ordered.remove(&(meta.mark_order_seq, conn_id));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) {
|
pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) {
|
||||||
if let Ok(mut guard) = shared.middle_relay.relay_idle_registry.lock() {
|
let registry = &shared.middle_relay.relay_idle_registry;
|
||||||
*guard = RelayIdleCandidateRegistry::default();
|
registry.by_conn_id.clear();
|
||||||
}
|
registry.ordered.lock().clear();
|
||||||
|
registry.pressure_event_seq.store(0, Ordering::Relaxed);
|
||||||
|
registry.pressure_consumed_seq.store(0, Ordering::Relaxed);
|
||||||
shared
|
shared
|
||||||
.middle_relay
|
.middle_relay
|
||||||
.relay_idle_mark_seq
|
.relay_idle_mark_seq
|
||||||
@@ -327,15 +302,10 @@ pub(crate) fn set_relay_pressure_state_for_testing(
|
|||||||
pressure_consumed_seq: u64,
|
pressure_consumed_seq: u64,
|
||||||
) {
|
) {
|
||||||
let registry = &shared.middle_relay.relay_idle_registry;
|
let registry = &shared.middle_relay.relay_idle_registry;
|
||||||
let mut guard = match registry.lock() {
|
registry
|
||||||
Ok(guard) => guard,
|
.pressure_event_seq
|
||||||
Err(poisoned) => {
|
.store(pressure_event_seq, Ordering::Relaxed);
|
||||||
let mut guard = poisoned.into_inner();
|
registry
|
||||||
*guard = RelayIdleCandidateRegistry::default();
|
.pressure_consumed_seq
|
||||||
registry.clear_poison();
|
.store(pressure_consumed_seq, Ordering::Relaxed);
|
||||||
guard
|
|
||||||
}
|
|
||||||
};
|
|
||||||
guard.pressure_event_seq = pressure_event_seq;
|
|
||||||
guard.pressure_consumed_seq = pressure_consumed_seq;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,11 +41,12 @@ pub(super) async fn reserve_user_quota_with_yield(
|
|||||||
return Err(MiddleQuotaReserveError::DeadlineExceeded);
|
return Err(MiddleQuotaReserveError::DeadlineExceeded);
|
||||||
}
|
}
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
|
biased;
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
stats.increment_quota_acquire_cancelled_total();
|
stats.increment_quota_acquire_cancelled_total();
|
||||||
return Err(MiddleQuotaReserveError::Cancelled);
|
return Err(MiddleQuotaReserveError::Cancelled);
|
||||||
}
|
}
|
||||||
|
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
|
||||||
}
|
}
|
||||||
backoff_rounds = backoff_rounds.saturating_add(1);
|
backoff_rounds = backoff_rounds.saturating_add(1);
|
||||||
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
|
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
|
||||||
@@ -128,11 +129,12 @@ pub(super) async fn wait_for_traffic_budget_or_cancel(
|
|||||||
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
|
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
|
||||||
}
|
}
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = tokio::time::sleep(next_refill_delay()) => {}
|
biased;
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
|
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
|
||||||
return Err(ProxyError::TrafficBudgetWaitCancelled);
|
return Err(ProxyError::TrafficBudgetWaitCancelled);
|
||||||
}
|
}
|
||||||
|
_ = tokio::time::sleep(next_refill_delay()) => {}
|
||||||
}
|
}
|
||||||
let wait_ms = wait_started_at
|
let wait_ms = wait_started_at
|
||||||
.elapsed()
|
.elapsed()
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ pub(crate) struct MiddleRelaySharedState {
|
|||||||
pub(crate) desync_hasher: RandomState,
|
pub(crate) desync_hasher: RandomState,
|
||||||
pub(crate) desync_full_cache_last_emit_at: Mutex<Option<Instant>>,
|
pub(crate) desync_full_cache_last_emit_at: Mutex<Option<Instant>>,
|
||||||
pub(crate) desync_dedup_rotation_state: Mutex<DesyncDedupRotationState>,
|
pub(crate) desync_dedup_rotation_state: Mutex<DesyncDedupRotationState>,
|
||||||
pub(crate) relay_idle_registry: Mutex<RelayIdleCandidateRegistry>,
|
pub(crate) relay_idle_registry: RelayIdleCandidateRegistry,
|
||||||
pub(crate) relay_idle_mark_seq: AtomicU64,
|
pub(crate) relay_idle_mark_seq: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,7 +97,7 @@ impl ProxySharedState {
|
|||||||
desync_hasher: RandomState::new(),
|
desync_hasher: RandomState::new(),
|
||||||
desync_full_cache_last_emit_at: Mutex::new(None),
|
desync_full_cache_last_emit_at: Mutex::new(None),
|
||||||
desync_dedup_rotation_state: Mutex::new(DesyncDedupRotationState::default()),
|
desync_dedup_rotation_state: Mutex::new(DesyncDedupRotationState::default()),
|
||||||
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()),
|
relay_idle_registry: RelayIdleCandidateRegistry::default(),
|
||||||
relay_idle_mark_seq: AtomicU64::new(0),
|
relay_idle_mark_seq: AtomicU64::new(0),
|
||||||
},
|
},
|
||||||
traffic_limiter: TrafficLimiter::new(),
|
traffic_limiter: TrafficLimiter::new(),
|
||||||
|
|||||||
@@ -1,33 +1,21 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use std::panic::{AssertUnwindSafe, catch_unwind};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_accounting() {
|
fn blackhat_registry_stale_order_entry_is_skipped_and_pressure_accounting_continues() {
|
||||||
let shared = ProxySharedState::new();
|
let shared = ProxySharedState::new();
|
||||||
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
||||||
|
|
||||||
let _ = catch_unwind(AssertUnwindSafe(|| {
|
shared
|
||||||
let mut guard = shared
|
|
||||||
.middle_relay
|
.middle_relay
|
||||||
.relay_idle_registry
|
.relay_idle_registry
|
||||||
|
.ordered
|
||||||
.lock()
|
.lock()
|
||||||
.expect("registry lock must be acquired before poison");
|
.insert((0, 999));
|
||||||
guard.by_conn_id.insert(
|
|
||||||
999,
|
|
||||||
RelayIdleCandidateMeta {
|
|
||||||
mark_order_seq: 1,
|
|
||||||
mark_pressure_seq: 0,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
guard.ordered.insert((1, 999));
|
|
||||||
panic!("intentional poison for idle-registry recovery");
|
|
||||||
}));
|
|
||||||
|
|
||||||
// Helper lock must recover from poison, reset stale state, and continue.
|
|
||||||
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42));
|
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
|
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
|
||||||
Some(42)
|
Some(999)
|
||||||
);
|
);
|
||||||
|
|
||||||
let before = relay_pressure_event_seq_for_testing(shared.as_ref());
|
let before = relay_pressure_event_seq_for_testing(shared.as_ref());
|
||||||
@@ -35,25 +23,43 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account
|
|||||||
let after = relay_pressure_event_seq_for_testing(shared.as_ref());
|
let after = relay_pressure_event_seq_for_testing(shared.as_ref());
|
||||||
assert!(
|
assert!(
|
||||||
after > before,
|
after > before,
|
||||||
"pressure accounting must still advance after poison"
|
"pressure accounting must still advance with stale ordered entries"
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut seen_pressure_seq = before;
|
||||||
|
assert!(maybe_evict_idle_candidate_on_pressure_for_testing(
|
||||||
|
shared.as_ref(),
|
||||||
|
42,
|
||||||
|
&mut seen_pressure_seq,
|
||||||
|
&Stats::new()
|
||||||
|
));
|
||||||
|
assert_eq!(
|
||||||
|
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
|
||||||
|
None
|
||||||
);
|
);
|
||||||
|
|
||||||
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests() {
|
fn clear_state_helper_must_reset_split_registry_for_deterministic_fifo_tests() {
|
||||||
let shared = ProxySharedState::new();
|
let shared = ProxySharedState::new();
|
||||||
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
||||||
|
|
||||||
let _ = catch_unwind(AssertUnwindSafe(|| {
|
shared.middle_relay.relay_idle_registry.by_conn_id.insert(
|
||||||
let _guard = shared
|
999,
|
||||||
|
RelayIdleCandidateMeta {
|
||||||
|
mark_order_seq: 1,
|
||||||
|
mark_pressure_seq: 0,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
shared
|
||||||
.middle_relay
|
.middle_relay
|
||||||
.relay_idle_registry
|
.relay_idle_registry
|
||||||
|
.ordered
|
||||||
.lock()
|
.lock()
|
||||||
.expect("registry lock must be acquired before poison");
|
.insert((1, 999));
|
||||||
panic!("intentional poison while lock held");
|
set_relay_pressure_state_for_testing(shared.as_ref(), 7, 6);
|
||||||
}));
|
|
||||||
|
|
||||||
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
|
||||||
|
|
||||||
|
|||||||
@@ -52,6 +52,8 @@ async fn writer_command_loop(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = cancel.cancelled() => return Ok(()),
|
||||||
cmd = rx.recv() => {
|
cmd = rx.recv() => {
|
||||||
match cmd {
|
match cmd {
|
||||||
Some(WriterCommand::Data(payload)) => {
|
Some(WriterCommand::Data(payload)) => {
|
||||||
@@ -69,7 +71,6 @@ async fn writer_command_loop(
|
|||||||
Some(WriterCommand::Close) | None => return Ok(()),
|
Some(WriterCommand::Close) | None => return Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = cancel.cancelled() => return Ok(()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -108,6 +109,7 @@ async fn ping_loop(
|
|||||||
Duration::from_secs(wait)
|
Duration::from_secs(wait)
|
||||||
};
|
};
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_ping_token.cancelled() => return,
|
_ = cancel_ping_token.cancelled() => return,
|
||||||
_ = tokio::time::sleep(startup_jitter) => {}
|
_ = tokio::time::sleep(startup_jitter) => {}
|
||||||
}
|
}
|
||||||
@@ -131,6 +133,7 @@ async fn ping_loop(
|
|||||||
Duration::from_secs(secs)
|
Duration::from_secs(secs)
|
||||||
};
|
};
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_ping_token.cancelled() => return,
|
_ = cancel_ping_token.cancelled() => return,
|
||||||
_ = tokio::time::sleep(wait) => {}
|
_ = tokio::time::sleep(wait) => {}
|
||||||
}
|
}
|
||||||
@@ -191,6 +194,7 @@ async fn rpc_proxy_req_signal_loop(
|
|||||||
};
|
};
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_signal.cancelled() => return,
|
_ = cancel_signal.cancelled() => return,
|
||||||
_ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
|
_ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
|
||||||
}
|
}
|
||||||
@@ -207,6 +211,7 @@ async fn rpc_proxy_req_signal_loop(
|
|||||||
};
|
};
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_signal.cancelled() => return,
|
_ = cancel_signal.cancelled() => return,
|
||||||
_ = tokio::time::sleep(wait) => {}
|
_ = tokio::time::sleep(wait) => {}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -242,6 +242,7 @@ pub(crate) async fn reader_loop(
|
|||||||
let mut raw = enc_leftover;
|
let mut raw = enc_leftover;
|
||||||
let mut expected_seq: i32 = 0;
|
let mut expected_seq: i32 = 0;
|
||||||
let mut data_route_queue_full_streak = HashMap::<u64, u8>::new();
|
let mut data_route_queue_full_streak = HashMap::<u64, u8>::new();
|
||||||
|
let mut tmp = [0u8; 65_536];
|
||||||
let mut fairness = WorkerFairnessState::new(
|
let mut fairness = WorkerFairnessState::new(
|
||||||
WorkerFairnessConfig {
|
WorkerFairnessConfig {
|
||||||
worker_id: (writer_id as u16).saturating_add(1),
|
worker_id: (writer_id as u16).saturating_add(1),
|
||||||
@@ -263,18 +264,18 @@ pub(crate) async fn reader_loop(
|
|||||||
let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed);
|
let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed);
|
||||||
fairness.set_backpressure_enabled(backpressure_enabled);
|
fairness.set_backpressure_enabled(backpressure_enabled);
|
||||||
let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot);
|
let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot);
|
||||||
let mut tmp = [0u8; 65_536];
|
|
||||||
let backlog_retry_enabled = fairness_has_backlog;
|
let backlog_retry_enabled = fairness_has_backlog;
|
||||||
let backlog_retry_delay =
|
let backlog_retry_delay =
|
||||||
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
|
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
|
||||||
let mut retry_only = false;
|
let mut retry_only = false;
|
||||||
let n = tokio::select! {
|
let n = tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = cancel.cancelled() => return Ok(()),
|
||||||
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
|
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
|
||||||
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
|
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
|
||||||
retry_only = true;
|
retry_only = true;
|
||||||
0usize
|
0usize
|
||||||
},
|
},
|
||||||
_ = cancel.cancelled() => return Ok(()),
|
|
||||||
};
|
};
|
||||||
if retry_only {
|
if retry_only {
|
||||||
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
||||||
|
|||||||
@@ -77,26 +77,24 @@ struct HotBindingTable {
|
|||||||
|
|
||||||
struct BindingState {
|
struct BindingState {
|
||||||
inner: Mutex<BindingInner>,
|
inner: Mutex<BindingInner>,
|
||||||
|
writer_idle_since_epoch_secs: DashMap<u64, u64>,
|
||||||
|
bound_clients_by_writer: DashMap<u64, usize>,
|
||||||
|
active_sessions_by_target_dc: DashMap<i16, usize>,
|
||||||
|
last_meta_for_writer: DashMap<u64, ConnMeta>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BindingInner {
|
struct BindingInner {
|
||||||
writers: HashMap<u64, mpsc::Sender<WriterCommand>>,
|
|
||||||
writer_for_conn: HashMap<u64, u64>,
|
writer_for_conn: HashMap<u64, u64>,
|
||||||
conns_for_writer: HashMap<u64, HashSet<u64>>,
|
conns_for_writer: HashMap<u64, HashSet<u64>>,
|
||||||
meta: HashMap<u64, ConnMeta>,
|
meta: HashMap<u64, ConnMeta>,
|
||||||
last_meta_for_writer: HashMap<u64, ConnMeta>,
|
|
||||||
writer_idle_since_epoch_secs: HashMap<u64, u64>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BindingInner {
|
impl BindingInner {
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
writers: HashMap::new(),
|
|
||||||
writer_for_conn: HashMap::new(),
|
writer_for_conn: HashMap::new(),
|
||||||
conns_for_writer: HashMap::new(),
|
conns_for_writer: HashMap::new(),
|
||||||
meta: HashMap::new(),
|
meta: HashMap::new(),
|
||||||
last_meta_for_writer: HashMap::new(),
|
|
||||||
writer_idle_since_epoch_secs: HashMap::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -149,6 +147,10 @@ impl ConnRegistry {
|
|||||||
},
|
},
|
||||||
binding: BindingState {
|
binding: BindingState {
|
||||||
inner: Mutex::new(BindingInner::new()),
|
inner: Mutex::new(BindingInner::new()),
|
||||||
|
writer_idle_since_epoch_secs: DashMap::new(),
|
||||||
|
bound_clients_by_writer: DashMap::new(),
|
||||||
|
active_sessions_by_target_dc: DashMap::new(),
|
||||||
|
last_meta_for_writer: DashMap::new(),
|
||||||
},
|
},
|
||||||
next_id: AtomicU64::new(start),
|
next_id: AtomicU64::new(start),
|
||||||
route_channel_capacity,
|
route_channel_capacity,
|
||||||
|
|||||||
@@ -13,13 +13,55 @@ use super::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
impl ConnRegistry {
|
impl ConnRegistry {
|
||||||
|
fn set_writer_bound_count(&self, writer_id: u64, count: usize) {
|
||||||
|
self.binding.bound_clients_by_writer.insert(writer_id, count);
|
||||||
|
if count == 0 {
|
||||||
|
self.binding
|
||||||
|
.writer_idle_since_epoch_secs
|
||||||
|
.entry(writer_id)
|
||||||
|
.or_insert_with(Self::now_epoch_secs);
|
||||||
|
} else {
|
||||||
|
self.binding.writer_idle_since_epoch_secs.remove(&writer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn adjust_active_target_dc(&self, target_dc: i16, delta: isize) {
|
||||||
|
if target_dc == 0 || delta == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if delta > 0 {
|
||||||
|
self.binding
|
||||||
|
.active_sessions_by_target_dc
|
||||||
|
.entry(target_dc)
|
||||||
|
.and_modify(|count| *count = count.saturating_add(delta as usize))
|
||||||
|
.or_insert(delta as usize);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let remove =
|
||||||
|
if let Some(mut count) = self.binding.active_sessions_by_target_dc.get_mut(&target_dc) {
|
||||||
|
let decrement = delta.unsigned_abs();
|
||||||
|
*count = count.saturating_sub(decrement);
|
||||||
|
*count == 0
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
if remove {
|
||||||
|
self.binding.active_sessions_by_target_dc.remove(&target_dc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
|
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
|
||||||
let mut binding = self.binding.inner.lock().await;
|
let mut binding = self.binding.inner.lock().await;
|
||||||
binding.writers.insert(writer_id, tx.clone());
|
|
||||||
binding
|
binding
|
||||||
.conns_for_writer
|
.conns_for_writer
|
||||||
.entry(writer_id)
|
.entry(writer_id)
|
||||||
.or_insert_with(HashSet::new);
|
.or_insert_with(HashSet::new);
|
||||||
|
self.binding.bound_clients_by_writer.entry(writer_id).or_insert(0);
|
||||||
|
self.binding
|
||||||
|
.writer_idle_since_epoch_secs
|
||||||
|
.entry(writer_id)
|
||||||
|
.or_insert_with(Self::now_epoch_secs);
|
||||||
self.writers.map.insert(writer_id, tx);
|
self.writers.map.insert(writer_id, tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,19 +71,18 @@ impl ConnRegistry {
|
|||||||
self.routing.byte_budget.remove(&id);
|
self.routing.byte_budget.remove(&id);
|
||||||
self.hot_binding.map.remove(&id);
|
self.hot_binding.map.remove(&id);
|
||||||
let mut binding = self.binding.inner.lock().await;
|
let mut binding = self.binding.inner.lock().await;
|
||||||
binding.meta.remove(&id);
|
let previous_meta = binding.meta.remove(&id);
|
||||||
if let Some(writer_id) = binding.writer_for_conn.remove(&id) {
|
if let Some(meta) = previous_meta.as_ref() {
|
||||||
let became_empty = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) {
|
self.adjust_active_target_dc(meta.target_dc, -1);
|
||||||
set.remove(&id);
|
|
||||||
set.is_empty()
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
if became_empty {
|
|
||||||
binding
|
|
||||||
.writer_idle_since_epoch_secs
|
|
||||||
.insert(writer_id, Self::now_epoch_secs());
|
|
||||||
}
|
}
|
||||||
|
if let Some(writer_id) = binding.writer_for_conn.remove(&id) {
|
||||||
|
let next_count = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) {
|
||||||
|
set.remove(&id);
|
||||||
|
set.len()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
self.set_writer_bound_count(writer_id, next_count);
|
||||||
return Some(writer_id);
|
return Some(writer_id);
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
@@ -248,7 +289,7 @@ impl ConnRegistry {
|
|||||||
if !self.routing.map.contains_key(&conn_id) {
|
if !self.routing.map.contains_key(&conn_id) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if !binding.writers.contains_key(&writer_id) {
|
if !self.writers.map.contains_key(&writer_id) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,28 +297,32 @@ impl ConnRegistry {
|
|||||||
if let Some(previous_writer_id) = previous_writer_id
|
if let Some(previous_writer_id) = previous_writer_id
|
||||||
&& previous_writer_id != writer_id
|
&& previous_writer_id != writer_id
|
||||||
{
|
{
|
||||||
let became_empty =
|
let next_count =
|
||||||
if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) {
|
if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) {
|
||||||
set.remove(&conn_id);
|
set.remove(&conn_id);
|
||||||
set.is_empty()
|
set.len()
|
||||||
} else {
|
} else {
|
||||||
false
|
0
|
||||||
};
|
};
|
||||||
if became_empty {
|
self.set_writer_bound_count(previous_writer_id, next_count);
|
||||||
binding
|
|
||||||
.writer_idle_since_epoch_secs
|
|
||||||
.insert(previous_writer_id, Self::now_epoch_secs());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
binding.meta.insert(conn_id, meta.clone());
|
if let Some(previous_meta) = binding.meta.insert(conn_id, meta.clone()) {
|
||||||
binding.last_meta_for_writer.insert(writer_id, meta.clone());
|
self.adjust_active_target_dc(previous_meta.target_dc, -1);
|
||||||
binding.writer_idle_since_epoch_secs.remove(&writer_id);
|
}
|
||||||
binding
|
self.adjust_active_target_dc(meta.target_dc, 1);
|
||||||
|
self.binding
|
||||||
|
.last_meta_for_writer
|
||||||
|
.insert(writer_id, meta.clone());
|
||||||
|
let next_count = {
|
||||||
|
let set = binding
|
||||||
.conns_for_writer
|
.conns_for_writer
|
||||||
.entry(writer_id)
|
.entry(writer_id)
|
||||||
.or_insert_with(HashSet::new)
|
.or_insert_with(HashSet::new);
|
||||||
.insert(conn_id);
|
set.insert(conn_id);
|
||||||
|
set.len()
|
||||||
|
};
|
||||||
|
self.set_writer_bound_count(writer_id, next_count);
|
||||||
self.hot_binding
|
self.hot_binding
|
||||||
.map
|
.map
|
||||||
.insert(conn_id, HotConnBinding { writer_id, meta });
|
.insert(conn_id, HotConnBinding { writer_id, meta });
|
||||||
@@ -290,27 +335,38 @@ impl ConnRegistry {
|
|||||||
.conns_for_writer
|
.conns_for_writer
|
||||||
.entry(writer_id)
|
.entry(writer_id)
|
||||||
.or_insert_with(HashSet::new);
|
.or_insert_with(HashSet::new);
|
||||||
binding
|
let count = binding
|
||||||
.writer_idle_since_epoch_secs
|
.conns_for_writer
|
||||||
.entry(writer_id)
|
.get(&writer_id)
|
||||||
.or_insert(Self::now_epoch_secs());
|
.map(|set| set.len())
|
||||||
|
.unwrap_or(0);
|
||||||
|
self.set_writer_bound_count(writer_id, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option<ConnMeta> {
|
pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option<ConnMeta> {
|
||||||
let binding = self.binding.inner.lock().await;
|
self.binding
|
||||||
binding.last_meta_for_writer.get(&writer_id).cloned()
|
.last_meta_for_writer
|
||||||
|
.get(&writer_id)
|
||||||
|
.map(|entry| entry.value().clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn writer_idle_since_snapshot(&self) -> HashMap<u64, u64> {
|
pub async fn writer_idle_since_snapshot(&self) -> HashMap<u64, u64> {
|
||||||
let binding = self.binding.inner.lock().await;
|
self.binding
|
||||||
binding.writer_idle_since_epoch_secs.clone()
|
.writer_idle_since_epoch_secs
|
||||||
|
.iter()
|
||||||
|
.map(|entry| (*entry.key(), *entry.value()))
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap<u64, u64> {
|
pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap<u64, u64> {
|
||||||
let binding = self.binding.inner.lock().await;
|
|
||||||
let mut out = HashMap::<u64, u64>::with_capacity(writer_ids.len());
|
let mut out = HashMap::<u64, u64>::with_capacity(writer_ids.len());
|
||||||
for writer_id in writer_ids {
|
for writer_id in writer_ids {
|
||||||
if let Some(idle_since) = binding.writer_idle_since_epoch_secs.get(writer_id).copied() {
|
if let Some(idle_since) = self
|
||||||
|
.binding
|
||||||
|
.writer_idle_since_epoch_secs
|
||||||
|
.get(writer_id)
|
||||||
|
.map(|entry| *entry.value())
|
||||||
|
{
|
||||||
out.insert(*writer_id, idle_since);
|
out.insert(*writer_id, idle_since);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -320,25 +376,19 @@ impl ConnRegistry {
|
|||||||
pub(in crate::transport::middle_proxy) async fn writer_activity_snapshot(
|
pub(in crate::transport::middle_proxy) async fn writer_activity_snapshot(
|
||||||
&self,
|
&self,
|
||||||
) -> WriterActivitySnapshot {
|
) -> WriterActivitySnapshot {
|
||||||
let binding = self.binding.inner.lock().await;
|
|
||||||
let mut bound_clients_by_writer = HashMap::<u64, usize>::new();
|
|
||||||
let mut active_sessions_by_target_dc = HashMap::<i16, usize>::new();
|
|
||||||
|
|
||||||
for (writer_id, conn_ids) in &binding.conns_for_writer {
|
|
||||||
bound_clients_by_writer.insert(*writer_id, conn_ids.len());
|
|
||||||
}
|
|
||||||
for conn_meta in binding.meta.values() {
|
|
||||||
if conn_meta.target_dc == 0 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
*active_sessions_by_target_dc
|
|
||||||
.entry(conn_meta.target_dc)
|
|
||||||
.or_insert(0) += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
WriterActivitySnapshot {
|
WriterActivitySnapshot {
|
||||||
bound_clients_by_writer,
|
bound_clients_by_writer: self
|
||||||
active_sessions_by_target_dc,
|
.binding
|
||||||
|
.bound_clients_by_writer
|
||||||
|
.iter()
|
||||||
|
.map(|entry| (*entry.key(), *entry.value()))
|
||||||
|
.collect(),
|
||||||
|
active_sessions_by_target_dc: self
|
||||||
|
.binding
|
||||||
|
.active_sessions_by_target_dc
|
||||||
|
.iter()
|
||||||
|
.map(|entry| (*entry.key(), *entry.value()))
|
||||||
|
.collect(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -393,10 +443,10 @@ impl ConnRegistry {
|
|||||||
|
|
||||||
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
|
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
|
||||||
let mut binding = self.binding.inner.lock().await;
|
let mut binding = self.binding.inner.lock().await;
|
||||||
binding.writers.remove(&writer_id);
|
|
||||||
self.writers.map.remove(&writer_id);
|
self.writers.map.remove(&writer_id);
|
||||||
binding.last_meta_for_writer.remove(&writer_id);
|
self.binding.last_meta_for_writer.remove(&writer_id);
|
||||||
binding.writer_idle_since_epoch_secs.remove(&writer_id);
|
self.binding.writer_idle_since_epoch_secs.remove(&writer_id);
|
||||||
|
self.binding.bound_clients_by_writer.remove(&writer_id);
|
||||||
let conns = binding
|
let conns = binding
|
||||||
.conns_for_writer
|
.conns_for_writer
|
||||||
.remove(&writer_id)
|
.remove(&writer_id)
|
||||||
@@ -410,6 +460,10 @@ impl ConnRegistry {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
binding.writer_for_conn.remove(&conn_id);
|
binding.writer_for_conn.remove(&conn_id);
|
||||||
|
let meta = binding.meta.remove(&conn_id);
|
||||||
|
if let Some(meta) = meta.as_ref() {
|
||||||
|
self.adjust_active_target_dc(meta.target_dc, -1);
|
||||||
|
}
|
||||||
let remove_hot = self
|
let remove_hot = self
|
||||||
.hot_binding
|
.hot_binding
|
||||||
.map
|
.map
|
||||||
@@ -419,10 +473,10 @@ impl ConnRegistry {
|
|||||||
if remove_hot {
|
if remove_hot {
|
||||||
self.hot_binding.map.remove(&conn_id);
|
self.hot_binding.map.remove(&conn_id);
|
||||||
}
|
}
|
||||||
if let Some(m) = binding.meta.get(&conn_id) {
|
if let Some(m) = meta {
|
||||||
out.push(BoundConn {
|
out.push(BoundConn {
|
||||||
conn_id,
|
conn_id,
|
||||||
meta: m.clone(),
|
meta: m,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -438,11 +492,10 @@ impl ConnRegistry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn is_writer_empty(&self, writer_id: u64) -> bool {
|
pub async fn is_writer_empty(&self, writer_id: u64) -> bool {
|
||||||
let binding = self.binding.inner.lock().await;
|
self.binding
|
||||||
binding
|
.bound_clients_by_writer
|
||||||
.conns_for_writer
|
|
||||||
.get(&writer_id)
|
.get(&writer_id)
|
||||||
.map(|s| s.is_empty())
|
.map(|count| *count.value() == 0)
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -457,21 +510,20 @@ impl ConnRegistry {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
binding.writers.remove(&writer_id);
|
|
||||||
self.writers.map.remove(&writer_id);
|
self.writers.map.remove(&writer_id);
|
||||||
binding.last_meta_for_writer.remove(&writer_id);
|
self.binding.last_meta_for_writer.remove(&writer_id);
|
||||||
binding.writer_idle_since_epoch_secs.remove(&writer_id);
|
self.binding.writer_idle_since_epoch_secs.remove(&writer_id);
|
||||||
|
self.binding.bound_clients_by_writer.remove(&writer_id);
|
||||||
binding.conns_for_writer.remove(&writer_id);
|
binding.conns_for_writer.remove(&writer_id);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet<u64> {
|
pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet<u64> {
|
||||||
let binding = self.binding.inner.lock().await;
|
|
||||||
let mut out = HashSet::<u64>::with_capacity(writer_ids.len());
|
let mut out = HashSet::<u64>::with_capacity(writer_ids.len());
|
||||||
for writer_id in writer_ids {
|
for writer_id in writer_ids {
|
||||||
if let Some(conns) = binding.conns_for_writer.get(writer_id)
|
if let Some(count) = self.binding.bound_clients_by_writer.get(writer_id)
|
||||||
&& !conns.is_empty()
|
&& *count.value() > 0
|
||||||
{
|
{
|
||||||
out.insert(*writer_id);
|
out.insert(*writer_id);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ use super::registry::ConnMeta;
|
|||||||
use super::wire::build_proxy_req_payload;
|
use super::wire::build_proxy_req_payload;
|
||||||
use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
|
use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::network::IpFamily;
|
|
||||||
use crate::stream::PooledBuffer;
|
use crate::stream::PooledBuffer;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
|
||||||
@@ -124,9 +123,8 @@ impl MePool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut writers_snapshot = {
|
let mut writers_snapshot = {
|
||||||
let ws = self.writers.read().await;
|
let ws = self.writers.snapshot();
|
||||||
if ws.is_empty() {
|
if ws.is_empty() {
|
||||||
drop(ws);
|
|
||||||
match no_writer_mode {
|
match no_writer_mode {
|
||||||
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
||||||
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||||
@@ -154,38 +152,32 @@ impl MePool {
|
|||||||
for _ in
|
for _ in
|
||||||
0..self.route_runtime.me_route_inline_recovery_attempts.max(1)
|
0..self.route_runtime.me_route_inline_recovery_attempts.max(1)
|
||||||
{
|
{
|
||||||
for family in self.family_order() {
|
let preferred = self.preferred_endpoints_by_dc.load_full();
|
||||||
let map = match family {
|
for (dc, addrs) in preferred.iter() {
|
||||||
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
for addr in addrs {
|
||||||
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
|
||||||
};
|
|
||||||
for (dc, addrs) in &map {
|
|
||||||
for (ip, port) in addrs {
|
|
||||||
let addr = SocketAddr::new(*ip, *port);
|
|
||||||
let _ = self
|
let _ = self
|
||||||
.connect_one_for_dc(
|
.connect_one_for_dc(
|
||||||
addr,
|
*addr,
|
||||||
*dc,
|
*dc,
|
||||||
self.rng.as_ref(),
|
self.rng.as_ref(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if !self.writers.snapshot().is_empty() {
|
||||||
if !self.writers.read().await.is_empty() {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.writers.read().await.is_empty() {
|
if !self.writers.snapshot().is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||||
Instant::now() + self.route_runtime.me_route_inline_recovery_wait
|
Instant::now() + self.route_runtime.me_route_inline_recovery_wait
|
||||||
});
|
});
|
||||||
if !self.wait_for_writer_until(deadline).await {
|
if !self.wait_for_writer_until(deadline).await {
|
||||||
if !self.writers.read().await.is_empty() {
|
if !self.writers.snapshot().is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
self.stats.increment_me_no_writer_failfast_total();
|
self.stats.increment_me_no_writer_failfast_total();
|
||||||
@@ -222,7 +214,7 @@ impl MePool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ws.clone()
|
ws
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut candidate_indices = self
|
let mut candidate_indices = self
|
||||||
@@ -285,7 +277,12 @@ impl MePool {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
emergency_attempts += 1;
|
emergency_attempts += 1;
|
||||||
let mut endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await;
|
let mut endpoints = self
|
||||||
|
.preferred_endpoints_by_dc
|
||||||
|
.load()
|
||||||
|
.get(&routed_dc)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_default();
|
||||||
endpoints.shuffle(&mut rand::rng());
|
endpoints.shuffle(&mut rand::rng());
|
||||||
for addr in endpoints {
|
for addr in endpoints {
|
||||||
if self
|
if self
|
||||||
@@ -298,9 +295,7 @@ impl MePool {
|
|||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64))
|
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64))
|
||||||
.await;
|
.await;
|
||||||
let ws2 = self.writers.read().await;
|
writers_snapshot = self.writers.snapshot();
|
||||||
writers_snapshot = ws2.clone();
|
|
||||||
drop(ws2);
|
|
||||||
candidate_indices = self
|
candidate_indices = self
|
||||||
.candidate_indices_for_dc(&writers_snapshot, routed_dc, false)
|
.candidate_indices_for_dc(&writers_snapshot, routed_dc, false)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -1,13 +1,9 @@
|
|||||||
use std::collections::HashSet;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
||||||
use crate::network::IpFamily;
|
|
||||||
|
|
||||||
use super::super::MePool;
|
use super::super::MePool;
|
||||||
use super::{
|
use super::{
|
||||||
HYBRID_GLOBAL_BURST_PERIOD_ROUNDS, HYBRID_RECENT_SUCCESS_WINDOW_MS,
|
HYBRID_GLOBAL_BURST_PERIOD_ROUNDS, HYBRID_RECENT_SUCCESS_WINDOW_MS,
|
||||||
@@ -17,18 +13,18 @@ use super::{
|
|||||||
impl MePool {
|
impl MePool {
|
||||||
pub(super) async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
|
pub(super) async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
|
||||||
let mut rx = self.writer_epoch.subscribe();
|
let mut rx = self.writer_epoch.subscribe();
|
||||||
if !self.writers.read().await.is_empty() {
|
if !self.writers.snapshot().is_empty() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
if now >= deadline {
|
if now >= deadline {
|
||||||
return !self.writers.read().await.is_empty();
|
return !self.writers.snapshot().is_empty();
|
||||||
}
|
}
|
||||||
let timeout = deadline.saturating_duration_since(now);
|
let timeout = deadline.saturating_duration_since(now);
|
||||||
if tokio::time::timeout(timeout, rx.changed()).await.is_ok() {
|
if tokio::time::timeout(timeout, rx.changed()).await.is_ok() {
|
||||||
return !self.writers.read().await.is_empty();
|
return !self.writers.snapshot().is_empty();
|
||||||
}
|
}
|
||||||
!self.writers.read().await.is_empty()
|
!self.writers.snapshot().is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool {
|
pub(super) async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool {
|
||||||
@@ -58,11 +54,11 @@ impl MePool {
|
|||||||
|
|
||||||
pub(super) async fn has_candidate_for_target_dc(&self, routed_dc: i32) -> bool {
|
pub(super) async fn has_candidate_for_target_dc(&self, routed_dc: i32) -> bool {
|
||||||
let writers_snapshot = {
|
let writers_snapshot = {
|
||||||
let ws = self.writers.read().await;
|
let ws = self.writers.snapshot();
|
||||||
if ws.is_empty() {
|
if ws.is_empty() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
ws.clone()
|
ws
|
||||||
};
|
};
|
||||||
let mut candidate_indices = self
|
let mut candidate_indices = self
|
||||||
.candidate_indices_for_dc(&writers_snapshot, routed_dc, false)
|
.candidate_indices_for_dc(&writers_snapshot, routed_dc, false)
|
||||||
@@ -79,7 +75,7 @@ impl MePool {
|
|||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
routed_dc: i32,
|
routed_dc: i32,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await;
|
let endpoints = self.preferred_endpoints_for_dc(routed_dc).await;
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -92,32 +88,18 @@ impl MePool {
|
|||||||
|
|
||||||
pub(super) async fn trigger_async_recovery_global(self: &Arc<Self>) {
|
pub(super) async fn trigger_async_recovery_global(self: &Arc<Self>) {
|
||||||
self.stats.increment_me_async_recovery_trigger_total();
|
self.stats.increment_me_async_recovery_trigger_total();
|
||||||
let mut seen = HashSet::<(i32, SocketAddr)>::new();
|
let preferred = self.preferred_endpoints_by_dc.load();
|
||||||
for family in self.family_order() {
|
let mut triggered = 0usize;
|
||||||
let map_guard = match family {
|
for (dc, addrs) in preferred.iter() {
|
||||||
IpFamily::V4 => self.proxy_map_v4.read().await,
|
for addr in addrs {
|
||||||
IpFamily::V6 => self.proxy_map_v6.read().await,
|
self.trigger_immediate_refill_for_dc(*addr, *dc);
|
||||||
};
|
triggered = triggered.saturating_add(1);
|
||||||
for (dc, addrs) in map_guard.iter() {
|
if triggered >= 8 {
|
||||||
for (ip, port) in addrs {
|
|
||||||
let addr = SocketAddr::new(*ip, *port);
|
|
||||||
if seen.insert((*dc, addr)) {
|
|
||||||
self.trigger_immediate_refill_for_dc(addr, *dc);
|
|
||||||
}
|
|
||||||
if seen.len() >= 8 {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) async fn endpoint_candidates_for_target_dc(
|
|
||||||
&self,
|
|
||||||
routed_dc: i32,
|
|
||||||
) -> Vec<SocketAddr> {
|
|
||||||
self.preferred_endpoints_for_dc(routed_dc).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) async fn maybe_trigger_hybrid_recovery(
|
pub(super) async fn maybe_trigger_hybrid_recovery(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
|||||||
@@ -15,7 +15,10 @@ impl MePool {
|
|||||||
routed_dc: i32,
|
routed_dc: i32,
|
||||||
include_warm: bool,
|
include_warm: bool,
|
||||||
) -> Vec<usize> {
|
) -> Vec<usize> {
|
||||||
let preferred = self.preferred_endpoints_for_dc(routed_dc).await;
|
let preferred_snapshot = self.preferred_endpoints_by_dc.load();
|
||||||
|
let Some(preferred) = preferred_snapshot.get(&routed_dc) else {
|
||||||
|
return Vec::new();
|
||||||
|
};
|
||||||
if preferred.is_empty() {
|
if preferred.is_empty() {
|
||||||
return Vec::new();
|
return Vec::new();
|
||||||
}
|
}
|
||||||
@@ -25,7 +28,7 @@ impl MePool {
|
|||||||
if !self.writer_eligible_for_selection(w, include_warm) {
|
if !self.writer_eligible_for_selection(w, include_warm) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if w.writer_dc == routed_dc && preferred.contains(&w.addr) {
|
if w.writer_dc == routed_dc && preferred.binary_search(&w.addr).is_ok() {
|
||||||
out.push(idx);
|
out.push(idx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user