Update mod.rs

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-04 01:41:43 +03:00
parent 349bbbb8fa
commit 3492566842
No known key found for this signature in database
1 changed files with 102 additions and 2 deletions

View File

@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use http_body_util::{BodyExt, Full}; use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming}; use hyper::body::{Bytes, Incoming};
@ -19,6 +20,7 @@ use tracing::{debug, info, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::stats::Stats; use crate::stats::Stats;
use crate::transport::middle_proxy::MePool;
mod config_store; mod config_store;
mod model; mod model;
@ -26,8 +28,9 @@ mod users;
use config_store::{current_revision, parse_if_match}; use config_store::{current_revision, parse_if_match};
use model::{ use model::{
ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest, ApiFailure, CreateUserRequest, DcStatus, DcStatusData, ErrorBody, ErrorResponse, HealthData,
RotateSecretRequest, SuccessResponse, SummaryData, MeWriterStatus, MeWritersData, MeWritersSummary, PatchUserRequest, RotateSecretRequest,
SuccessResponse, SummaryData,
}; };
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
@ -35,6 +38,7 @@ use users::{create_user, delete_user, patch_user, rotate_secret, users_from_conf
pub(super) struct ApiShared { pub(super) struct ApiShared {
pub(super) stats: Arc<Stats>, pub(super) stats: Arc<Stats>,
pub(super) ip_tracker: Arc<UserIpTracker>, pub(super) ip_tracker: Arc<UserIpTracker>,
pub(super) me_pool: Option<Arc<MePool>>,
pub(super) config_path: PathBuf, pub(super) config_path: PathBuf,
pub(super) mutation_lock: Arc<Mutex<()>>, pub(super) mutation_lock: Arc<Mutex<()>>,
pub(super) request_id: Arc<AtomicU64>, pub(super) request_id: Arc<AtomicU64>,
@ -50,6 +54,7 @@ pub async fn serve(
listen: SocketAddr, listen: SocketAddr,
stats: Arc<Stats>, stats: Arc<Stats>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
me_pool: Option<Arc<MePool>>,
config_rx: watch::Receiver<Arc<ProxyConfig>>, config_rx: watch::Receiver<Arc<ProxyConfig>>,
config_path: PathBuf, config_path: PathBuf,
) { ) {
@ -70,6 +75,7 @@ pub async fn serve(
let shared = Arc::new(ApiShared { let shared = Arc::new(ApiShared {
stats, stats,
ip_tracker, ip_tracker,
me_pool,
config_path, config_path,
mutation_lock: Arc::new(Mutex::new(())), mutation_lock: Arc::new(Mutex::new(())),
request_id: Arc::new(AtomicU64::new(1)), request_id: Arc::new(AtomicU64::new(1)),
@ -179,6 +185,93 @@ async fn handle(
}; };
Ok(success_response(StatusCode::OK, data, revision)) Ok(success_response(StatusCode::OK, data, revision))
} }
("GET", "/v1/stats/me-writers") => {
let revision = current_revision(&shared.config_path).await?;
let data = match &shared.me_pool {
Some(pool) => {
let snapshot = pool.api_status_snapshot().await;
let writers = snapshot
.writers
.into_iter()
.map(|entry| MeWriterStatus {
writer_id: entry.writer_id,
dc: entry.dc,
endpoint: entry.endpoint.to_string(),
generation: entry.generation,
state: entry.state,
draining: entry.draining,
degraded: entry.degraded,
bound_clients: entry.bound_clients,
idle_for_secs: entry.idle_for_secs,
rtt_ema_ms: entry.rtt_ema_ms,
})
.collect();
MeWritersData {
middle_proxy_enabled: true,
generated_at_epoch_secs: snapshot.generated_at_epoch_secs,
summary: MeWritersSummary {
configured_dc_groups: snapshot.configured_dc_groups,
configured_endpoints: snapshot.configured_endpoints,
available_endpoints: snapshot.available_endpoints,
available_pct: snapshot.available_pct,
required_writers: snapshot.required_writers,
alive_writers: snapshot.alive_writers,
coverage_pct: snapshot.coverage_pct,
},
writers,
}
}
None => MeWritersData {
middle_proxy_enabled: false,
generated_at_epoch_secs: now_epoch_secs(),
summary: MeWritersSummary {
configured_dc_groups: 0,
configured_endpoints: 0,
available_endpoints: 0,
available_pct: 0.0,
required_writers: 0,
alive_writers: 0,
coverage_pct: 0.0,
},
writers: Vec::new(),
},
};
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/dcs") => {
let revision = current_revision(&shared.config_path).await?;
let data = match &shared.me_pool {
Some(pool) => {
let snapshot = pool.api_status_snapshot().await;
let dcs = snapshot
.dcs
.into_iter()
.map(|entry| DcStatus {
dc: entry.dc,
endpoints: entry.endpoints.into_iter().map(|value| value.to_string()).collect(),
available_endpoints: entry.available_endpoints,
available_pct: entry.available_pct,
required_writers: entry.required_writers,
alive_writers: entry.alive_writers,
coverage_pct: entry.coverage_pct,
rtt_ms: entry.rtt_ms,
load: entry.load,
})
.collect();
DcStatusData {
middle_proxy_enabled: true,
generated_at_epoch_secs: snapshot.generated_at_epoch_secs,
dcs,
}
}
None => DcStatusData {
middle_proxy_enabled: false,
generated_at_epoch_secs: now_epoch_secs(),
dcs: Vec::new(),
},
};
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/users") | ("GET", "/v1/users") => { ("GET", "/v1/stats/users") | ("GET", "/v1/users") => {
let revision = current_revision(&shared.config_path).await?; let revision = current_revision(&shared.config_path).await?;
let users = users_from_config(&cfg, &shared.stats, &shared.ip_tracker).await; let users = users_from_config(&cfg, &shared.stats, &shared.ip_tracker).await;
@ -304,6 +397,13 @@ async fn handle(
} }
} }
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn success_response<T: Serialize>( fn success_response<T: Serialize>(
status: StatusCode, status: StatusCode,
data: T, data: T,