From f0f2bc0482334a74b9de480e1d1eb37488db26c7 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 8 May 2026 14:38:24 +0300 Subject: [PATCH] Limit&Quota Saving as File + API --- src/api/mod.rs | 55 ++++++++++++++++++- src/api/model.rs | 7 +++ src/config/load.rs | 1 + src/config/types.rs | 9 ++++ src/maestro/mod.rs | 6 ++- src/maestro/shutdown.rs | 21 +++++++- src/main.rs | 1 + src/quota_state.rs | 114 ++++++++++++++++++++++++++++++++++++++++ src/stats/mod.rs | 55 ++++++++++++++++++- 9 files changed, 265 insertions(+), 4 deletions(-) create mode 100644 src/quota_state.rs diff --git a/src/api/mod.rs b/src/api/mod.rs index 1778d7d..8087497 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -43,7 +43,7 @@ use events::ApiEventStore; use http_utils::{error_response, read_json, read_optional_json, success_response}; use model::{ ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, - PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps, + PatchUserRequest, ResetUserQuotaResponse, RotateSecretRequest, SummaryData, UserActiveIps, }; use runtime_edge::{ EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, @@ -80,6 +80,7 @@ pub(super) struct ApiShared { pub(super) me_pool: Arc>>>, pub(super) upstream_manager: Arc, pub(super) config_path: PathBuf, + pub(super) quota_state_path: PathBuf, pub(super) detected_ips_rx: watch::Receiver<(Option, Option)>, pub(super) mutation_lock: Arc>, pub(super) minimal_cache: Arc>>, @@ -112,6 +113,7 @@ pub async fn serve( config_rx: watch::Receiver>, admission_rx: watch::Receiver, config_path: PathBuf, + quota_state_path: PathBuf, detected_ips_rx: watch::Receiver<(Option, Option)>, process_started_at_epoch_secs: u64, startup_tracker: Arc, @@ -143,6 +145,7 @@ pub async fn serve( me_pool, upstream_manager, config_path, + quota_state_path, detected_ips_rx, mutation_lock: Arc::new(Mutex::new(())), minimal_cache: Arc::new(Mutex::new(None)), @@ -491,6 +494,56 @@ async fn handle( Ok(success_response(status, data, revision)) } _ => { + if method == Method::POST + && let Some(user) = normalized_path + .strip_prefix("/v1/users/") + .and_then(|path| path.strip_suffix("/reset-quota")) + && !user.is_empty() + && !user.contains('/') + { + if api_cfg.read_only { + return Ok(error_response( + request_id, + ApiFailure::new( + StatusCode::FORBIDDEN, + "read_only", + "API runs in read-only mode", + ), + )); + } + let snapshot = match crate::quota_state::reset_user_quota( + &shared.quota_state_path, + shared.stats.as_ref(), + user, + ) + .await + { + Ok(snapshot) => snapshot, + Err(error) => { + shared.runtime_events.record( + "api.user.reset_quota.failed", + format!("username={} error={}", user, error), + ); + return Err(ApiFailure::internal(format!( + "Failed to reset user quota: {}", + error + ))); + } + }; + shared + .runtime_events + .record("api.user.reset_quota.ok", format!("username={}", user)); + let revision = current_revision(&shared.config_path).await?; + return Ok(success_response( + StatusCode::OK, + ResetUserQuotaResponse { + username: user.to_string(), + used_bytes: snapshot.used_bytes, + last_reset_epoch_secs: snapshot.last_reset_epoch_secs, + }, + revision, + )); + } if let Some(user) = normalized_path.strip_prefix("/v1/users/") && !user.is_empty() && !user.contains('/') diff --git a/src/api/model.rs b/src/api/model.rs index 1604491..85f0d45 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -501,6 +501,13 @@ pub(super) struct DeleteUserResponse { pub(super) in_runtime: bool, } +#[derive(Serialize)] +pub(super) struct ResetUserQuotaResponse { + pub(super) username: String, + pub(super) used_bytes: u64, + pub(super) last_reset_epoch_secs: u64, +} + #[derive(Deserialize)] pub(super) struct CreateUserRequest { pub(super) username: String, diff --git a/src/config/load.rs b/src/config/load.rs index 2f2cb55..fdb4d59 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -43,6 +43,7 @@ const TOP_LEVEL_CONFIG_KEYS: &[&str] = &[ const GENERAL_CONFIG_KEYS: &[&str] = &[ "data_path", + "quota_state_path", "config_strict", "modes", "prefer_ipv6", diff --git a/src/config/types.rs b/src/config/types.rs index 38501e6..30031d1 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -26,6 +26,10 @@ pub enum LogLevel { Silent, } +fn default_quota_state_path() -> PathBuf { + PathBuf::from("telemt.limit.json") +} + impl LogLevel { /// Convert to tracing EnvFilter directive string. pub fn to_filter_str(&self) -> &'static str { @@ -375,6 +379,10 @@ pub struct GeneralConfig { #[serde(default)] pub data_path: Option, + /// JSON state file for runtime per-user quota consumption. + #[serde(default = "default_quota_state_path")] + pub quota_state_path: PathBuf, + /// Reject unknown TOML config keys during load. /// Startup fails fast; hot-reload rejects the new snapshot and keeps the current config. #[serde(default)] @@ -979,6 +987,7 @@ impl Default for GeneralConfig { fn default() -> Self { Self { data_path: None, + quota_state_path: default_quota_state_path(), config_strict: false, modes: ProxyModes::default(), prefer_ipv6: false, diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 1792f21..7baebd8 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -417,6 +417,8 @@ async fn run_telemt_core( let stats = Arc::new(Stats::new()); stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); + let quota_state_path = config.general.quota_state_path.clone(); + crate::quota_state::load_quota_state("a_state_path, stats.as_ref()).await; let upstream_manager = Arc::new(UpstreamManager::new( config.upstreams.clone(), @@ -496,6 +498,7 @@ async fn run_telemt_core( let config_rx_api = api_config_rx.clone(); let admission_rx_api = admission_rx.clone(); let config_path_api = config_path.clone(); + let quota_state_path_api = quota_state_path.clone(); let startup_tracker_api = startup_tracker.clone(); let detected_ips_rx_api = detected_ips_rx.clone(); tokio::spawn(async move { @@ -509,6 +512,7 @@ async fn run_telemt_core( config_rx_api, admission_rx_api, config_path_api, + quota_state_path_api, detected_ips_rx_api, process_started_at_epoch_secs, startup_tracker_api, @@ -847,7 +851,7 @@ async fn run_telemt_core( max_connections.clone(), ); - shutdown::wait_for_shutdown(process_started_at, me_pool, stats).await; + shutdown::wait_for_shutdown(process_started_at, me_pool, stats, quota_state_path).await; Ok(()) } diff --git a/src/maestro/shutdown.rs b/src/maestro/shutdown.rs index f6e50ca..7b4a4ce 100644 --- a/src/maestro/shutdown.rs +++ b/src/maestro/shutdown.rs @@ -9,6 +9,7 @@ //! SIGHUP is handled separately in config/hot_reload.rs for config reload. use std::sync::Arc; +use std::path::PathBuf; use std::time::{Duration, Instant}; #[cfg(not(unix))] @@ -48,9 +49,10 @@ pub(crate) async fn wait_for_shutdown( process_started_at: Instant, me_pool: Option>, stats: Arc, + quota_state_path: PathBuf, ) { let signal = wait_for_shutdown_signal().await; - perform_shutdown(signal, process_started_at, me_pool, &stats).await; + perform_shutdown(signal, process_started_at, me_pool, &stats, quota_state_path).await; } /// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT). @@ -79,6 +81,7 @@ async fn perform_shutdown( process_started_at: Instant, me_pool: Option>, stats: &Stats, + quota_state_path: PathBuf, ) { let shutdown_started_at = Instant::now(); info!(signal = %signal, "Received shutdown signal"); @@ -109,6 +112,22 @@ async fn perform_shutdown( } } + match crate::quota_state::save_quota_state("a_state_path, stats).await { + Ok(()) => { + info!( + path = %quota_state_path.display(), + "Persisted per-user quota state" + ); + } + Err(error) => { + warn!( + error = %error, + path = %quota_state_path.display(), + "Failed to persist per-user quota state" + ); + } + } + let shutdown_secs = shutdown_started_at.elapsed().as_secs(); info!( "Shutdown completed successfully in {} {}.", diff --git a/src/main.rs b/src/main.rs index ce6b943..7d0b377 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ mod metrics; mod network; mod protocol; mod proxy; +mod quota_state; mod service; mod startup; mod stats; diff --git a/src/quota_state.rs b/src/quota_state.rs new file mode 100644 index 0000000..74abce8 --- /dev/null +++ b/src/quota_state.rs @@ -0,0 +1,114 @@ +use std::collections::BTreeMap; +use std::path::Path; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWriteExt; +use tracing::{info, warn}; + +use crate::stats::{Stats, UserQuotaSnapshot}; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct QuotaStateFile { + pub(crate) last_reset_epoch_secs: u64, + pub(crate) users: BTreeMap, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct QuotaUserState { + pub(crate) used_bytes: u64, + pub(crate) last_reset_epoch_secs: u64, +} + +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +pub(crate) async fn load_quota_state(path: &Path, stats: &Stats) { + let bytes = match tokio::fs::read(path).await { + Ok(bytes) => bytes, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => return, + Err(error) => { + warn!( + error = %error, + path = %path.display(), + "Failed to read quota state file" + ); + return; + } + }; + + let state = match serde_json::from_slice::(&bytes) { + Ok(state) => state, + Err(error) => { + warn!( + error = %error, + path = %path.display(), + "Failed to parse quota state file" + ); + return; + } + }; + + let loaded_users = state.users.len(); + for (user, quota) in state.users { + stats.load_user_quota_state(&user, quota.used_bytes, quota.last_reset_epoch_secs); + } + info!( + path = %path.display(), + loaded_users, + "Loaded per-user quota state" + ); +} + +pub(crate) async fn save_quota_state(path: &Path, stats: &Stats) -> std::io::Result<()> { + let mut users = BTreeMap::new(); + let mut last_reset_epoch_secs = 0; + for (user, quota) in stats.user_quota_snapshot() { + last_reset_epoch_secs = last_reset_epoch_secs.max(quota.last_reset_epoch_secs); + users.insert(user, quota_user_state(quota)); + } + + let state = QuotaStateFile { + last_reset_epoch_secs, + users, + }; + write_state_file(path, &state).await +} + +pub(crate) async fn reset_user_quota( + path: &Path, + stats: &Stats, + user: &str, +) -> std::io::Result { + let snapshot = stats.reset_user_quota(user); + save_quota_state(path, stats).await?; + Ok(snapshot) +} + +async fn write_state_file(path: &Path, state: &QuotaStateFile) -> std::io::Result<()> { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + { + tokio::fs::create_dir_all(parent).await?; + } + + let tmp_path = path.with_extension(format!("tmp.{}", now_epoch_secs())); + let payload = serde_json::to_vec_pretty(state)?; + let mut file = tokio::fs::File::create(&tmp_path).await?; + file.write_all(&payload).await?; + file.write_all(b"\n").await?; + file.sync_all().await?; + drop(file); + tokio::fs::rename(&tmp_path, path).await +} + +fn quota_user_state(quota: UserQuotaSnapshot) -> QuotaUserState { + QuotaUserState { + used_bytes: quota.used_bytes, + last_reset_epoch_secs: quota.last_reset_epoch_secs, + } +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index cff9571..792ea8d 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -8,7 +8,7 @@ pub mod telemetry; use dashmap::DashMap; use lru::LruCache; use parking_lot::Mutex; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use std::num::NonZeroUsize; @@ -297,9 +297,16 @@ pub struct UserStats { /// This counter is the single source of truth for quota enforcement and /// intentionally tracks attempted traffic, not guaranteed delivery. pub quota_used: AtomicU64, + pub quota_last_reset_epoch_secs: AtomicU64, pub last_seen_epoch_secs: AtomicU64, } +#[derive(Debug, Clone)] +pub struct UserQuotaSnapshot { + pub used_bytes: u64, + pub last_reset_epoch_secs: u64, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum QuotaReserveError { LimitExceeded, @@ -2408,6 +2415,52 @@ impl Stats { .unwrap_or(0) } + pub fn load_user_quota_state( + &self, + user: &str, + used_bytes: u64, + last_reset_epoch_secs: u64, + ) { + let stats = self.get_or_create_user_stats_handle(user); + stats.quota_used.store(used_bytes, Ordering::Relaxed); + stats + .quota_last_reset_epoch_secs + .store(last_reset_epoch_secs, Ordering::Relaxed); + } + + pub fn reset_user_quota(&self, user: &str) -> UserQuotaSnapshot { + let stats = self.get_or_create_user_stats_handle(user); + let last_reset_epoch_secs = Self::now_epoch_secs(); + stats.quota_used.store(0, Ordering::Relaxed); + stats + .quota_last_reset_epoch_secs + .store(last_reset_epoch_secs, Ordering::Relaxed); + UserQuotaSnapshot { + used_bytes: 0, + last_reset_epoch_secs, + } + } + + pub fn user_quota_snapshot(&self) -> HashMap { + let mut out = HashMap::new(); + for entry in self.user_stats.iter() { + let stats = entry.value(); + let used_bytes = stats.quota_used.load(Ordering::Relaxed); + let last_reset_epoch_secs = stats.quota_last_reset_epoch_secs.load(Ordering::Relaxed); + if used_bytes == 0 && last_reset_epoch_secs == 0 { + continue; + } + out.insert( + entry.key().clone(), + UserQuotaSnapshot { + used_bytes, + last_reset_epoch_secs, + }, + ); + } + out + } + pub fn get_handshake_timeouts(&self) -> u64 { self.handshake_timeouts.load(Ordering::Relaxed) }