Redesign Quotas on Atomics

This commit is contained in:
Alexey
2026-03-23 15:53:44 +03:00
parent 0c3c9009a9
commit 6f4356f72a
10 changed files with 408 additions and 1043 deletions

View File

@@ -238,10 +238,12 @@ pub struct Stats {
me_inline_recovery_total: AtomicU64,
ip_reservation_rollback_tcp_limit_total: AtomicU64,
ip_reservation_rollback_quota_limit_total: AtomicU64,
quota_write_fail_bytes_total: AtomicU64,
quota_write_fail_events_total: AtomicU64,
telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8,
user_stats: DashMap<String, UserStats>,
user_stats: DashMap<String, Arc<UserStats>>,
user_stats_last_cleanup_epoch_secs: AtomicU64,
start_time: parking_lot::RwLock<Option<Instant>>,
}
@@ -254,9 +256,51 @@ pub struct UserStats {
pub octets_to_client: AtomicU64,
pub msgs_from_client: AtomicU64,
pub msgs_to_client: AtomicU64,
/// Total bytes charged against per-user quota admission.
///
/// This counter is the single source of truth for quota enforcement and
/// intentionally tracks attempted traffic, not guaranteed delivery.
pub quota_used: AtomicU64,
pub last_seen_epoch_secs: AtomicU64,
}
/// Failure modes of a single `UserStats::quota_try_reserve` attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuotaReserveError {
/// The requested byte count does not fit under the supplied limit.
LimitExceeded,
/// The CAS lost a race (or failed spuriously, since the weak variant is
/// used); the caller may retry with its own back-off policy.
Contended,
}
impl UserStats {
    /// Current number of bytes charged against this user's quota.
    #[inline]
    pub fn quota_used(&self) -> u64 {
        self.quota_used.load(Ordering::Relaxed)
    }

