mirror of
https://github.com/telemt/telemt.git
synced 2026-05-22 19:51:43 +03:00
Compare commits
2 Commits
658a565cb3
...
f0f2bc0482
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0f2bc0482 | ||
|
|
86573be493 |
@@ -43,7 +43,7 @@ use events::ApiEventStore;
|
||||
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
||||
use model::{
|
||||
ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
|
||||
PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps,
|
||||
PatchUserRequest, ResetUserQuotaResponse, RotateSecretRequest, SummaryData, UserActiveIps,
|
||||
};
|
||||
use runtime_edge::{
|
||||
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
||||
@@ -80,6 +80,7 @@ pub(super) struct ApiShared {
|
||||
pub(super) me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
|
||||
pub(super) upstream_manager: Arc<UpstreamManager>,
|
||||
pub(super) config_path: PathBuf,
|
||||
pub(super) quota_state_path: PathBuf,
|
||||
pub(super) detected_ips_rx: watch::Receiver<(Option<IpAddr>, Option<IpAddr>)>,
|
||||
pub(super) mutation_lock: Arc<Mutex<()>>,
|
||||
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
|
||||
@@ -112,6 +113,7 @@ pub async fn serve(
|
||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
admission_rx: watch::Receiver<bool>,
|
||||
config_path: PathBuf,
|
||||
quota_state_path: PathBuf,
|
||||
detected_ips_rx: watch::Receiver<(Option<IpAddr>, Option<IpAddr>)>,
|
||||
process_started_at_epoch_secs: u64,
|
||||
startup_tracker: Arc<StartupTracker>,
|
||||
@@ -143,6 +145,7 @@ pub async fn serve(
|
||||
me_pool,
|
||||
upstream_manager,
|
||||
config_path,
|
||||
quota_state_path,
|
||||
detected_ips_rx,
|
||||
mutation_lock: Arc::new(Mutex::new(())),
|
||||
minimal_cache: Arc::new(Mutex::new(None)),
|
||||
@@ -491,6 +494,56 @@ async fn handle(
|
||||
Ok(success_response(status, data, revision))
|
||||
}
|
||||
_ => {
|
||||
if method == Method::POST
|
||||
&& let Some(user) = normalized_path
|
||||
.strip_prefix("/v1/users/")
|
||||
.and_then(|path| path.strip_suffix("/reset-quota"))
|
||||
&& !user.is_empty()
|
||||
&& !user.contains('/')
|
||||
{
|
||||
if api_cfg.read_only {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::new(
|
||||
StatusCode::FORBIDDEN,
|
||||
"read_only",
|
||||
"API runs in read-only mode",
|
||||
),
|
||||
));
|
||||
}
|
||||
let snapshot = match crate::quota_state::reset_user_quota(
|
||||
&shared.quota_state_path,
|
||||
shared.stats.as_ref(),
|
||||
user,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(error) => {
|
||||
shared.runtime_events.record(
|
||||
"api.user.reset_quota.failed",
|
||||
format!("username={} error={}", user, error),
|
||||
);
|
||||
return Err(ApiFailure::internal(format!(
|
||||
"Failed to reset user quota: {}",
|
||||
error
|
||||
)));
|
||||
}
|
||||
};
|
||||
shared
|
||||
.runtime_events
|
||||
.record("api.user.reset_quota.ok", format!("username={}", user));
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
return Ok(success_response(
|
||||
StatusCode::OK,
|
||||
ResetUserQuotaResponse {
|
||||
username: user.to_string(),
|
||||
used_bytes: snapshot.used_bytes,
|
||||
last_reset_epoch_secs: snapshot.last_reset_epoch_secs,
|
||||
},
|
||||
revision,
|
||||
));
|
||||
}
|
||||
if let Some(user) = normalized_path.strip_prefix("/v1/users/")
|
||||
&& !user.is_empty()
|
||||
&& !user.contains('/')
|
||||
|
||||
@@ -501,6 +501,13 @@ pub(super) struct DeleteUserResponse {
|
||||
pub(super) in_runtime: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(super) struct ResetUserQuotaResponse {
|
||||
pub(super) username: String,
|
||||
pub(super) used_bytes: u64,
|
||||
pub(super) last_reset_epoch_secs: u64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub(super) struct CreateUserRequest {
|
||||
pub(super) username: String,
|
||||
|
||||
@@ -43,6 +43,7 @@ const TOP_LEVEL_CONFIG_KEYS: &[&str] = &[
|
||||
|
||||
const GENERAL_CONFIG_KEYS: &[&str] = &[
|
||||
"data_path",
|
||||
"quota_state_path",
|
||||
"config_strict",
|
||||
"modes",
|
||||
"prefer_ipv6",
|
||||
|
||||
@@ -26,6 +26,10 @@ pub enum LogLevel {
|
||||
Silent,
|
||||
}
|
||||
|
||||
fn default_quota_state_path() -> PathBuf {
|
||||
PathBuf::from("telemt.limit.json")
|
||||
}
|
||||
|
||||
impl LogLevel {
|
||||
/// Convert to tracing EnvFilter directive string.
|
||||
pub fn to_filter_str(&self) -> &'static str {
|
||||
@@ -375,6 +379,10 @@ pub struct GeneralConfig {
|
||||
#[serde(default)]
|
||||
pub data_path: Option<PathBuf>,
|
||||
|
||||
/// JSON state file for runtime per-user quota consumption.
|
||||
#[serde(default = "default_quota_state_path")]
|
||||
pub quota_state_path: PathBuf,
|
||||
|
||||
/// Reject unknown TOML config keys during load.
|
||||
/// Startup fails fast; hot-reload rejects the new snapshot and keeps the current config.
|
||||
#[serde(default)]
|
||||
@@ -979,6 +987,7 @@ impl Default for GeneralConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
data_path: None,
|
||||
quota_state_path: default_quota_state_path(),
|
||||
config_strict: false,
|
||||
modes: ProxyModes::default(),
|
||||
prefer_ipv6: false,
|
||||
|
||||
@@ -17,6 +17,7 @@ pub(crate) async fn configure_admission_gate(
|
||||
route_runtime: Arc<RouteRuntimeController>,
|
||||
admission_tx: &watch::Sender<bool>,
|
||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
me_ready_rx: watch::Receiver<u64>,
|
||||
) {
|
||||
if config.general.use_middle_proxy {
|
||||
if let Some(pool) = me_pool.as_ref() {
|
||||
@@ -52,6 +53,7 @@ pub(crate) async fn configure_admission_gate(
|
||||
let admission_tx_gate = admission_tx.clone();
|
||||
let route_runtime_gate = route_runtime.clone();
|
||||
let mut config_rx_gate = config_rx.clone();
|
||||
let mut me_ready_rx_gate = me_ready_rx;
|
||||
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
|
||||
tokio::spawn(async move {
|
||||
let mut gate_open = initial_gate_open;
|
||||
@@ -74,6 +76,11 @@ pub(crate) async fn configure_admission_gate(
|
||||
fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
|
||||
continue;
|
||||
}
|
||||
changed = me_ready_rx_gate.changed() => {
|
||||
if changed.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
|
||||
}
|
||||
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{RwLock, watch};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
@@ -29,6 +29,7 @@ pub(crate) async fn initialize_me_pool(
|
||||
rng: Arc<SecureRandom>,
|
||||
stats: Arc<Stats>,
|
||||
api_me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
|
||||
me_ready_tx: watch::Sender<u64>,
|
||||
) -> Option<Arc<MePool>> {
|
||||
if !use_middle_proxy {
|
||||
return None;
|
||||
@@ -314,6 +315,7 @@ pub(crate) async fn initialize_me_pool(
|
||||
let pool_bg = pool.clone();
|
||||
let rng_bg = rng.clone();
|
||||
let startup_tracker_bg = startup_tracker.clone();
|
||||
let me_ready_tx_bg = me_ready_tx.clone();
|
||||
let retry_limit = if me_init_retry_attempts == 0 {
|
||||
String::from("unlimited")
|
||||
} else {
|
||||
@@ -347,6 +349,9 @@ pub(crate) async fn initialize_me_pool(
|
||||
startup_tracker_bg
|
||||
.set_me_status(StartupMeStatus::Ready, "ready")
|
||||
.await;
|
||||
me_ready_tx_bg.send_modify(|version| {
|
||||
*version = version.saturating_add(1);
|
||||
});
|
||||
info!(
|
||||
attempt = init_attempt,
|
||||
"Middle-End pool initialized successfully"
|
||||
@@ -474,6 +479,9 @@ pub(crate) async fn initialize_me_pool(
|
||||
startup_tracker
|
||||
.set_me_status(StartupMeStatus::Ready, "ready")
|
||||
.await;
|
||||
me_ready_tx.send_modify(|version| {
|
||||
*version = version.saturating_add(1);
|
||||
});
|
||||
info!(
|
||||
attempt = init_attempt,
|
||||
"Middle-End pool initialized successfully"
|
||||
|
||||
@@ -417,6 +417,8 @@ async fn run_telemt_core(
|
||||
|
||||
let stats = Arc::new(Stats::new());
|
||||
stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry));
|
||||
let quota_state_path = config.general.quota_state_path.clone();
|
||||
crate::quota_state::load_quota_state("a_state_path, stats.as_ref()).await;
|
||||
|
||||
let upstream_manager = Arc::new(UpstreamManager::new(
|
||||
config.upstreams.clone(),
|
||||
@@ -496,6 +498,7 @@ async fn run_telemt_core(
|
||||
let config_rx_api = api_config_rx.clone();
|
||||
let admission_rx_api = admission_rx.clone();
|
||||
let config_path_api = config_path.clone();
|
||||
let quota_state_path_api = quota_state_path.clone();
|
||||
let startup_tracker_api = startup_tracker.clone();
|
||||
let detected_ips_rx_api = detected_ips_rx.clone();
|
||||
tokio::spawn(async move {
|
||||
@@ -509,6 +512,7 @@ async fn run_telemt_core(
|
||||
config_rx_api,
|
||||
admission_rx_api,
|
||||
config_path_api,
|
||||
quota_state_path_api,
|
||||
detected_ips_rx_api,
|
||||
process_started_at_epoch_secs,
|
||||
startup_tracker_api,
|
||||
@@ -660,6 +664,8 @@ async fn run_telemt_core(
|
||||
.await;
|
||||
}
|
||||
|
||||
let (me_ready_tx, me_ready_rx) = watch::channel(0_u64);
|
||||
|
||||
let me_pool: Option<Arc<MePool>> = me_startup::initialize_me_pool(
|
||||
use_middle_proxy,
|
||||
&config,
|
||||
@@ -670,6 +676,7 @@ async fn run_telemt_core(
|
||||
rng.clone(),
|
||||
stats.clone(),
|
||||
api_me_pool.clone(),
|
||||
me_ready_tx.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -743,6 +750,7 @@ async fn run_telemt_core(
|
||||
api_config_tx.clone(),
|
||||
me_pool.clone(),
|
||||
shared_state.clone(),
|
||||
me_ready_tx.clone(),
|
||||
)
|
||||
.await;
|
||||
let config_rx = runtime_watches.config_rx;
|
||||
@@ -756,6 +764,7 @@ async fn run_telemt_core(
|
||||
route_runtime.clone(),
|
||||
&admission_tx,
|
||||
config_rx.clone(),
|
||||
me_ready_rx,
|
||||
)
|
||||
.await;
|
||||
let _admission_tx_hold = admission_tx;
|
||||
@@ -842,7 +851,7 @@ async fn run_telemt_core(
|
||||
max_connections.clone(),
|
||||
);
|
||||
|
||||
shutdown::wait_for_shutdown(process_started_at, me_pool, stats).await;
|
||||
shutdown::wait_for_shutdown(process_started_at, me_pool, stats, quota_state_path).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ pub(crate) async fn spawn_runtime_tasks(
|
||||
api_config_tx: watch::Sender<Arc<ProxyConfig>>,
|
||||
me_pool_for_policy: Option<Arc<MePool>>,
|
||||
shared_state: Arc<ProxySharedState>,
|
||||
me_ready_tx: watch::Sender<u64>,
|
||||
) -> RuntimeWatches {
|
||||
let um_clone = upstream_manager.clone();
|
||||
let dc_overrides_for_health = config.dc_overrides.clone();
|
||||
@@ -250,12 +251,14 @@ pub(crate) async fn spawn_runtime_tasks(
|
||||
let pool_clone_sched = pool.clone();
|
||||
let rng_clone_sched = rng.clone();
|
||||
let config_rx_clone_sched = config_rx.clone();
|
||||
let me_ready_tx_sched = me_ready_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
crate::transport::middle_proxy::me_reinit_scheduler(
|
||||
pool_clone_sched,
|
||||
rng_clone_sched,
|
||||
config_rx_clone_sched,
|
||||
reinit_rx,
|
||||
me_ready_tx_sched,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
//! SIGHUP is handled separately in config/hot_reload.rs for config reload.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::path::PathBuf;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[cfg(not(unix))]
|
||||
@@ -48,9 +49,10 @@ pub(crate) async fn wait_for_shutdown(
|
||||
process_started_at: Instant,
|
||||
me_pool: Option<Arc<MePool>>,
|
||||
stats: Arc<Stats>,
|
||||
quota_state_path: PathBuf,
|
||||
) {
|
||||
let signal = wait_for_shutdown_signal().await;
|
||||
perform_shutdown(signal, process_started_at, me_pool, &stats).await;
|
||||
perform_shutdown(signal, process_started_at, me_pool, &stats, quota_state_path).await;
|
||||
}
|
||||
|
||||
/// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT).
|
||||
@@ -79,6 +81,7 @@ async fn perform_shutdown(
|
||||
process_started_at: Instant,
|
||||
me_pool: Option<Arc<MePool>>,
|
||||
stats: &Stats,
|
||||
quota_state_path: PathBuf,
|
||||
) {
|
||||
let shutdown_started_at = Instant::now();
|
||||
info!(signal = %signal, "Received shutdown signal");
|
||||
@@ -109,6 +112,22 @@ async fn perform_shutdown(
|
||||
}
|
||||
}
|
||||
|
||||
match crate::quota_state::save_quota_state("a_state_path, stats).await {
|
||||
Ok(()) => {
|
||||
info!(
|
||||
path = %quota_state_path.display(),
|
||||
"Persisted per-user quota state"
|
||||
);
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
path = %quota_state_path.display(),
|
||||
"Failed to persist per-user quota state"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let shutdown_secs = shutdown_started_at.elapsed().as_secs();
|
||||
info!(
|
||||
"Shutdown completed successfully in {} {}.",
|
||||
|
||||
@@ -25,6 +25,7 @@ mod metrics;
|
||||
mod network;
|
||||
mod protocol;
|
||||
mod proxy;
|
||||
mod quota_state;
|
||||
mod service;
|
||||
mod startup;
|
||||
mod stats;
|
||||
|
||||
114
src/quota_state.rs
Normal file
114
src/quota_state.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::stats::{Stats, UserQuotaSnapshot};
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
pub(crate) struct QuotaStateFile {
|
||||
pub(crate) last_reset_epoch_secs: u64,
|
||||
pub(crate) users: BTreeMap<String, QuotaUserState>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
pub(crate) struct QuotaUserState {
|
||||
pub(crate) used_bytes: u64,
|
||||
pub(crate) last_reset_epoch_secs: u64,
|
||||
}
|
||||
|
||||
fn now_epoch_secs() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
pub(crate) async fn load_quota_state(path: &Path, stats: &Stats) {
|
||||
let bytes = match tokio::fs::read(path).await {
|
||||
Ok(bytes) => bytes,
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return,
|
||||
Err(error) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
path = %path.display(),
|
||||
"Failed to read quota state file"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let state = match serde_json::from_slice::<QuotaStateFile>(&bytes) {
|
||||
Ok(state) => state,
|
||||
Err(error) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
path = %path.display(),
|
||||
"Failed to parse quota state file"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let loaded_users = state.users.len();
|
||||
for (user, quota) in state.users {
|
||||
stats.load_user_quota_state(&user, quota.used_bytes, quota.last_reset_epoch_secs);
|
||||
}
|
||||
info!(
|
||||
path = %path.display(),
|
||||
loaded_users,
|
||||
"Loaded per-user quota state"
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) async fn save_quota_state(path: &Path, stats: &Stats) -> std::io::Result<()> {
|
||||
let mut users = BTreeMap::new();
|
||||
let mut last_reset_epoch_secs = 0;
|
||||
for (user, quota) in stats.user_quota_snapshot() {
|
||||
last_reset_epoch_secs = last_reset_epoch_secs.max(quota.last_reset_epoch_secs);
|
||||
users.insert(user, quota_user_state(quota));
|
||||
}
|
||||
|
||||
let state = QuotaStateFile {
|
||||
last_reset_epoch_secs,
|
||||
users,
|
||||
};
|
||||
write_state_file(path, &state).await
|
||||
}
|
||||
|
||||
pub(crate) async fn reset_user_quota(
|
||||
path: &Path,
|
||||
stats: &Stats,
|
||||
user: &str,
|
||||
) -> std::io::Result<UserQuotaSnapshot> {
|
||||
let snapshot = stats.reset_user_quota(user);
|
||||
save_quota_state(path, stats).await?;
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
async fn write_state_file(path: &Path, state: &QuotaStateFile) -> std::io::Result<()> {
|
||||
if let Some(parent) = path.parent()
|
||||
&& !parent.as_os_str().is_empty()
|
||||
{
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
|
||||
let tmp_path = path.with_extension(format!("tmp.{}", now_epoch_secs()));
|
||||
let payload = serde_json::to_vec_pretty(state)?;
|
||||
let mut file = tokio::fs::File::create(&tmp_path).await?;
|
||||
file.write_all(&payload).await?;
|
||||
file.write_all(b"\n").await?;
|
||||
file.sync_all().await?;
|
||||
drop(file);
|
||||
tokio::fs::rename(&tmp_path, path).await
|
||||
}
|
||||
|
||||
fn quota_user_state(quota: UserQuotaSnapshot) -> QuotaUserState {
|
||||
QuotaUserState {
|
||||
used_bytes: quota.used_bytes,
|
||||
last_reset_epoch_secs: quota.last_reset_epoch_secs,
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,7 @@ pub mod telemetry;
|
||||
use dashmap::DashMap;
|
||||
use lru::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::num::NonZeroUsize;
|
||||
@@ -297,9 +297,16 @@ pub struct UserStats {
|
||||
/// This counter is the single source of truth for quota enforcement and
|
||||
/// intentionally tracks attempted traffic, not guaranteed delivery.
|
||||
pub quota_used: AtomicU64,
|
||||
pub quota_last_reset_epoch_secs: AtomicU64,
|
||||
pub last_seen_epoch_secs: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UserQuotaSnapshot {
|
||||
pub used_bytes: u64,
|
||||
pub last_reset_epoch_secs: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum QuotaReserveError {
|
||||
LimitExceeded,
|
||||
@@ -2408,6 +2415,52 @@ impl Stats {
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
pub fn load_user_quota_state(
|
||||
&self,
|
||||
user: &str,
|
||||
used_bytes: u64,
|
||||
last_reset_epoch_secs: u64,
|
||||
) {
|
||||
let stats = self.get_or_create_user_stats_handle(user);
|
||||
stats.quota_used.store(used_bytes, Ordering::Relaxed);
|
||||
stats
|
||||
.quota_last_reset_epoch_secs
|
||||
.store(last_reset_epoch_secs, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn reset_user_quota(&self, user: &str) -> UserQuotaSnapshot {
|
||||
let stats = self.get_or_create_user_stats_handle(user);
|
||||
let last_reset_epoch_secs = Self::now_epoch_secs();
|
||||
stats.quota_used.store(0, Ordering::Relaxed);
|
||||
stats
|
||||
.quota_last_reset_epoch_secs
|
||||
.store(last_reset_epoch_secs, Ordering::Relaxed);
|
||||
UserQuotaSnapshot {
|
||||
used_bytes: 0,
|
||||
last_reset_epoch_secs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn user_quota_snapshot(&self) -> HashMap<String, UserQuotaSnapshot> {
|
||||
let mut out = HashMap::new();
|
||||
for entry in self.user_stats.iter() {
|
||||
let stats = entry.value();
|
||||
let used_bytes = stats.quota_used.load(Ordering::Relaxed);
|
||||
let last_reset_epoch_secs = stats.quota_last_reset_epoch_secs.load(Ordering::Relaxed);
|
||||
if used_bytes == 0 && last_reset_epoch_secs == 0 {
|
||||
continue;
|
||||
}
|
||||
out.insert(
|
||||
entry.key().clone(),
|
||||
UserQuotaSnapshot {
|
||||
used_bytes,
|
||||
last_reset_epoch_secs,
|
||||
},
|
||||
);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
pub fn get_handshake_timeouts(&self) -> u64 {
|
||||
self.handshake_timeouts.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
@@ -365,7 +365,10 @@ impl MePool {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) {
|
||||
pub async fn zero_downtime_reinit_after_map_change(
|
||||
self: &Arc<Self>,
|
||||
rng: &SecureRandom,
|
||||
) -> bool {
|
||||
let desired_by_dc = self.desired_dc_endpoints().await;
|
||||
let now_epoch_secs = Self::now_epoch_secs();
|
||||
let v4_suppressed = self.is_family_temporarily_suppressed(IpFamily::V4, now_epoch_secs);
|
||||
@@ -380,7 +383,7 @@ impl MePool {
|
||||
MeDrainGateReason::CoverageQuorum
|
||||
};
|
||||
self.set_last_drain_gate(false, false, reason, now_epoch_secs);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
|
||||
@@ -490,7 +493,7 @@ impl MePool {
|
||||
missing_dc = ?missing_dc,
|
||||
"ME reinit coverage below threshold; keeping stale writers"
|
||||
);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if hardswap {
|
||||
@@ -520,7 +523,7 @@ impl MePool {
|
||||
missing_dc = ?fresh_missing_dc,
|
||||
"ME hardswap pending: fresh generation DC coverage incomplete"
|
||||
);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -567,7 +570,7 @@ impl MePool {
|
||||
self.clear_pending_hardswap_state();
|
||||
}
|
||||
debug!("ME reinit cycle completed with no stale writers");
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
let drain_timeout = self.force_close_timeout();
|
||||
@@ -606,10 +609,11 @@ impl MePool {
|
||||
if hardswap {
|
||||
self.clear_pending_hardswap_state();
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub async fn zero_downtime_reinit_periodic(self: &Arc<Self>, rng: &SecureRandom) {
|
||||
self.zero_downtime_reinit_after_map_change(rng).await;
|
||||
pub async fn zero_downtime_reinit_periodic(self: &Arc<Self>, rng: &SecureRandom) -> bool {
|
||||
self.zero_downtime_reinit_after_map_change(rng).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@ pub async fn me_reinit_scheduler(
|
||||
rng: Arc<SecureRandom>,
|
||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
mut trigger_rx: mpsc::Receiver<MeReinitTrigger>,
|
||||
me_ready_tx: watch::Sender<u64>,
|
||||
) {
|
||||
info!("ME reinit scheduler started");
|
||||
loop {
|
||||
@@ -90,15 +91,25 @@ pub async fn me_reinit_scheduler(
|
||||
|
||||
if cfg.general.me_reinit_singleflight {
|
||||
debug!(reason, "ME reinit scheduled (single-flight)");
|
||||
pool.zero_downtime_reinit_periodic(rng.as_ref()).await;
|
||||
if pool.zero_downtime_reinit_periodic(rng.as_ref()).await {
|
||||
me_ready_tx.send_modify(|version| {
|
||||
*version = version.saturating_add(1);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
debug!(reason, "ME reinit scheduled (concurrent mode)");
|
||||
let pool_clone = pool.clone();
|
||||
let rng_clone = rng.clone();
|
||||
let me_ready_tx_clone = me_ready_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
pool_clone
|
||||
if pool_clone
|
||||
.zero_downtime_reinit_periodic(rng_clone.as_ref())
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
me_ready_tx_clone.send_modify(|version| {
|
||||
*version = version.saturating_add(1);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user