From 42212309698da5fe1f1a0e239b2fb170fd9b99a5 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:55:20 +0300 Subject: [PATCH] API Events + API as module --- src/api/events.rs | 90 ++++++++++++++++ src/api/mod.rs | 262 +++++++++++++++++++++++----------------------- 2 files changed, 223 insertions(+), 129 deletions(-) create mode 100644 src/api/events.rs diff --git a/src/api/events.rs b/src/api/events.rs new file mode 100644 index 0000000..4ca91e8 --- /dev/null +++ b/src/api/events.rs @@ -0,0 +1,90 @@ +use std::collections::VecDeque; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::Serialize; + +#[derive(Clone, Serialize)] +pub(super) struct ApiEventRecord { + pub(super) seq: u64, + pub(super) ts_epoch_secs: u64, + pub(super) event_type: String, + pub(super) context: String, +} + +#[derive(Clone, Serialize)] +pub(super) struct ApiEventSnapshot { + pub(super) capacity: usize, + pub(super) dropped_total: u64, + pub(super) events: Vec, +} + +struct ApiEventsInner { + capacity: usize, + dropped_total: u64, + next_seq: u64, + events: VecDeque, +} + +/// Bounded ring-buffer for control-plane API/runtime events. +pub(crate) struct ApiEventStore { + inner: Mutex, +} + +impl ApiEventStore { + pub(super) fn new(capacity: usize) -> Self { + let bounded = capacity.max(16); + Self { + inner: Mutex::new(ApiEventsInner { + capacity: bounded, + dropped_total: 0, + next_seq: 1, + events: VecDeque::with_capacity(bounded), + }), + } + } + + pub(super) fn record(&self, event_type: &str, context: impl Into) { + let now_epoch_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let mut context = context.into(); + if context.len() > 256 { + context.truncate(256); + } + + let mut guard = self.inner.lock().expect("api event store mutex poisoned"); + if guard.events.len() == guard.capacity { + guard.events.pop_front(); + guard.dropped_total = guard.dropped_total.saturating_add(1); + } + let seq = guard.next_seq; + guard.next_seq = guard.next_seq.saturating_add(1); + guard.events.push_back(ApiEventRecord { + seq, + ts_epoch_secs: now_epoch_secs, + event_type: event_type.to_string(), + context, + }); + } + + pub(super) fn snapshot(&self, limit: usize) -> ApiEventSnapshot { + let guard = self.inner.lock().expect("api event store mutex poisoned"); + let bounded_limit = limit.clamp(1, guard.capacity.max(1)); + let mut items: Vec = guard + .events + .iter() + .rev() + .take(bounded_limit) + .cloned() + .collect(); + items.reverse(); + + ApiEventSnapshot { + capacity: guard.capacity, + dropped_total: guard.dropped_total, + events: items, + } + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index f2d31da..a705a46 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,16 +3,13 @@ use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::{SystemTime, UNIX_EPOCH}; -use http_body_util::{BodyExt, Full}; +use http_body_util::Full; use hyper::body::{Bytes, Incoming}; use hyper::header::AUTHORIZATION; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; -use serde::Serialize; -use serde::de::DeserializeOwned; use tokio::net::TcpListener; use tokio::sync::{Mutex, watch}; use tracing::{debug, info, warn}; @@ -24,15 +21,29 @@ use crate::transport::middle_proxy::MePool; use crate::transport::UpstreamManager; mod config_store; +mod events; +mod http_utils; mod model; +mod runtime_edge; +mod runtime_min; mod runtime_stats; +mod runtime_watch; mod runtime_zero; mod users; use config_store::{current_revision, parse_if_match}; +use http_utils::{error_response, read_json, read_optional_json, success_response}; +use events::ApiEventStore; use model::{ - ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest, - RotateSecretRequest, SuccessResponse, SummaryData, + ApiFailure, CreateUserRequest, HealthData, PatchUserRequest, RotateSecretRequest, SummaryData, +}; +use runtime_edge::{ + EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, + build_runtime_events_recent_data, +}; +use runtime_min::{ + build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data, + build_runtime_upstream_quality_data, build_security_whitelist_data, }; use runtime_stats::{ MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data, @@ -42,6 +53,7 @@ use runtime_zero::{ build_limits_effective_data, build_runtime_gates_data, build_security_posture_data, build_system_info_data, }; +use runtime_watch::spawn_runtime_watchers; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; pub(super) struct ApiRuntimeState { @@ -62,6 +74,9 @@ pub(super) struct ApiShared { pub(super) startup_detected_ip_v6: Option, pub(super) mutation_lock: Arc>, pub(super) minimal_cache: Arc>>, + pub(super) runtime_edge_connections_cache: Arc>>, + pub(super) runtime_edge_recompute_lock: Arc>, + pub(super) runtime_events: Arc, pub(super) request_id: Arc, pub(super) runtime_state: Arc, } @@ -116,40 +131,21 @@ pub async fn serve( startup_detected_ip_v6, mutation_lock: Arc::new(Mutex::new(())), minimal_cache: Arc::new(Mutex::new(None)), + runtime_edge_connections_cache: Arc::new(Mutex::new(None)), + runtime_edge_recompute_lock: Arc::new(Mutex::new(())), + runtime_events: Arc::new(ApiEventStore::new( + config_rx.borrow().server.api.runtime_edge_events_capacity, + )), request_id: Arc::new(AtomicU64::new(1)), runtime_state: runtime_state.clone(), }); - let mut config_rx_reload = config_rx.clone(); - let runtime_state_reload = runtime_state.clone(); - tokio::spawn(async move { - loop { - if config_rx_reload.changed().await.is_err() { - break; - } - runtime_state_reload - .config_reload_count - .fetch_add(1, Ordering::Relaxed); - runtime_state_reload - .last_config_reload_epoch_secs - .store(now_epoch_secs(), Ordering::Relaxed); - } - }); - - let mut admission_rx_watch = admission_rx.clone(); - tokio::spawn(async move { - runtime_state - .admission_open - .store(*admission_rx_watch.borrow(), Ordering::Relaxed); - loop { - if admission_rx_watch.changed().await.is_err() { - break; - } - runtime_state - .admission_open - .store(*admission_rx_watch.borrow(), Ordering::Relaxed); - } - }); + spawn_runtime_watchers( + config_rx.clone(), + admission_rx.clone(), + runtime_state.clone(), + shared.runtime_events.clone(), + ); loop { let (stream, peer) = match listener.accept().await { @@ -232,6 +228,7 @@ async fn handle( let method = req.method().clone(); let path = req.uri().path().to_string(); + let query = req.uri().query().map(str::to_string); let body_limit = api_cfg.request_body_limit_bytes; let result: Result>, ApiFailure> = async { @@ -264,6 +261,11 @@ async fn handle( let data = build_security_posture_data(cfg.as_ref()); Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/security/whitelist") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_security_whitelist_data(cfg.as_ref()); + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/summary") => { let revision = current_revision(&shared.config_path).await?; let data = SummaryData { @@ -300,6 +302,40 @@ async fn handle( let data = build_dcs_data(shared.as_ref(), api_cfg).await; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/runtime/me_pool_state") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_me_pool_state_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/me_quality") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_me_quality_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/upstream_quality") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_upstream_quality_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/nat_stun") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_nat_stun_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/connections/summary") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_connections_summary_data(shared.as_ref(), cfg.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/events/recent") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_events_recent_data( + shared.as_ref(), + cfg.as_ref(), + query.as_deref(), + ); + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/users") | ("GET", "/v1/users") => { let revision = current_revision(&shared.config_path).await?; let users = users_from_config( @@ -325,7 +361,17 @@ async fn handle( } let expected_revision = parse_if_match(req.headers()); let body = read_json::(req.into_body(), body_limit).await?; - let (data, revision) = create_user(body, expected_revision, &shared).await?; + let result = create_user(body, expected_revision, &shared).await; + let (data, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record("api.user.create.failed", error.code); + return Err(error); + } + }; + shared + .runtime_events + .record("api.user.create.ok", format!("username={}", data.user.username)); Ok(success_response(StatusCode::CREATED, data, revision)) } _ => { @@ -365,8 +411,20 @@ async fn handle( } let expected_revision = parse_if_match(req.headers()); let body = read_json::(req.into_body(), body_limit).await?; - let (data, revision) = - patch_user(user, body, expected_revision, &shared).await?; + let result = patch_user(user, body, expected_revision, &shared).await; + let (data, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record( + "api.user.patch.failed", + format!("username={} code={}", user, error.code), + ); + return Err(error); + } + }; + shared + .runtime_events + .record("api.user.patch.ok", format!("username={}", data.username)); return Ok(success_response(StatusCode::OK, data, revision)); } if method == Method::DELETE { @@ -381,8 +439,21 @@ async fn handle( )); } let expected_revision = parse_if_match(req.headers()); - let (deleted_user, revision) = - delete_user(user, expected_revision, &shared).await?; + let result = delete_user(user, expected_revision, &shared).await; + let (deleted_user, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record( + "api.user.delete.failed", + format!("username={} code={}", user, error.code), + ); + return Err(error); + } + }; + shared.runtime_events.record( + "api.user.delete.ok", + format!("username={}", deleted_user), + ); return Ok(success_response(StatusCode::OK, deleted_user, revision)); } if method == Method::POST @@ -404,9 +475,27 @@ async fn handle( let body = read_optional_json::(req.into_body(), body_limit) .await?; - let (data, revision) = - rotate_secret(base_user, body.unwrap_or_default(), expected_revision, &shared) - .await?; + let result = rotate_secret( + base_user, + body.unwrap_or_default(), + expected_revision, + &shared, + ) + .await; + let (data, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record( + "api.user.rotate_secret.failed", + format!("username={} code={}", base_user, error.code), + ); + return Err(error); + } + }; + shared.runtime_events.record( + "api.user.rotate_secret.ok", + format!("username={}", base_user), + ); return Ok(success_response(StatusCode::OK, data, revision)); } if method == Method::POST { @@ -438,88 +527,3 @@ async fn handle( Err(error) => Ok(error_response(request_id, error)), } } - -fn success_response( - status: StatusCode, - data: T, - revision: String, -) -> Response> { - let payload = SuccessResponse { - ok: true, - data, - revision, - }; - let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{\"ok\":false}".to_vec()); - Response::builder() - .status(status) - .header("content-type", "application/json; charset=utf-8") - .body(Full::new(Bytes::from(body))) - .unwrap() -} - -fn error_response(request_id: u64, failure: ApiFailure) -> Response> { - let payload = ErrorResponse { - ok: false, - error: ErrorBody { - code: failure.code, - message: failure.message, - }, - request_id, - }; - let body = serde_json::to_vec(&payload).unwrap_or_else(|_| { - format!( - "{{\"ok\":false,\"error\":{{\"code\":\"internal_error\",\"message\":\"serialization failed\"}},\"request_id\":{}}}", - request_id - ) - .into_bytes() - }); - Response::builder() - .status(failure.status) - .header("content-type", "application/json; charset=utf-8") - .body(Full::new(Bytes::from(body))) - .unwrap() -} - -async fn read_json(body: Incoming, limit: usize) -> Result { - let bytes = read_body_with_limit(body, limit).await?; - serde_json::from_slice(&bytes).map_err(|_| ApiFailure::bad_request("Invalid JSON body")) -} - -async fn read_optional_json( - body: Incoming, - limit: usize, -) -> Result, ApiFailure> { - let bytes = read_body_with_limit(body, limit).await?; - if bytes.is_empty() { - return Ok(None); - } - serde_json::from_slice(&bytes) - .map(Some) - .map_err(|_| ApiFailure::bad_request("Invalid JSON body")) -} - -async fn read_body_with_limit(body: Incoming, limit: usize) -> Result, ApiFailure> { - let mut collected = Vec::new(); - let mut body = body; - while let Some(frame_result) = body.frame().await { - let frame = frame_result.map_err(|_| ApiFailure::bad_request("Invalid request body"))?; - if let Some(chunk) = frame.data_ref() { - if collected.len().saturating_add(chunk.len()) > limit { - return Err(ApiFailure::new( - StatusCode::PAYLOAD_TOO_LARGE, - "payload_too_large", - format!("Body exceeds {} bytes", limit), - )); - } - collected.extend_from_slice(chunk); - } - } - Ok(collected) -} - -fn now_epoch_secs() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs() -}