From 716b4adef25ec5661821dad45dae27ffad8fea6d Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 4 Mar 2026 02:46:47 +0300 Subject: [PATCH] Runtime Stats in API Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/api/mod.rs | 111 +++++--------------------- src/api/model.rs | 200 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 216 insertions(+), 95 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 56fb588..299d5a1 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{SystemTime, UNIX_EPOCH}; use http_body_util::{BodyExt, Full}; use hyper::body::{Bytes, Incoming}; @@ -24,13 +23,17 @@ use crate::transport::middle_proxy::MePool; mod config_store; mod model; +mod runtime_stats; mod users; use config_store::{current_revision, parse_if_match}; use model::{ - ApiFailure, CreateUserRequest, DcStatus, DcStatusData, ErrorBody, ErrorResponse, HealthData, - MeWriterStatus, MeWritersData, MeWritersSummary, PatchUserRequest, RotateSecretRequest, - SuccessResponse, SummaryData, + ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest, + RotateSecretRequest, SuccessResponse, SummaryData, +}; +use runtime_stats::{ + MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data, + build_zero_all_data, }; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; @@ -41,6 +44,7 @@ pub(super) struct ApiShared { pub(super) me_pool: Option>, pub(super) config_path: PathBuf, pub(super) mutation_lock: Arc>, + pub(super) minimal_cache: Arc>>, pub(super) request_id: Arc, } @@ -78,6 +82,7 @@ pub async fn serve( me_pool, config_path, mutation_lock: Arc::new(Mutex::new(())), + minimal_cache: Arc::new(Mutex::new(None)), request_id: Arc::new(AtomicU64::new(1)), }); @@ -185,91 +190,24 @@ async fn handle( }; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/stats/zero/all") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_zero_all_data(&shared.stats, cfg.access.users.len()); + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/stats/minimal/all") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_minimal_all_data(shared.as_ref(), api_cfg).await; + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/me-writers") => { let revision = current_revision(&shared.config_path).await?; - let data = match &shared.me_pool { - Some(pool) => { - let snapshot = pool.api_status_snapshot().await; - let writers = snapshot - .writers - .into_iter() - .map(|entry| MeWriterStatus { - writer_id: entry.writer_id, - dc: entry.dc, - endpoint: entry.endpoint.to_string(), - generation: entry.generation, - state: entry.state, - draining: entry.draining, - degraded: entry.degraded, - bound_clients: entry.bound_clients, - idle_for_secs: entry.idle_for_secs, - rtt_ema_ms: entry.rtt_ema_ms, - }) - .collect(); - MeWritersData { - middle_proxy_enabled: true, - generated_at_epoch_secs: snapshot.generated_at_epoch_secs, - summary: MeWritersSummary { - configured_dc_groups: snapshot.configured_dc_groups, - configured_endpoints: snapshot.configured_endpoints, - available_endpoints: snapshot.available_endpoints, - available_pct: snapshot.available_pct, - required_writers: snapshot.required_writers, - alive_writers: snapshot.alive_writers, - coverage_pct: snapshot.coverage_pct, - }, - writers, - } - } - None => MeWritersData { - middle_proxy_enabled: false, - generated_at_epoch_secs: now_epoch_secs(), - summary: MeWritersSummary { - configured_dc_groups: 0, - configured_endpoints: 0, - available_endpoints: 0, - available_pct: 0.0, - required_writers: 0, - alive_writers: 0, - coverage_pct: 0.0, - }, - writers: Vec::new(), - }, - }; + let data = build_me_writers_data(shared.as_ref(), api_cfg).await; Ok(success_response(StatusCode::OK, data, revision)) } ("GET", "/v1/stats/dcs") => { let revision = current_revision(&shared.config_path).await?; - let data = match &shared.me_pool { - Some(pool) => { - let snapshot = pool.api_status_snapshot().await; - let dcs = snapshot - .dcs - .into_iter() - .map(|entry| DcStatus { - dc: entry.dc, - endpoints: entry.endpoints.into_iter().map(|value| value.to_string()).collect(), - available_endpoints: entry.available_endpoints, - available_pct: entry.available_pct, - required_writers: entry.required_writers, - alive_writers: entry.alive_writers, - coverage_pct: entry.coverage_pct, - rtt_ms: entry.rtt_ms, - load: entry.load, - }) - .collect(); - DcStatusData { - middle_proxy_enabled: true, - generated_at_epoch_secs: snapshot.generated_at_epoch_secs, - dcs, - } - } - None => DcStatusData { - middle_proxy_enabled: false, - generated_at_epoch_secs: now_epoch_secs(), - dcs: Vec::new(), - }, - }; + let data = build_dcs_data(shared.as_ref(), api_cfg).await; Ok(success_response(StatusCode::OK, data, revision)) } ("GET", "/v1/stats/users") | ("GET", "/v1/users") => { @@ -397,13 +335,6 @@ async fn handle( } } -fn now_epoch_secs() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs() -} - fn success_response( status: StatusCode, data: T, diff --git a/src/api/model.rs b/src/api/model.rs index 43d4173..8b2d279 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -65,7 +65,123 @@ pub(super) struct SummaryData { pub(super) configured_users: usize, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] +pub(super) struct ZeroCodeCount { + pub(super) code: i32, + pub(super) total: u64, +} + +#[derive(Serialize, Clone)] +pub(super) struct ZeroCoreData { + pub(super) uptime_seconds: f64, + pub(super) connections_total: u64, + pub(super) connections_bad_total: u64, + pub(super) handshake_timeouts_total: u64, + pub(super) configured_users: usize, + pub(super) telemetry_core_enabled: bool, + pub(super) telemetry_user_enabled: bool, + pub(super) telemetry_me_level: String, +} + +#[derive(Serialize, Clone)] +pub(super) struct ZeroUpstreamData { + pub(super) connect_attempt_total: u64, + pub(super) connect_success_total: u64, + pub(super) connect_fail_total: u64, + pub(super) connect_failfast_hard_error_total: u64, + pub(super) connect_attempts_bucket_1: u64, + pub(super) connect_attempts_bucket_2: u64, + pub(super) connect_attempts_bucket_3_4: u64, + pub(super) connect_attempts_bucket_gt_4: u64, + pub(super) connect_duration_success_bucket_le_100ms: u64, + pub(super) connect_duration_success_bucket_101_500ms: u64, + pub(super) connect_duration_success_bucket_501_1000ms: u64, + pub(super) connect_duration_success_bucket_gt_1000ms: u64, + pub(super) connect_duration_fail_bucket_le_100ms: u64, + pub(super) connect_duration_fail_bucket_101_500ms: u64, + pub(super) connect_duration_fail_bucket_501_1000ms: u64, + pub(super) connect_duration_fail_bucket_gt_1000ms: u64, +} + +#[derive(Serialize, Clone)] +pub(super) struct ZeroMiddleProxyData { + pub(super) keepalive_sent_total: u64, + pub(super) keepalive_failed_total: u64, + pub(super) keepalive_pong_total: u64, + pub(super) keepalive_timeout_total: u64, + pub(super) rpc_proxy_req_signal_sent_total: u64, + pub(super) rpc_proxy_req_signal_failed_total: u64, + pub(super) rpc_proxy_req_signal_skipped_no_meta_total: u64, + pub(super) rpc_proxy_req_signal_response_total: u64, + pub(super) rpc_proxy_req_signal_close_sent_total: u64, + pub(super) reconnect_attempt_total: u64, + pub(super) reconnect_success_total: u64, + pub(super) handshake_reject_total: u64, + pub(super) handshake_error_codes: Vec, + pub(super) reader_eof_total: u64, + pub(super) idle_close_by_peer_total: u64, + pub(super) route_drop_no_conn_total: u64, + pub(super) route_drop_channel_closed_total: u64, + pub(super) route_drop_queue_full_total: u64, + pub(super) route_drop_queue_full_base_total: u64, + pub(super) route_drop_queue_full_high_total: u64, + pub(super) socks_kdf_strict_reject_total: u64, + pub(super) socks_kdf_compat_fallback_total: u64, + pub(super) endpoint_quarantine_total: u64, + pub(super) kdf_drift_total: u64, + pub(super) kdf_port_only_drift_total: u64, + pub(super) hardswap_pending_reuse_total: u64, + pub(super) hardswap_pending_ttl_expired_total: u64, + pub(super) single_endpoint_outage_enter_total: u64, + pub(super) single_endpoint_outage_exit_total: u64, + pub(super) single_endpoint_outage_reconnect_attempt_total: u64, + pub(super) single_endpoint_outage_reconnect_success_total: u64, + pub(super) single_endpoint_quarantine_bypass_total: u64, + pub(super) single_endpoint_shadow_rotate_total: u64, + pub(super) single_endpoint_shadow_rotate_skipped_quarantine_total: u64, + pub(super) floor_mode_switch_total: u64, + pub(super) floor_mode_switch_static_to_adaptive_total: u64, + pub(super) floor_mode_switch_adaptive_to_static_total: u64, +} + +#[derive(Serialize, Clone)] +pub(super) struct ZeroPoolData { + pub(super) pool_swap_total: u64, + pub(super) pool_drain_active: u64, + pub(super) pool_force_close_total: u64, + pub(super) pool_stale_pick_total: u64, + pub(super) writer_removed_total: u64, + pub(super) writer_removed_unexpected_total: u64, + pub(super) refill_triggered_total: u64, + pub(super) refill_skipped_inflight_total: u64, + pub(super) refill_failed_total: u64, + pub(super) writer_restored_same_endpoint_total: u64, + pub(super) writer_restored_fallback_total: u64, +} + +#[derive(Serialize, Clone)] +pub(super) struct ZeroDesyncData { + pub(super) secure_padding_invalid_total: u64, + pub(super) desync_total: u64, + pub(super) desync_full_logged_total: u64, + pub(super) desync_suppressed_total: u64, + pub(super) desync_frames_bucket_0: u64, + pub(super) desync_frames_bucket_1_2: u64, + pub(super) desync_frames_bucket_3_10: u64, + pub(super) desync_frames_bucket_gt_10: u64, +} + +#[derive(Serialize, Clone)] +pub(super) struct ZeroAllData { + pub(super) generated_at_epoch_secs: u64, + pub(super) core: ZeroCoreData, + pub(super) upstream: ZeroUpstreamData, + pub(super) middle_proxy: ZeroMiddleProxyData, + pub(super) pool: ZeroPoolData, + pub(super) desync: ZeroDesyncData, +} + +#[derive(Serialize, Clone)] pub(super) struct MeWritersSummary { pub(super) configured_dc_groups: usize, pub(super) configured_endpoints: usize, @@ -76,7 +192,7 @@ pub(super) struct MeWritersSummary { pub(super) coverage_pct: f64, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] pub(super) struct MeWriterStatus { pub(super) writer_id: u64, pub(super) dc: Option, @@ -90,15 +206,17 @@ pub(super) struct MeWriterStatus { pub(super) rtt_ema_ms: Option, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] pub(super) struct MeWritersData { pub(super) middle_proxy_enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, pub(super) generated_at_epoch_secs: u64, pub(super) summary: MeWritersSummary, pub(super) writers: Vec, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] pub(super) struct DcStatus { pub(super) dc: i16, pub(super) endpoints: Vec, @@ -111,13 +229,85 @@ pub(super) struct DcStatus { pub(super) load: usize, } -#[derive(Serialize)] +#[derive(Serialize, Clone)] pub(super) struct DcStatusData { pub(super) middle_proxy_enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, pub(super) generated_at_epoch_secs: u64, pub(super) dcs: Vec, } +#[derive(Serialize, Clone)] +pub(super) struct MinimalQuarantineData { + pub(super) endpoint: String, + pub(super) remaining_ms: u64, +} + +#[derive(Serialize, Clone)] +pub(super) struct MinimalDcPathData { + pub(super) dc: i16, + pub(super) ip_preference: Option<&'static str>, + pub(super) selected_addr_v4: Option, + pub(super) selected_addr_v6: Option, +} + +#[derive(Serialize, Clone)] +pub(super) struct MinimalMeRuntimeData { + pub(super) active_generation: u64, + pub(super) warm_generation: u64, + pub(super) pending_hardswap_generation: u64, + pub(super) pending_hardswap_age_secs: Option, + pub(super) hardswap_enabled: bool, + pub(super) floor_mode: &'static str, + pub(super) adaptive_floor_idle_secs: u64, + pub(super) adaptive_floor_min_writers_single_endpoint: u8, + pub(super) adaptive_floor_recover_grace_secs: u64, + pub(super) me_keepalive_enabled: bool, + pub(super) me_keepalive_interval_secs: u64, + pub(super) me_keepalive_jitter_secs: u64, + pub(super) me_keepalive_payload_random: bool, + pub(super) rpc_proxy_req_every_secs: u64, + pub(super) me_reconnect_max_concurrent_per_dc: u32, + pub(super) me_reconnect_backoff_base_ms: u64, + pub(super) me_reconnect_backoff_cap_ms: u64, + pub(super) me_reconnect_fast_retry_count: u32, + pub(super) me_pool_drain_ttl_secs: u64, + pub(super) me_pool_force_close_secs: u64, + pub(super) me_pool_min_fresh_ratio: f32, + pub(super) me_bind_stale_mode: &'static str, + pub(super) me_bind_stale_ttl_secs: u64, + pub(super) me_single_endpoint_shadow_writers: u8, + pub(super) me_single_endpoint_outage_mode_enabled: bool, + pub(super) me_single_endpoint_outage_disable_quarantine: bool, + pub(super) me_single_endpoint_outage_backoff_min_ms: u64, + pub(super) me_single_endpoint_outage_backoff_max_ms: u64, + pub(super) me_single_endpoint_shadow_rotate_every_secs: u64, + pub(super) me_deterministic_writer_sort: bool, + pub(super) me_socks_kdf_policy: &'static str, + pub(super) quarantined_endpoints_total: usize, + pub(super) quarantined_endpoints: Vec, +} + +#[derive(Serialize, Clone)] +pub(super) struct MinimalAllPayload { + pub(super) me_writers: MeWritersData, + pub(super) dcs: DcStatusData, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) me_runtime: Option, + pub(super) network_path: Vec, +} + +#[derive(Serialize, Clone)] +pub(super) struct MinimalAllData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) data: Option, +} + #[derive(Serialize)] pub(super) struct UserInfo { pub(super) username: String,