From 3492566842853d9a7e1912900e870ac3586cec31 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:41:43 +0300 Subject: [PATCH] Update mod.rs Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/api/mod.rs | 104 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 2 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index c13828e..56fb588 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; use http_body_util::{BodyExt, Full}; use hyper::body::{Bytes, Incoming}; @@ -19,6 +20,7 @@ use tracing::{debug, info, warn}; use crate::config::ProxyConfig; use crate::ip_tracker::UserIpTracker; use crate::stats::Stats; +use crate::transport::middle_proxy::MePool; mod config_store; mod model; @@ -26,8 +28,9 @@ mod users; use config_store::{current_revision, parse_if_match}; use model::{ - ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest, - RotateSecretRequest, SuccessResponse, SummaryData, + ApiFailure, CreateUserRequest, DcStatus, DcStatusData, ErrorBody, ErrorResponse, HealthData, + MeWriterStatus, MeWritersData, MeWritersSummary, PatchUserRequest, RotateSecretRequest, + SuccessResponse, SummaryData, }; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; @@ -35,6 +38,7 @@ use users::{create_user, delete_user, patch_user, rotate_secret, users_from_conf pub(super) struct ApiShared { pub(super) stats: Arc, pub(super) ip_tracker: Arc, + pub(super) me_pool: Option>, pub(super) config_path: PathBuf, pub(super) mutation_lock: Arc>, pub(super) request_id: Arc, @@ -50,6 +54,7 @@ pub async fn serve( listen: SocketAddr, stats: Arc, ip_tracker: Arc, + me_pool: Option>, config_rx: watch::Receiver>, config_path: PathBuf, ) { @@ -70,6 +75,7 @@ pub async fn serve( let shared = Arc::new(ApiShared { stats, ip_tracker, + me_pool, config_path, mutation_lock: Arc::new(Mutex::new(())), request_id: Arc::new(AtomicU64::new(1)), @@ -179,6 +185,93 @@ async fn handle( }; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/stats/me-writers") => { + let revision = current_revision(&shared.config_path).await?; + let data = match &shared.me_pool { + Some(pool) => { + let snapshot = pool.api_status_snapshot().await; + let writers = snapshot + .writers + .into_iter() + .map(|entry| MeWriterStatus { + writer_id: entry.writer_id, + dc: entry.dc, + endpoint: entry.endpoint.to_string(), + generation: entry.generation, + state: entry.state, + draining: entry.draining, + degraded: entry.degraded, + bound_clients: entry.bound_clients, + idle_for_secs: entry.idle_for_secs, + rtt_ema_ms: entry.rtt_ema_ms, + }) + .collect(); + MeWritersData { + middle_proxy_enabled: true, + generated_at_epoch_secs: snapshot.generated_at_epoch_secs, + summary: MeWritersSummary { + configured_dc_groups: snapshot.configured_dc_groups, + configured_endpoints: snapshot.configured_endpoints, + available_endpoints: snapshot.available_endpoints, + available_pct: snapshot.available_pct, + required_writers: snapshot.required_writers, + alive_writers: snapshot.alive_writers, + coverage_pct: snapshot.coverage_pct, + }, + writers, + } + } + None => MeWritersData { + middle_proxy_enabled: false, + generated_at_epoch_secs: now_epoch_secs(), + summary: MeWritersSummary { + configured_dc_groups: 0, + configured_endpoints: 0, + available_endpoints: 0, + available_pct: 0.0, + required_writers: 0, + alive_writers: 0, + coverage_pct: 0.0, + }, + writers: Vec::new(), + }, + }; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/stats/dcs") => { + let revision = current_revision(&shared.config_path).await?; + let data = match &shared.me_pool { + Some(pool) => { + let snapshot = pool.api_status_snapshot().await; + let dcs = snapshot + .dcs + .into_iter() + .map(|entry| DcStatus { + dc: entry.dc, + endpoints: entry.endpoints.into_iter().map(|value| value.to_string()).collect(), + available_endpoints: entry.available_endpoints, + available_pct: entry.available_pct, + required_writers: entry.required_writers, + alive_writers: entry.alive_writers, + coverage_pct: entry.coverage_pct, + rtt_ms: entry.rtt_ms, + load: entry.load, + }) + .collect(); + DcStatusData { + middle_proxy_enabled: true, + generated_at_epoch_secs: snapshot.generated_at_epoch_secs, + dcs, + } + } + None => DcStatusData { + middle_proxy_enabled: false, + generated_at_epoch_secs: now_epoch_secs(), + dcs: Vec::new(), + }, + }; + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/users") | ("GET", "/v1/users") => { let revision = current_revision(&shared.config_path).await?; let users = users_from_config(&cfg, &shared.stats, &shared.ip_tracker).await; @@ -304,6 +397,13 @@ async fn handle( } } +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + fn success_response( status: StatusCode, data: T,