mirror of
https://github.com/telemt/telemt.git
synced 2026-05-13 23:31:44 +03:00
Limit&Quota Saving as File + API
This commit is contained in:
@@ -43,7 +43,7 @@ use events::ApiEventStore;
|
|||||||
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
||||||
use model::{
|
use model::{
|
||||||
ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
|
ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
|
||||||
PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps,
|
PatchUserRequest, ResetUserQuotaResponse, RotateSecretRequest, SummaryData, UserActiveIps,
|
||||||
};
|
};
|
||||||
use runtime_edge::{
|
use runtime_edge::{
|
||||||
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
||||||
@@ -80,6 +80,7 @@ pub(super) struct ApiShared {
|
|||||||
pub(super) me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
|
pub(super) me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
|
||||||
pub(super) upstream_manager: Arc<UpstreamManager>,
|
pub(super) upstream_manager: Arc<UpstreamManager>,
|
||||||
pub(super) config_path: PathBuf,
|
pub(super) config_path: PathBuf,
|
||||||
|
pub(super) quota_state_path: PathBuf,
|
||||||
pub(super) detected_ips_rx: watch::Receiver<(Option<IpAddr>, Option<IpAddr>)>,
|
pub(super) detected_ips_rx: watch::Receiver<(Option<IpAddr>, Option<IpAddr>)>,
|
||||||
pub(super) mutation_lock: Arc<Mutex<()>>,
|
pub(super) mutation_lock: Arc<Mutex<()>>,
|
||||||
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
|
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
|
||||||
@@ -112,6 +113,7 @@ pub async fn serve(
|
|||||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||||
admission_rx: watch::Receiver<bool>,
|
admission_rx: watch::Receiver<bool>,
|
||||||
config_path: PathBuf,
|
config_path: PathBuf,
|
||||||
|
quota_state_path: PathBuf,
|
||||||
detected_ips_rx: watch::Receiver<(Option<IpAddr>, Option<IpAddr>)>,
|
detected_ips_rx: watch::Receiver<(Option<IpAddr>, Option<IpAddr>)>,
|
||||||
process_started_at_epoch_secs: u64,
|
process_started_at_epoch_secs: u64,
|
||||||
startup_tracker: Arc<StartupTracker>,
|
startup_tracker: Arc<StartupTracker>,
|
||||||
@@ -143,6 +145,7 @@ pub async fn serve(
|
|||||||
me_pool,
|
me_pool,
|
||||||
upstream_manager,
|
upstream_manager,
|
||||||
config_path,
|
config_path,
|
||||||
|
quota_state_path,
|
||||||
detected_ips_rx,
|
detected_ips_rx,
|
||||||
mutation_lock: Arc::new(Mutex::new(())),
|
mutation_lock: Arc::new(Mutex::new(())),
|
||||||
minimal_cache: Arc::new(Mutex::new(None)),
|
minimal_cache: Arc::new(Mutex::new(None)),
|
||||||
@@ -491,6 +494,56 @@ async fn handle(
|
|||||||
Ok(success_response(status, data, revision))
|
Ok(success_response(status, data, revision))
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
if method == Method::POST
|
||||||
|
&& let Some(user) = normalized_path
|
||||||
|
.strip_prefix("/v1/users/")
|
||||||
|
.and_then(|path| path.strip_suffix("/reset-quota"))
|
||||||
|
&& !user.is_empty()
|
||||||
|
&& !user.contains('/')
|
||||||
|
{
|
||||||
|
if api_cfg.read_only {
|
||||||
|
return Ok(error_response(
|
||||||
|
request_id,
|
||||||
|
ApiFailure::new(
|
||||||
|
StatusCode::FORBIDDEN,
|
||||||
|
"read_only",
|
||||||
|
"API runs in read-only mode",
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let snapshot = match crate::quota_state::reset_user_quota(
|
||||||
|
&shared.quota_state_path,
|
||||||
|
shared.stats.as_ref(),
|
||||||
|
user,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(snapshot) => snapshot,
|
||||||
|
Err(error) => {
|
||||||
|
shared.runtime_events.record(
|
||||||
|
"api.user.reset_quota.failed",
|
||||||
|
format!("username={} error={}", user, error),
|
||||||
|
);
|
||||||
|
return Err(ApiFailure::internal(format!(
|
||||||
|
"Failed to reset user quota: {}",
|
||||||
|
error
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
shared
|
||||||
|
.runtime_events
|
||||||
|
.record("api.user.reset_quota.ok", format!("username={}", user));
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
return Ok(success_response(
|
||||||
|
StatusCode::OK,
|
||||||
|
ResetUserQuotaResponse {
|
||||||
|
username: user.to_string(),
|
||||||
|
used_bytes: snapshot.used_bytes,
|
||||||
|
last_reset_epoch_secs: snapshot.last_reset_epoch_secs,
|
||||||
|
},
|
||||||
|
revision,
|
||||||
|
));
|
||||||
|
}
|
||||||
if let Some(user) = normalized_path.strip_prefix("/v1/users/")
|
if let Some(user) = normalized_path.strip_prefix("/v1/users/")
|
||||||
&& !user.is_empty()
|
&& !user.is_empty()
|
||||||
&& !user.contains('/')
|
&& !user.contains('/')
|
||||||
|
|||||||
@@ -501,6 +501,13 @@ pub(super) struct DeleteUserResponse {
|
|||||||
pub(super) in_runtime: bool,
|
pub(super) in_runtime: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
pub(super) struct ResetUserQuotaResponse {
|
||||||
|
pub(super) username: String,
|
||||||
|
pub(super) used_bytes: u64,
|
||||||
|
pub(super) last_reset_epoch_secs: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub(super) struct CreateUserRequest {
|
pub(super) struct CreateUserRequest {
|
||||||
pub(super) username: String,
|
pub(super) username: String,
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ const TOP_LEVEL_CONFIG_KEYS: &[&str] = &[
|
|||||||
|
|
||||||
const GENERAL_CONFIG_KEYS: &[&str] = &[
|
const GENERAL_CONFIG_KEYS: &[&str] = &[
|
||||||
"data_path",
|
"data_path",
|
||||||
|
"quota_state_path",
|
||||||
"config_strict",
|
"config_strict",
|
||||||
"modes",
|
"modes",
|
||||||
"prefer_ipv6",
|
"prefer_ipv6",
|
||||||
|
|||||||
@@ -26,6 +26,10 @@ pub enum LogLevel {
|
|||||||
Silent,
|
Silent,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_quota_state_path() -> PathBuf {
|
||||||
|
PathBuf::from("telemt.limit.json")
|
||||||
|
}
|
||||||
|
|
||||||
impl LogLevel {
|
impl LogLevel {
|
||||||
/// Convert to tracing EnvFilter directive string.
|
/// Convert to tracing EnvFilter directive string.
|
||||||
pub fn to_filter_str(&self) -> &'static str {
|
pub fn to_filter_str(&self) -> &'static str {
|
||||||
@@ -375,6 +379,10 @@ pub struct GeneralConfig {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub data_path: Option<PathBuf>,
|
pub data_path: Option<PathBuf>,
|
||||||
|
|
||||||
|
/// JSON state file for runtime per-user quota consumption.
|
||||||
|
#[serde(default = "default_quota_state_path")]
|
||||||
|
pub quota_state_path: PathBuf,
|
||||||
|
|
||||||
/// Reject unknown TOML config keys during load.
|
/// Reject unknown TOML config keys during load.
|
||||||
/// Startup fails fast; hot-reload rejects the new snapshot and keeps the current config.
|
/// Startup fails fast; hot-reload rejects the new snapshot and keeps the current config.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
@@ -979,6 +987,7 @@ impl Default for GeneralConfig {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
data_path: None,
|
data_path: None,
|
||||||
|
quota_state_path: default_quota_state_path(),
|
||||||
config_strict: false,
|
config_strict: false,
|
||||||
modes: ProxyModes::default(),
|
modes: ProxyModes::default(),
|
||||||
prefer_ipv6: false,
|
prefer_ipv6: false,
|
||||||
|
|||||||
@@ -417,6 +417,8 @@ async fn run_telemt_core(
|
|||||||
|
|
||||||
let stats = Arc::new(Stats::new());
|
let stats = Arc::new(Stats::new());
|
||||||
stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry));
|
stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry));
|
||||||
|
let quota_state_path = config.general.quota_state_path.clone();
|
||||||
|
crate::quota_state::load_quota_state("a_state_path, stats.as_ref()).await;
|
||||||
|
|
||||||
let upstream_manager = Arc::new(UpstreamManager::new(
|
let upstream_manager = Arc::new(UpstreamManager::new(
|
||||||
config.upstreams.clone(),
|
config.upstreams.clone(),
|
||||||
@@ -496,6 +498,7 @@ async fn run_telemt_core(
|
|||||||
let config_rx_api = api_config_rx.clone();
|
let config_rx_api = api_config_rx.clone();
|
||||||
let admission_rx_api = admission_rx.clone();
|
let admission_rx_api = admission_rx.clone();
|
||||||
let config_path_api = config_path.clone();
|
let config_path_api = config_path.clone();
|
||||||
|
let quota_state_path_api = quota_state_path.clone();
|
||||||
let startup_tracker_api = startup_tracker.clone();
|
let startup_tracker_api = startup_tracker.clone();
|
||||||
let detected_ips_rx_api = detected_ips_rx.clone();
|
let detected_ips_rx_api = detected_ips_rx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -509,6 +512,7 @@ async fn run_telemt_core(
|
|||||||
config_rx_api,
|
config_rx_api,
|
||||||
admission_rx_api,
|
admission_rx_api,
|
||||||
config_path_api,
|
config_path_api,
|
||||||
|
quota_state_path_api,
|
||||||
detected_ips_rx_api,
|
detected_ips_rx_api,
|
||||||
process_started_at_epoch_secs,
|
process_started_at_epoch_secs,
|
||||||
startup_tracker_api,
|
startup_tracker_api,
|
||||||
@@ -847,7 +851,7 @@ async fn run_telemt_core(
|
|||||||
max_connections.clone(),
|
max_connections.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
shutdown::wait_for_shutdown(process_started_at, me_pool, stats).await;
|
shutdown::wait_for_shutdown(process_started_at, me_pool, stats, quota_state_path).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@
|
|||||||
//! SIGHUP is handled separately in config/hot_reload.rs for config reload.
|
//! SIGHUP is handled separately in config/hot_reload.rs for config reload.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::path::PathBuf;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
@@ -48,9 +49,10 @@ pub(crate) async fn wait_for_shutdown(
|
|||||||
process_started_at: Instant,
|
process_started_at: Instant,
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
|
quota_state_path: PathBuf,
|
||||||
) {
|
) {
|
||||||
let signal = wait_for_shutdown_signal().await;
|
let signal = wait_for_shutdown_signal().await;
|
||||||
perform_shutdown(signal, process_started_at, me_pool, &stats).await;
|
perform_shutdown(signal, process_started_at, me_pool, &stats, quota_state_path).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT).
|
/// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT).
|
||||||
@@ -79,6 +81,7 @@ async fn perform_shutdown(
|
|||||||
process_started_at: Instant,
|
process_started_at: Instant,
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
stats: &Stats,
|
stats: &Stats,
|
||||||
|
quota_state_path: PathBuf,
|
||||||
) {
|
) {
|
||||||
let shutdown_started_at = Instant::now();
|
let shutdown_started_at = Instant::now();
|
||||||
info!(signal = %signal, "Received shutdown signal");
|
info!(signal = %signal, "Received shutdown signal");
|
||||||
@@ -109,6 +112,22 @@ async fn perform_shutdown(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match crate::quota_state::save_quota_state("a_state_path, stats).await {
|
||||||
|
Ok(()) => {
|
||||||
|
info!(
|
||||||
|
path = %quota_state_path.display(),
|
||||||
|
"Persisted per-user quota state"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
warn!(
|
||||||
|
error = %error,
|
||||||
|
path = %quota_state_path.display(),
|
||||||
|
"Failed to persist per-user quota state"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let shutdown_secs = shutdown_started_at.elapsed().as_secs();
|
let shutdown_secs = shutdown_started_at.elapsed().as_secs();
|
||||||
info!(
|
info!(
|
||||||
"Shutdown completed successfully in {} {}.",
|
"Shutdown completed successfully in {} {}.",
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ mod metrics;
|
|||||||
mod network;
|
mod network;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
mod proxy;
|
mod proxy;
|
||||||
|
mod quota_state;
|
||||||
mod service;
|
mod service;
|
||||||
mod startup;
|
mod startup;
|
||||||
mod stats;
|
mod stats;
|
||||||
|
|||||||
114
src/quota_state.rs
Normal file
114
src/quota_state.rs
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::stats::{Stats, UserQuotaSnapshot};
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||||
|
pub(crate) struct QuotaStateFile {
|
||||||
|
pub(crate) last_reset_epoch_secs: u64,
|
||||||
|
pub(crate) users: BTreeMap<String, QuotaUserState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||||
|
pub(crate) struct QuotaUserState {
|
||||||
|
pub(crate) used_bytes: u64,
|
||||||
|
pub(crate) last_reset_epoch_secs: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn now_epoch_secs() -> u64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn load_quota_state(path: &Path, stats: &Stats) {
|
||||||
|
let bytes = match tokio::fs::read(path).await {
|
||||||
|
Ok(bytes) => bytes,
|
||||||
|
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return,
|
||||||
|
Err(error) => {
|
||||||
|
warn!(
|
||||||
|
error = %error,
|
||||||
|
path = %path.display(),
|
||||||
|
"Failed to read quota state file"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let state = match serde_json::from_slice::<QuotaStateFile>(&bytes) {
|
||||||
|
Ok(state) => state,
|
||||||
|
Err(error) => {
|
||||||
|
warn!(
|
||||||
|
error = %error,
|
||||||
|
path = %path.display(),
|
||||||
|
"Failed to parse quota state file"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let loaded_users = state.users.len();
|
||||||
|
for (user, quota) in state.users {
|
||||||
|
stats.load_user_quota_state(&user, quota.used_bytes, quota.last_reset_epoch_secs);
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
path = %path.display(),
|
||||||
|
loaded_users,
|
||||||
|
"Loaded per-user quota state"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn save_quota_state(path: &Path, stats: &Stats) -> std::io::Result<()> {
|
||||||
|
let mut users = BTreeMap::new();
|
||||||
|
let mut last_reset_epoch_secs = 0;
|
||||||
|
for (user, quota) in stats.user_quota_snapshot() {
|
||||||
|
last_reset_epoch_secs = last_reset_epoch_secs.max(quota.last_reset_epoch_secs);
|
||||||
|
users.insert(user, quota_user_state(quota));
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = QuotaStateFile {
|
||||||
|
last_reset_epoch_secs,
|
||||||
|
users,
|
||||||
|
};
|
||||||
|
write_state_file(path, &state).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn reset_user_quota(
|
||||||
|
path: &Path,
|
||||||
|
stats: &Stats,
|
||||||
|
user: &str,
|
||||||
|
) -> std::io::Result<UserQuotaSnapshot> {
|
||||||
|
let snapshot = stats.reset_user_quota(user);
|
||||||
|
save_quota_state(path, stats).await?;
|
||||||
|
Ok(snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_state_file(path: &Path, state: &QuotaStateFile) -> std::io::Result<()> {
|
||||||
|
if let Some(parent) = path.parent()
|
||||||
|
&& !parent.as_os_str().is_empty()
|
||||||
|
{
|
||||||
|
tokio::fs::create_dir_all(parent).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let tmp_path = path.with_extension(format!("tmp.{}", now_epoch_secs()));
|
||||||
|
let payload = serde_json::to_vec_pretty(state)?;
|
||||||
|
let mut file = tokio::fs::File::create(&tmp_path).await?;
|
||||||
|
file.write_all(&payload).await?;
|
||||||
|
file.write_all(b"\n").await?;
|
||||||
|
file.sync_all().await?;
|
||||||
|
drop(file);
|
||||||
|
tokio::fs::rename(&tmp_path, path).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn quota_user_state(quota: UserQuotaSnapshot) -> QuotaUserState {
|
||||||
|
QuotaUserState {
|
||||||
|
used_bytes: quota.used_bytes,
|
||||||
|
last_reset_epoch_secs: quota.last_reset_epoch_secs,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,7 +8,7 @@ pub mod telemetry;
|
|||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::collections::VecDeque;
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::collections::hash_map::DefaultHasher;
|
use std::collections::hash_map::DefaultHasher;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
@@ -297,9 +297,16 @@ pub struct UserStats {
|
|||||||
/// This counter is the single source of truth for quota enforcement and
|
/// This counter is the single source of truth for quota enforcement and
|
||||||
/// intentionally tracks attempted traffic, not guaranteed delivery.
|
/// intentionally tracks attempted traffic, not guaranteed delivery.
|
||||||
pub quota_used: AtomicU64,
|
pub quota_used: AtomicU64,
|
||||||
|
pub quota_last_reset_epoch_secs: AtomicU64,
|
||||||
pub last_seen_epoch_secs: AtomicU64,
|
pub last_seen_epoch_secs: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct UserQuotaSnapshot {
|
||||||
|
pub used_bytes: u64,
|
||||||
|
pub last_reset_epoch_secs: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum QuotaReserveError {
|
pub enum QuotaReserveError {
|
||||||
LimitExceeded,
|
LimitExceeded,
|
||||||
@@ -2408,6 +2415,52 @@ impl Stats {
|
|||||||
.unwrap_or(0)
|
.unwrap_or(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn load_user_quota_state(
|
||||||
|
&self,
|
||||||
|
user: &str,
|
||||||
|
used_bytes: u64,
|
||||||
|
last_reset_epoch_secs: u64,
|
||||||
|
) {
|
||||||
|
let stats = self.get_or_create_user_stats_handle(user);
|
||||||
|
stats.quota_used.store(used_bytes, Ordering::Relaxed);
|
||||||
|
stats
|
||||||
|
.quota_last_reset_epoch_secs
|
||||||
|
.store(last_reset_epoch_secs, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reset_user_quota(&self, user: &str) -> UserQuotaSnapshot {
|
||||||
|
let stats = self.get_or_create_user_stats_handle(user);
|
||||||
|
let last_reset_epoch_secs = Self::now_epoch_secs();
|
||||||
|
stats.quota_used.store(0, Ordering::Relaxed);
|
||||||
|
stats
|
||||||
|
.quota_last_reset_epoch_secs
|
||||||
|
.store(last_reset_epoch_secs, Ordering::Relaxed);
|
||||||
|
UserQuotaSnapshot {
|
||||||
|
used_bytes: 0,
|
||||||
|
last_reset_epoch_secs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn user_quota_snapshot(&self) -> HashMap<String, UserQuotaSnapshot> {
|
||||||
|
let mut out = HashMap::new();
|
||||||
|
for entry in self.user_stats.iter() {
|
||||||
|
let stats = entry.value();
|
||||||
|
let used_bytes = stats.quota_used.load(Ordering::Relaxed);
|
||||||
|
let last_reset_epoch_secs = stats.quota_last_reset_epoch_secs.load(Ordering::Relaxed);
|
||||||
|
if used_bytes == 0 && last_reset_epoch_secs == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
out.insert(
|
||||||
|
entry.key().clone(),
|
||||||
|
UserQuotaSnapshot {
|
||||||
|
used_bytes,
|
||||||
|
last_reset_epoch_secs,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_handshake_timeouts(&self) -> u64 {
|
pub fn get_handshake_timeouts(&self) -> u64 {
|
||||||
self.handshake_timeouts.load(Ordering::Relaxed)
|
self.handshake_timeouts.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user