    /// Performs exactly one CAS attempt to reserve `bytes` of quota under `limit`.
    ///
    /// Retry and back-off policy is left entirely to the caller: this method
    /// never blocks, sleeps, or loops, so synchronous poll paths and async
    /// tasks can each layer their own contention strategy on top of it.
    ///
    /// Returns the new used total on success, `LimitExceeded` when the
    /// request does not fit below `limit`, or `Contended` when the weak CAS
    /// failed (lost race or spurious failure) and may be retried.
    #[inline]
    pub fn quota_try_reserve(&self, bytes: u64, limit: u64) -> Result<u64, QuotaReserveError> {
        let observed = self.quota_used.load(Ordering::Relaxed);
        let headroom = limit.saturating_sub(observed);
        if bytes > headroom {
            return Err(QuotaReserveError::LimitExceeded);
        }
        let proposed = observed.saturating_add(bytes);
        self.quota_used
            .compare_exchange_weak(observed, proposed, Ordering::Relaxed, Ordering::Relaxed)
            .map(|_| proposed)
            .map_err(|_| QuotaReserveError::Contended)
    }
}
impl Stats {
pub fn new() -> Self {
let stats = Self::default();
@@ -316,6 +360,70 @@ impl Stats {
.store(Self::now_epoch_secs(), Ordering::Relaxed);
}
/// Returns a shared handle to `user`'s stats, inserting a fresh entry on
/// first sight. Existing entries get their `last_seen` stamp refreshed.
pub(crate) fn get_or_create_user_stats_handle(&self, user: &str) -> Arc<UserStats> {
    self.maybe_cleanup_user_stats();
    // Fast path: read lookup. The read guard is dropped before any
    // write-entry could be taken below.
    if let Some(found) = self.user_stats.get(user) {
        let shared = Arc::clone(found.value());
        Self::touch_user_stats(shared.as_ref());
        return shared;
    }
    // Slow path: insert-or-fetch under the entry lock. Only stamp
    // `last_seen` when the entry is brand-new (zero epoch) —
    // NOTE(review): presumably to avoid re-stamping an entry a racing
    // creator just inserted and touched.
    let slot = self.user_stats.entry(user.to_string()).or_default();
    if slot.last_seen_epoch_secs.load(Ordering::Relaxed) == 0 {
        Self::touch_user_stats(slot.value().as_ref());
    }
    Arc::clone(slot.value())
}
/// Records `bytes` received from the client on a pre-resolved stats handle,
/// skipping all work when per-user telemetry is disabled.
#[inline]
pub(crate) fn add_user_octets_from_handle(&self, user_stats: &UserStats, bytes: u64) {
    if self.telemetry_user_enabled() {
        Self::touch_user_stats(user_stats);
        user_stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
    }
}
/// Records `bytes` sent toward the client on a pre-resolved stats handle,
/// skipping all work when per-user telemetry is disabled.
#[inline]
pub(crate) fn add_user_octets_to_handle(&self, user_stats: &UserStats, bytes: u64) {
    if self.telemetry_user_enabled() {
        Self::touch_user_stats(user_stats);
        user_stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
    }
}
/// Counts one message received from the client on a pre-resolved stats
/// handle; a no-op when per-user telemetry is disabled.
#[inline]
pub(crate) fn increment_user_msgs_from_handle(&self, user_stats: &UserStats) {
    if self.telemetry_user_enabled() {
        Self::touch_user_stats(user_stats);
        user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
    }
}
/// Counts one message sent toward the client on a pre-resolved stats
/// handle; a no-op when per-user telemetry is disabled.
#[inline]
pub(crate) fn increment_user_msgs_to_handle(&self, user_stats: &UserStats) {
    if self.telemetry_user_enabled() {
        Self::touch_user_stats(user_stats);
        user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
    }
}
/// Charges `bytes` that were already committed in a post-I/O path and
/// returns the resulting quota total.
///
/// Kept deliberately separate from `quota_try_reserve` so a single I/O
/// event never mixes reserve-ahead accounting with post-write charging.
/// Note the charge is unconditional — unlike the telemetry helpers, quota
/// accounting does not consult the user-telemetry toggle.
#[inline]
pub(crate) fn quota_charge_post_write(&self, user_stats: &UserStats, bytes: u64) -> u64 {
    Self::touch_user_stats(user_stats);
    let previous = user_stats.quota_used.fetch_add(bytes, Ordering::Relaxed);
    previous.saturating_add(bytes)
}
fn maybe_cleanup_user_stats(&self) {
const USER_STATS_CLEANUP_INTERVAL_SECS: u64 = 60;
const USER_STATS_IDLE_TTL_SECS: u64 = 24 * 60 * 60;
@@ -1114,6 +1222,18 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
/// Accumulates bytes from failed post-quota writes into core telemetry.
pub fn add_quota_write_fail_bytes_total(&self, bytes: u64) {
    if !self.telemetry_core_enabled() {
        return;
    }
    self.quota_write_fail_bytes_total
        .fetch_add(bytes, Ordering::Relaxed);
}
/// Counts one failed post-quota write event in core telemetry.
pub fn increment_quota_write_fail_events_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    self.quota_write_fail_events_total
        .fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_endpoint_quarantine_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_endpoint_quarantine_total
@@ -1764,19 +1884,19 @@ impl Stats {
self.ip_reservation_rollback_quota_limit_total
.load(Ordering::Relaxed)
}
/// Snapshot of the `quota_write_fail_bytes_total` counter.
pub fn get_quota_write_fail_bytes_total(&self) -> u64 {
self.quota_write_fail_bytes_total.load(Ordering::Relaxed)
}
/// Snapshot of the `quota_write_fail_events_total` counter.
pub fn get_quota_write_fail_events_total(&self) -> u64 {
self.quota_write_fail_events_total.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
stats.connects.fetch_add(1, Ordering::Relaxed);
return;
}
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref());
stats.connects.fetch_add(1, Ordering::Relaxed);
}
@@ -1784,14 +1904,8 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
stats.curr_connects.fetch_add(1, Ordering::Relaxed);
return;
}
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref());
stats.curr_connects.fetch_add(1, Ordering::Relaxed);
}
@@ -1800,9 +1914,8 @@ impl Stats {
return true;
}
self.maybe_cleanup_user_stats();
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref());
let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed);
@@ -1827,7 +1940,7 @@ impl Stats {
pub fn decrement_user_curr_connects(&self, user: &str) {
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
Self::touch_user_stats(stats.value().as_ref());
let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed);
loop {
@@ -1858,86 +1971,32 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
return;
}
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
let stats = self.get_or_create_user_stats_handle(user);
self.add_user_octets_from_handle(stats.as_ref(), bytes);
}
pub fn add_user_octets_to(&self, user: &str, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
return;
}
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
}
pub fn sub_user_octets_to(&self, user: &str, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
let Some(stats) = self.user_stats.get(user) else {
return;
};
Self::touch_user_stats(stats.value());
let counter = &stats.octets_to_client;
let mut current = counter.load(Ordering::Relaxed);
loop {
let next = current.saturating_sub(bytes);
match counter.compare_exchange_weak(
current,
next,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
let stats = self.get_or_create_user_stats_handle(user);
self.add_user_octets_to_handle(stats.as_ref(), bytes);
}
pub fn increment_user_msgs_from(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
return;
}
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
let stats = self.get_or_create_user_stats_handle(user);
self.increment_user_msgs_from_handle(stats.as_ref());
}
pub fn increment_user_msgs_to(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());
stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
return;
}
let stats = self.user_stats.entry(user.to_string()).or_default();
Self::touch_user_stats(stats.value());
stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
let stats = self.get_or_create_user_stats_handle(user);
self.increment_user_msgs_to_handle(stats.as_ref());
}
pub fn get_user_total_octets(&self, user: &str) -> u64 {
@@ -1950,6 +2009,13 @@ impl Stats {
.unwrap_or(0)
}
/// Returns the quota bytes currently recorded for `user`, or 0 when the
/// user has no stats entry at all.
pub fn get_user_quota_used(&self, user: &str) -> u64 {
    match self.user_stats.get(user) {
        Some(entry) => entry.quota_used.load(Ordering::Relaxed),
        None => 0,
    }
}
/// Snapshot of the `handshake_timeouts` counter.
pub fn get_handshake_timeouts(&self) -> u64 {
self.handshake_timeouts.load(Ordering::Relaxed)
}
@@ -2015,7 +2081,7 @@ impl Stats {
.load(Ordering::Relaxed)
}
pub fn iter_user_stats(&self) -> dashmap::iter::Iter<'_, String, UserStats> {
pub fn iter_user_stats(&self) -> dashmap::iter::Iter<'_, String, Arc<UserStats>> {
self.user_stats.iter()
}
@@ -2163,6 +2229,22 @@ impl ReplayChecker {
found
}
/// Probes `data` against the given shard set WITHOUT inserting it.
///
/// Every probe bumps `checks`; every positive match additionally bumps
/// `hits`. The shard lock is held across the check, mirroring the
/// check-and-add variants.
fn check_only_internal(
    &self,
    data: &[u8],
    shards: &[Mutex<ReplayShard>],
    window: Duration,
) -> bool {
    self.checks.fetch_add(1, Ordering::Relaxed);
    let shard_idx = self.get_shard_idx(data);
    let mut guard = shards[shard_idx].lock();
    let hit = guard.check(data, Instant::now(), window);
    if hit {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }
    hit
}
fn add_only(&self, data: &[u8], shards: &[Mutex<ReplayShard>], window: Duration) {
self.additions.fetch_add(1, Ordering::Relaxed);
let idx = self.get_shard_idx(data);
@@ -2186,7 +2268,7 @@ impl ReplayChecker {
self.add_only(data, &self.handshake_shards, self.window)
}
pub fn check_tls_digest(&self, data: &[u8]) -> bool {
self.check_and_add_tls_digest(data)
self.check_only_internal(data, &self.tls_shards, self.tls_window)
}
pub fn add_tls_digest(&self, data: &[u8]) {
self.add_only(data, &self.tls_shards, self.tls_window)
@@ -2289,6 +2371,7 @@ impl ReplayStats {
mod tests {
use super::*;
use crate::config::MeTelemetryLevel;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[test]
@@ -2457,6 +2540,60 @@ mod tests {
}
assert_eq!(checker.stats().total_entries, 500);
}
#[test]
fn test_quota_reserve_under_contention_hits_limit_exactly() {
    // Eight threads race 1-byte reservations against a shared counter;
    // the number of successes must equal the limit exactly — no lost
    // updates, no over-admission.
    const LIMIT: u64 = 8_192;
    let shared_stats = Arc::new(UserStats::default());
    let wins = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let shared_stats = Arc::clone(&shared_stats);
            let wins = Arc::clone(&wins);
            std::thread::spawn(move || loop {
                match shared_stats.quota_try_reserve(1, LIMIT) {
                    Ok(_) => {
                        wins.fetch_add(1, Ordering::Relaxed);
                    }
                    // Weak CAS lost a race — spin and retry.
                    Err(QuotaReserveError::Contended) => std::hint::spin_loop(),
                    Err(QuotaReserveError::LimitExceeded) => break,
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread must finish");
    }
    assert_eq!(
        wins.load(Ordering::Relaxed),
        LIMIT,
        "successful reservations must stop exactly at limit"
    );
    assert_eq!(shared_stats.quota_used(), LIMIT);
}
#[test]
fn test_quota_used_is_authoritative_and_independent_from_octets_telemetry() {
    // Octets telemetry and quota accounting must move independently:
    // telemetry adds never affect quota, and quota charges never affect
    // telemetry totals.
    let stats = Stats::new();
    let user = "quota-authoritative-user";
    let handle = stats.get_or_create_user_stats_handle(user);

    stats.add_user_octets_to_handle(&handle, 5);
    assert_eq!(stats.get_user_total_octets(user), 5);
    assert_eq!(stats.get_user_quota_used(user), 0);

    stats.quota_charge_post_write(&handle, 7);
    assert_eq!(stats.get_user_total_octets(user), 5);
    assert_eq!(stats.get_user_quota_used(user), 7);
}
}
#[cfg(test)]
@@ -2466,7 +2603,3 @@ mod connection_lease_security_tests;
#[cfg(test)]
#[path = "tests/replay_checker_security_tests.rs"]
mod replay_checker_security_tests;
#[cfg(test)]
#[path = "tests/user_octets_sub_security_tests.rs"]
mod user_octets_sub_security_tests;

View File

@@ -1,151 +0,0 @@
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn sub_user_octets_to_underflow_saturates_at_zero() {
let stats = Stats::new();
let user = "sub-underflow-user";
stats.add_user_octets_to(user, 3);
// Subtracting more than was ever added must clamp at zero, never wrap.
stats.sub_user_octets_to(user, 100);
assert_eq!(stats.get_user_total_octets(user), 0);
}
#[test]
fn sub_user_octets_to_does_not_affect_octets_from_client() {
let stats = Stats::new();
let user = "sub-isolation-user";
stats.add_user_octets_from(user, 17);
stats.add_user_octets_to(user, 5);
// Subtraction targets only the to-client counter; the expected total
// 19 = 17 (from) + 5 (to) - 3 shows the from-counter stayed intact.
stats.sub_user_octets_to(user, 3);
assert_eq!(stats.get_user_total_octets(user), 19);
}
#[test]
fn light_fuzz_add_sub_model_matches_saturating_reference() {
let stats = Stats::new();
let user = "sub-fuzz-user";
// Deterministic xorshift PRNG drives a random add/sub sequence; the
// real counters are compared against a plain saturating u64 model.
let mut seed = 0x91D2_4CB8_EE77_1101u64;
let mut model_to = 0u64;
for _ in 0..8192 {
seed ^= seed << 7;
seed ^= seed >> 9;
seed ^= seed << 8;
// Amount in 1..=64; lowest bit picks add vs sub.
let amt = ((seed >> 8) & 0x3f) + 1;
if (seed & 1) == 0 {
stats.add_user_octets_to(user, amt);
model_to = model_to.saturating_add(amt);
} else {
stats.sub_user_octets_to(user, amt);
model_to = model_to.saturating_sub(amt);
}
}
assert_eq!(stats.get_user_total_octets(user), model_to);
}
#[test]
fn stress_parallel_add_sub_never_underflows_or_panics() {
let stats = Arc::new(Stats::new());
let user = "sub-stress-user";
// Pre-fund with a large offset so subtractions never saturate at zero.
// This guarantees commutative updates, making the final state deterministic.
let base_offset = 10_000_000u64;
stats.add_user_octets_to(user, base_offset);
let mut workers = Vec::new();
for tid in 0..16u64 {
let stats_for_thread = Arc::clone(&stats);
workers.push(thread::spawn(move || {
// Per-thread xorshift seed; each worker tracks its own signed net
// delta so the main thread can reconstruct the expected total.
let mut seed = 0xD00D_1000_0000_0000u64 ^ tid;
let mut net_delta = 0i64;
for _ in 0..4096 {
seed ^= seed << 7;
seed ^= seed >> 9;
seed ^= seed << 8;
// Amount in 1..=32; lowest bit picks add vs sub.
let amt = ((seed >> 8) & 0x1f) + 1;
if (seed & 1) == 0 {
stats_for_thread.add_user_octets_to(user, amt);
net_delta += amt as i64;
} else {
stats_for_thread.sub_user_octets_to(user, amt);
net_delta -= amt as i64;
}
}
net_delta
}));
}
// Sum the per-thread deltas; the shared counter must match exactly —
// any divergence indicates lost updates or an ABA race.
let mut expected_net_delta = 0i64;
for worker in workers {
expected_net_delta += worker
.join()
.expect("sub-user stress worker must not panic");
}
let expected_total = (base_offset as i64 + expected_net_delta) as u64;
let total = stats.get_user_total_octets(user);
assert_eq!(
total, expected_total,
"concurrent add/sub lost updates or suffered ABA races"
);
}
#[test]
fn sub_user_octets_to_missing_user_is_noop() {
let stats = Stats::new();
// Subtracting for an unknown user must not create an entry or panic.
stats.sub_user_octets_to("missing-user", 1024);
assert_eq!(stats.get_user_total_octets("missing-user"), 0);
}
#[test]
fn stress_parallel_per_user_models_remain_exact() {
let stats = Arc::new(Stats::new());
let mut workers = Vec::new();
// Each worker mutates its OWN user key, so there is no cross-thread
// contention on a single counter; every per-user total must match the
// thread-local saturating model exactly.
for tid in 0..16u64 {
let stats_for_thread = Arc::clone(&stats);
workers.push(thread::spawn(move || {
let user = format!("sub-per-user-{tid}");
// Per-thread xorshift seed keeps each sequence deterministic.
let mut seed = 0xFACE_0000_0000_0000u64 ^ tid;
let mut model = 0u64;
for _ in 0..4096 {
seed ^= seed << 7;
seed ^= seed >> 9;
seed ^= seed << 8;
// Amount in 1..=64; lowest bit picks add vs sub.
let amt = ((seed >> 8) & 0x3f) + 1;
if (seed & 1) == 0 {
stats_for_thread.add_user_octets_to(&user, amt);
model = model.saturating_add(amt);
} else {
stats_for_thread.sub_user_octets_to(&user, amt);
model = model.saturating_sub(amt);
}
}
(user, model)
}));
}
for worker in workers {
let (user, model) = worker
.join()
.expect("per-user subtract stress worker must not panic");
assert_eq!(
stats.get_user_total_octets(&user),
model,
"per-user parallel model diverged"
);
}
}