mirror of
https://github.com/telemt/telemt.git
synced 2026-06-24 03:41:10 +03:00
Atomically claim pressure eviction budget in MR
This commit is contained in:
@@ -98,7 +98,8 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
||||
}
|
||||
*seen_pressure_seq = latest_pressure_seq;
|
||||
|
||||
if latest_pressure_seq == registry.pressure_consumed_seq.load(Ordering::Relaxed) {
|
||||
let consumed_pressure_seq = registry.pressure_consumed_seq.load(Ordering::Relaxed);
|
||||
if latest_pressure_seq == consumed_pressure_seq {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -106,9 +107,13 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
||||
let mut ordered = registry.ordered.lock();
|
||||
loop {
|
||||
let Some((mark_order_seq, candidate_conn_id)) = ordered.iter().next().copied() else {
|
||||
registry
|
||||
.pressure_consumed_seq
|
||||
.store(latest_pressure_seq, Ordering::Relaxed);
|
||||
// Empty queues consume the event so later candidates cannot replay stale pressure.
|
||||
let _ = registry.pressure_consumed_seq.compare_exchange(
|
||||
consumed_pressure_seq,
|
||||
latest_pressure_seq,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
return false;
|
||||
};
|
||||
let Some(candidate_meta) = registry.by_conn_id.get(&candidate_conn_id) else {
|
||||
@@ -138,15 +143,27 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
|
||||
return false;
|
||||
}
|
||||
|
||||
// Claim the global pressure budget before removal; otherwise racing sessions
|
||||
// can observe the next FIFO item and spend the same event more than once.
|
||||
if registry
|
||||
.pressure_consumed_seq
|
||||
.compare_exchange(
|
||||
consumed_pressure_seq,
|
||||
latest_pressure_seq,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) {
|
||||
registry
|
||||
.ordered
|
||||
.lock()
|
||||
.remove(&(meta.mark_order_seq, conn_id));
|
||||
}
|
||||
registry
|
||||
.pressure_consumed_seq
|
||||
.store(latest_pressure_seq, Ordering::Relaxed);
|
||||
stats.increment_relay_pressure_evict_total();
|
||||
true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user