Runtime + Upstream API: merge pull request #340 from telemt/flow-api

Runtime + Upstream API
This commit is contained in:
Alexey 2026-03-06 19:56:29 +03:00 committed by GitHub
commit 24ff75701e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 1549 additions and 129 deletions

90
src/api/events.rs Normal file
View File

@ -0,0 +1,90 @@
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::Serialize;
#[derive(Clone, Serialize)]
pub(super) struct ApiEventRecord {
pub(super) seq: u64,
pub(super) ts_epoch_secs: u64,
pub(super) event_type: String,
pub(super) context: String,
}
#[derive(Clone, Serialize)]
pub(super) struct ApiEventSnapshot {
pub(super) capacity: usize,
pub(super) dropped_total: u64,
pub(super) events: Vec<ApiEventRecord>,
}
struct ApiEventsInner {
capacity: usize,
dropped_total: u64,
next_seq: u64,
events: VecDeque<ApiEventRecord>,
}
/// Bounded ring-buffer for control-plane API/runtime events.
pub(crate) struct ApiEventStore {
inner: Mutex<ApiEventsInner>,
}
impl ApiEventStore {
pub(super) fn new(capacity: usize) -> Self {
let bounded = capacity.max(16);
Self {
inner: Mutex::new(ApiEventsInner {
capacity: bounded,
dropped_total: 0,
next_seq: 1,
events: VecDeque::with_capacity(bounded),
}),
}
}
pub(super) fn record(&self, event_type: &str, context: impl Into<String>) {
let now_epoch_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let mut context = context.into();
if context.len() > 256 {
context.truncate(256);
}
let mut guard = self.inner.lock().expect("api event store mutex poisoned");
if guard.events.len() == guard.capacity {
guard.events.pop_front();
guard.dropped_total = guard.dropped_total.saturating_add(1);
}
let seq = guard.next_seq;
guard.next_seq = guard.next_seq.saturating_add(1);
guard.events.push_back(ApiEventRecord {
seq,
ts_epoch_secs: now_epoch_secs,
event_type: event_type.to_string(),
context,
});
}
pub(super) fn snapshot(&self, limit: usize) -> ApiEventSnapshot {
let guard = self.inner.lock().expect("api event store mutex poisoned");
let bounded_limit = limit.clamp(1, guard.capacity.max(1));
let mut items: Vec<ApiEventRecord> = guard
.events
.iter()
.rev()
.take(bounded_limit)
.cloned()
.collect();
items.reverse();
ApiEventSnapshot {
capacity: guard.capacity,
dropped_total: guard.dropped_total,
events: items,
}
}
}

91
src/api/http_utils.rs Normal file
View File

@ -0,0 +1,91 @@
use http_body_util::{BodyExt, Full};
use hyper::StatusCode;
use hyper::body::{Bytes, Incoming};
use serde::Serialize;
use serde::de::DeserializeOwned;
use super::model::{ApiFailure, ErrorBody, ErrorResponse, SuccessResponse};
pub(super) fn success_response<T: Serialize>(
status: StatusCode,
data: T,
revision: String,
) -> hyper::Response<Full<Bytes>> {
let payload = SuccessResponse {
ok: true,
data,
revision,
};
let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{\"ok\":false}".to_vec());
hyper::Response::builder()
.status(status)
.header("content-type", "application/json; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
pub(super) fn error_response(
request_id: u64,
failure: ApiFailure,
) -> hyper::Response<Full<Bytes>> {
let payload = ErrorResponse {
ok: false,
error: ErrorBody {
code: failure.code,
message: failure.message,
},
request_id,
};
let body = serde_json::to_vec(&payload).unwrap_or_else(|_| {
format!(
"{{\"ok\":false,\"error\":{{\"code\":\"internal_error\",\"message\":\"serialization failed\"}},\"request_id\":{}}}",
request_id
)
.into_bytes()
});
hyper::Response::builder()
.status(failure.status)
.header("content-type", "application/json; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
pub(super) async fn read_json<T: DeserializeOwned>(
body: Incoming,
limit: usize,
) -> Result<T, ApiFailure> {
let bytes = read_body_with_limit(body, limit).await?;
serde_json::from_slice(&bytes).map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
}
pub(super) async fn read_optional_json<T: DeserializeOwned>(
body: Incoming,
limit: usize,
) -> Result<Option<T>, ApiFailure> {
let bytes = read_body_with_limit(body, limit).await?;
if bytes.is_empty() {
return Ok(None);
}
serde_json::from_slice(&bytes)
.map(Some)
.map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
}
async fn read_body_with_limit(body: Incoming, limit: usize) -> Result<Vec<u8>, ApiFailure> {
let mut collected = Vec::new();
let mut body = body;
while let Some(frame_result) = body.frame().await {
let frame = frame_result.map_err(|_| ApiFailure::bad_request("Invalid request body"))?;
if let Some(chunk) = frame.data_ref() {
if collected.len().saturating_add(chunk.len()) > limit {
return Err(ApiFailure::new(
StatusCode::PAYLOAD_TOO_LARGE,
"payload_too_large",
format!("Body exceeds {} bytes", limit),
));
}
collected.extend_from_slice(chunk);
}
}
Ok(collected)
}

View File

@ -3,16 +3,13 @@ use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use http_body_util::{BodyExt, Full}; use http_body_util::Full;
use hyper::body::{Bytes, Incoming}; use hyper::body::{Bytes, Incoming};
use hyper::header::AUTHORIZATION; use hyper::header::AUTHORIZATION;
use hyper::server::conn::http1; use hyper::server::conn::http1;
use hyper::service::service_fn; use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode}; use hyper::{Method, Request, Response, StatusCode};
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::{Mutex, watch}; use tokio::sync::{Mutex, watch};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
@ -24,15 +21,29 @@ use crate::transport::middle_proxy::MePool;
use crate::transport::UpstreamManager; use crate::transport::UpstreamManager;
mod config_store; mod config_store;
mod events;
mod http_utils;
mod model; mod model;
mod runtime_edge;
mod runtime_min;
mod runtime_stats; mod runtime_stats;
mod runtime_watch;
mod runtime_zero; mod runtime_zero;
mod users; mod users;
use config_store::{current_revision, parse_if_match}; use config_store::{current_revision, parse_if_match};
use http_utils::{error_response, read_json, read_optional_json, success_response};
use events::ApiEventStore;
use model::{ use model::{
ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest, ApiFailure, CreateUserRequest, HealthData, PatchUserRequest, RotateSecretRequest, SummaryData,
RotateSecretRequest, SuccessResponse, SummaryData, };
use runtime_edge::{
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
build_runtime_events_recent_data,
};
use runtime_min::{
build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data,
build_runtime_upstream_quality_data, build_security_whitelist_data,
}; };
use runtime_stats::{ use runtime_stats::{
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data, MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
@ -42,6 +53,7 @@ use runtime_zero::{
build_limits_effective_data, build_runtime_gates_data, build_security_posture_data, build_limits_effective_data, build_runtime_gates_data, build_security_posture_data,
build_system_info_data, build_system_info_data,
}; };
use runtime_watch::spawn_runtime_watchers;
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
pub(super) struct ApiRuntimeState { pub(super) struct ApiRuntimeState {
@ -62,6 +74,9 @@ pub(super) struct ApiShared {
pub(super) startup_detected_ip_v6: Option<IpAddr>, pub(super) startup_detected_ip_v6: Option<IpAddr>,
pub(super) mutation_lock: Arc<Mutex<()>>, pub(super) mutation_lock: Arc<Mutex<()>>,
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>, pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
pub(super) runtime_edge_connections_cache: Arc<Mutex<Option<EdgeConnectionsCacheEntry>>>,
pub(super) runtime_edge_recompute_lock: Arc<Mutex<()>>,
pub(super) runtime_events: Arc<ApiEventStore>,
pub(super) request_id: Arc<AtomicU64>, pub(super) request_id: Arc<AtomicU64>,
pub(super) runtime_state: Arc<ApiRuntimeState>, pub(super) runtime_state: Arc<ApiRuntimeState>,
} }
@ -116,40 +131,21 @@ pub async fn serve(
startup_detected_ip_v6, startup_detected_ip_v6,
mutation_lock: Arc::new(Mutex::new(())), mutation_lock: Arc::new(Mutex::new(())),
minimal_cache: Arc::new(Mutex::new(None)), minimal_cache: Arc::new(Mutex::new(None)),
runtime_edge_connections_cache: Arc::new(Mutex::new(None)),
runtime_edge_recompute_lock: Arc::new(Mutex::new(())),
runtime_events: Arc::new(ApiEventStore::new(
config_rx.borrow().server.api.runtime_edge_events_capacity,
)),
request_id: Arc::new(AtomicU64::new(1)), request_id: Arc::new(AtomicU64::new(1)),
runtime_state: runtime_state.clone(), runtime_state: runtime_state.clone(),
}); });
let mut config_rx_reload = config_rx.clone(); spawn_runtime_watchers(
let runtime_state_reload = runtime_state.clone(); config_rx.clone(),
tokio::spawn(async move { admission_rx.clone(),
loop { runtime_state.clone(),
if config_rx_reload.changed().await.is_err() { shared.runtime_events.clone(),
break; );
}
runtime_state_reload
.config_reload_count
.fetch_add(1, Ordering::Relaxed);
runtime_state_reload
.last_config_reload_epoch_secs
.store(now_epoch_secs(), Ordering::Relaxed);
}
});
let mut admission_rx_watch = admission_rx.clone();
tokio::spawn(async move {
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
loop {
if admission_rx_watch.changed().await.is_err() {
break;
}
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
}
});
loop { loop {
let (stream, peer) = match listener.accept().await { let (stream, peer) = match listener.accept().await {
@ -232,6 +228,7 @@ async fn handle(
let method = req.method().clone(); let method = req.method().clone();
let path = req.uri().path().to_string(); let path = req.uri().path().to_string();
let query = req.uri().query().map(str::to_string);
let body_limit = api_cfg.request_body_limit_bytes; let body_limit = api_cfg.request_body_limit_bytes;
let result: Result<Response<Full<Bytes>>, ApiFailure> = async { let result: Result<Response<Full<Bytes>>, ApiFailure> = async {
@ -264,6 +261,11 @@ async fn handle(
let data = build_security_posture_data(cfg.as_ref()); let data = build_security_posture_data(cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision)) Ok(success_response(StatusCode::OK, data, revision))
} }
("GET", "/v1/security/whitelist") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_security_whitelist_data(cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/summary") => { ("GET", "/v1/stats/summary") => {
let revision = current_revision(&shared.config_path).await?; let revision = current_revision(&shared.config_path).await?;
let data = SummaryData { let data = SummaryData {
@ -300,6 +302,40 @@ async fn handle(
let data = build_dcs_data(shared.as_ref(), api_cfg).await; let data = build_dcs_data(shared.as_ref(), api_cfg).await;
Ok(success_response(StatusCode::OK, data, revision)) Ok(success_response(StatusCode::OK, data, revision))
} }
("GET", "/v1/runtime/me_pool_state") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_me_pool_state_data(shared.as_ref()).await;
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/me_quality") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_me_quality_data(shared.as_ref()).await;
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/upstream_quality") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_upstream_quality_data(shared.as_ref()).await;
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/nat_stun") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_nat_stun_data(shared.as_ref()).await;
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/connections/summary") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_connections_summary_data(shared.as_ref(), cfg.as_ref()).await;
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/events/recent") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_events_recent_data(
shared.as_ref(),
cfg.as_ref(),
query.as_deref(),
);
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/users") | ("GET", "/v1/users") => { ("GET", "/v1/stats/users") | ("GET", "/v1/users") => {
let revision = current_revision(&shared.config_path).await?; let revision = current_revision(&shared.config_path).await?;
let users = users_from_config( let users = users_from_config(
@ -325,7 +361,17 @@ async fn handle(
} }
let expected_revision = parse_if_match(req.headers()); let expected_revision = parse_if_match(req.headers());
let body = read_json::<CreateUserRequest>(req.into_body(), body_limit).await?; let body = read_json::<CreateUserRequest>(req.into_body(), body_limit).await?;
let (data, revision) = create_user(body, expected_revision, &shared).await?; let result = create_user(body, expected_revision, &shared).await;
let (data, revision) = match result {
Ok(ok) => ok,
Err(error) => {
shared.runtime_events.record("api.user.create.failed", error.code);
return Err(error);
}
};
shared
.runtime_events
.record("api.user.create.ok", format!("username={}", data.user.username));
Ok(success_response(StatusCode::CREATED, data, revision)) Ok(success_response(StatusCode::CREATED, data, revision))
} }
_ => { _ => {
@ -365,8 +411,20 @@ async fn handle(
} }
let expected_revision = parse_if_match(req.headers()); let expected_revision = parse_if_match(req.headers());
let body = read_json::<PatchUserRequest>(req.into_body(), body_limit).await?; let body = read_json::<PatchUserRequest>(req.into_body(), body_limit).await?;
let (data, revision) = let result = patch_user(user, body, expected_revision, &shared).await;
patch_user(user, body, expected_revision, &shared).await?; let (data, revision) = match result {
Ok(ok) => ok,
Err(error) => {
shared.runtime_events.record(
"api.user.patch.failed",
format!("username={} code={}", user, error.code),
);
return Err(error);
}
};
shared
.runtime_events
.record("api.user.patch.ok", format!("username={}", data.username));
return Ok(success_response(StatusCode::OK, data, revision)); return Ok(success_response(StatusCode::OK, data, revision));
} }
if method == Method::DELETE { if method == Method::DELETE {
@ -381,8 +439,21 @@ async fn handle(
)); ));
} }
let expected_revision = parse_if_match(req.headers()); let expected_revision = parse_if_match(req.headers());
let (deleted_user, revision) = let result = delete_user(user, expected_revision, &shared).await;
delete_user(user, expected_revision, &shared).await?; let (deleted_user, revision) = match result {
Ok(ok) => ok,
Err(error) => {
shared.runtime_events.record(
"api.user.delete.failed",
format!("username={} code={}", user, error.code),
);
return Err(error);
}
};
shared.runtime_events.record(
"api.user.delete.ok",
format!("username={}", deleted_user),
);
return Ok(success_response(StatusCode::OK, deleted_user, revision)); return Ok(success_response(StatusCode::OK, deleted_user, revision));
} }
if method == Method::POST if method == Method::POST
@ -404,9 +475,27 @@ async fn handle(
let body = let body =
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit) read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
.await?; .await?;
let (data, revision) = let result = rotate_secret(
rotate_secret(base_user, body.unwrap_or_default(), expected_revision, &shared) base_user,
.await?; body.unwrap_or_default(),
expected_revision,
&shared,
)
.await;
let (data, revision) = match result {
Ok(ok) => ok,
Err(error) => {
shared.runtime_events.record(
"api.user.rotate_secret.failed",
format!("username={} code={}", base_user, error.code),
);
return Err(error);
}
};
shared.runtime_events.record(
"api.user.rotate_secret.ok",
format!("username={}", base_user),
);
return Ok(success_response(StatusCode::OK, data, revision)); return Ok(success_response(StatusCode::OK, data, revision));
} }
if method == Method::POST { if method == Method::POST {
@ -438,88 +527,3 @@ async fn handle(
Err(error) => Ok(error_response(request_id, error)), Err(error) => Ok(error_response(request_id, error)),
} }
} }
fn success_response<T: Serialize>(
status: StatusCode,
data: T,
revision: String,
) -> Response<Full<Bytes>> {
let payload = SuccessResponse {
ok: true,
data,
revision,
};
let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{\"ok\":false}".to_vec());
Response::builder()
.status(status)
.header("content-type", "application/json; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
fn error_response(request_id: u64, failure: ApiFailure) -> Response<Full<Bytes>> {
let payload = ErrorResponse {
ok: false,
error: ErrorBody {
code: failure.code,
message: failure.message,
},
request_id,
};
let body = serde_json::to_vec(&payload).unwrap_or_else(|_| {
format!(
"{{\"ok\":false,\"error\":{{\"code\":\"internal_error\",\"message\":\"serialization failed\"}},\"request_id\":{}}}",
request_id
)
.into_bytes()
});
Response::builder()
.status(failure.status)
.header("content-type", "application/json; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
async fn read_json<T: DeserializeOwned>(body: Incoming, limit: usize) -> Result<T, ApiFailure> {
let bytes = read_body_with_limit(body, limit).await?;
serde_json::from_slice(&bytes).map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
}
async fn read_optional_json<T: DeserializeOwned>(
body: Incoming,
limit: usize,
) -> Result<Option<T>, ApiFailure> {
let bytes = read_body_with_limit(body, limit).await?;
if bytes.is_empty() {
return Ok(None);
}
serde_json::from_slice(&bytes)
.map(Some)
.map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
}
async fn read_body_with_limit(body: Incoming, limit: usize) -> Result<Vec<u8>, ApiFailure> {
let mut collected = Vec::new();
let mut body = body;
while let Some(frame_result) = body.frame().await {
let frame = frame_result.map_err(|_| ApiFailure::bad_request("Invalid request body"))?;
if let Some(chunk) = frame.data_ref() {
if collected.len().saturating_add(chunk.len()) > limit {
return Err(ApiFailure::new(
StatusCode::PAYLOAD_TOO_LARGE,
"payload_too_large",
format!("Body exceeds {} bytes", limit),
));
}
collected.extend_from_slice(chunk);
}
}
Ok(collected)
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

294
src/api/runtime_edge.rs Normal file
View File

@ -0,0 +1,294 @@
use std::cmp::Reverse;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::Serialize;
use crate::config::ProxyConfig;
use super::ApiShared;
use super::events::ApiEventRecord;
const FEATURE_DISABLED_REASON: &str = "feature_disabled";
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
const EVENTS_DEFAULT_LIMIT: usize = 50;
const EVENTS_MAX_LIMIT: usize = 1000;
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionUserData {
pub(super) username: String,
pub(super) current_connections: u64,
pub(super) total_octets: u64,
}
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionTotalsData {
pub(super) current_connections: u64,
pub(super) current_connections_me: u64,
pub(super) current_connections_direct: u64,
pub(super) active_users: usize,
}
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionTopData {
pub(super) limit: usize,
pub(super) by_connections: Vec<RuntimeEdgeConnectionUserData>,
pub(super) by_throughput: Vec<RuntimeEdgeConnectionUserData>,
}
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionCacheData {
pub(super) ttl_ms: u64,
pub(super) served_from_cache: bool,
pub(super) stale_cache_used: bool,
}
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionTelemetryData {
pub(super) user_enabled: bool,
pub(super) throughput_is_cumulative: bool,
}
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionsSummaryPayload {
pub(super) cache: RuntimeEdgeConnectionCacheData,
pub(super) totals: RuntimeEdgeConnectionTotalsData,
pub(super) top: RuntimeEdgeConnectionTopData,
pub(super) telemetry: RuntimeEdgeConnectionTelemetryData,
}
#[derive(Serialize)]
pub(super) struct RuntimeEdgeConnectionsSummaryData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) data: Option<RuntimeEdgeConnectionsSummaryPayload>,
}
#[derive(Clone)]
pub(crate) struct EdgeConnectionsCacheEntry {
pub(super) expires_at: Instant,
pub(super) payload: RuntimeEdgeConnectionsSummaryPayload,
pub(super) generated_at_epoch_secs: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeEdgeEventsPayload {
pub(super) capacity: usize,
pub(super) dropped_total: u64,
pub(super) events: Vec<ApiEventRecord>,
}
#[derive(Serialize)]
pub(super) struct RuntimeEdgeEventsData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) data: Option<RuntimeEdgeEventsPayload>,
}
pub(super) async fn build_runtime_connections_summary_data(
shared: &ApiShared,
cfg: &ProxyConfig,
) -> RuntimeEdgeConnectionsSummaryData {
let now_epoch_secs = now_epoch_secs();
let api_cfg = &cfg.server.api;
if !api_cfg.runtime_edge_enabled {
return RuntimeEdgeConnectionsSummaryData {
enabled: false,
reason: Some(FEATURE_DISABLED_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
}
let (generated_at_epoch_secs, payload) = match get_connections_payload_cached(
shared,
api_cfg.runtime_edge_cache_ttl_ms,
api_cfg.runtime_edge_top_n,
)
.await
{
Some(v) => v,
None => {
return RuntimeEdgeConnectionsSummaryData {
enabled: true,
reason: Some(SOURCE_UNAVAILABLE_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
}
};
RuntimeEdgeConnectionsSummaryData {
enabled: true,
reason: None,
generated_at_epoch_secs,
data: Some(payload),
}
}
pub(super) fn build_runtime_events_recent_data(
shared: &ApiShared,
cfg: &ProxyConfig,
query: Option<&str>,
) -> RuntimeEdgeEventsData {
let now_epoch_secs = now_epoch_secs();
let api_cfg = &cfg.server.api;
if !api_cfg.runtime_edge_enabled {
return RuntimeEdgeEventsData {
enabled: false,
reason: Some(FEATURE_DISABLED_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
}
let limit = parse_recent_events_limit(query, EVENTS_DEFAULT_LIMIT, EVENTS_MAX_LIMIT);
let snapshot = shared.runtime_events.snapshot(limit);
RuntimeEdgeEventsData {
enabled: true,
reason: None,
generated_at_epoch_secs: now_epoch_secs,
data: Some(RuntimeEdgeEventsPayload {
capacity: snapshot.capacity,
dropped_total: snapshot.dropped_total,
events: snapshot.events,
}),
}
}
async fn get_connections_payload_cached(
shared: &ApiShared,
cache_ttl_ms: u64,
top_n: usize,
) -> Option<(u64, RuntimeEdgeConnectionsSummaryPayload)> {
if cache_ttl_ms > 0 {
let now = Instant::now();
let cached = shared.runtime_edge_connections_cache.lock().await.clone();
if let Some(entry) = cached
&& now < entry.expires_at
{
let mut payload = entry.payload;
payload.cache.served_from_cache = true;
payload.cache.stale_cache_used = false;
return Some((entry.generated_at_epoch_secs, payload));
}
}
let Ok(_guard) = shared.runtime_edge_recompute_lock.try_lock() else {
let cached = shared.runtime_edge_connections_cache.lock().await.clone();
if let Some(entry) = cached {
let mut payload = entry.payload;
payload.cache.served_from_cache = true;
payload.cache.stale_cache_used = true;
return Some((entry.generated_at_epoch_secs, payload));
}
return None;
};
let generated_at_epoch_secs = now_epoch_secs();
let payload = recompute_connections_payload(shared, cache_ttl_ms, top_n).await;
if cache_ttl_ms > 0 {
let entry = EdgeConnectionsCacheEntry {
expires_at: Instant::now() + Duration::from_millis(cache_ttl_ms),
payload: payload.clone(),
generated_at_epoch_secs,
};
*shared.runtime_edge_connections_cache.lock().await = Some(entry);
}
Some((generated_at_epoch_secs, payload))
}
async fn recompute_connections_payload(
shared: &ApiShared,
cache_ttl_ms: u64,
top_n: usize,
) -> RuntimeEdgeConnectionsSummaryPayload {
let mut rows = Vec::<RuntimeEdgeConnectionUserData>::new();
let mut active_users = 0usize;
for entry in shared.stats.iter_user_stats() {
let user_stats = entry.value();
let current_connections = user_stats
.curr_connects
.load(std::sync::atomic::Ordering::Relaxed);
let total_octets = user_stats
.octets_from_client
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_add(
user_stats
.octets_to_client
.load(std::sync::atomic::Ordering::Relaxed),
);
if current_connections > 0 {
active_users = active_users.saturating_add(1);
}
rows.push(RuntimeEdgeConnectionUserData {
username: entry.key().clone(),
current_connections,
total_octets,
});
}
let limit = top_n.max(1);
let mut by_connections = rows.clone();
by_connections.sort_by_key(|row| (Reverse(row.current_connections), row.username.clone()));
by_connections.truncate(limit);
let mut by_throughput = rows;
by_throughput.sort_by_key(|row| (Reverse(row.total_octets), row.username.clone()));
by_throughput.truncate(limit);
let telemetry = shared.stats.telemetry_policy();
RuntimeEdgeConnectionsSummaryPayload {
cache: RuntimeEdgeConnectionCacheData {
ttl_ms: cache_ttl_ms,
served_from_cache: false,
stale_cache_used: false,
},
totals: RuntimeEdgeConnectionTotalsData {
current_connections: shared.stats.get_current_connections_total(),
current_connections_me: shared.stats.get_current_connections_me(),
current_connections_direct: shared.stats.get_current_connections_direct(),
active_users,
},
top: RuntimeEdgeConnectionTopData {
limit,
by_connections,
by_throughput,
},
telemetry: RuntimeEdgeConnectionTelemetryData {
user_enabled: telemetry.user_enabled,
throughput_is_cumulative: true,
},
}
}
fn parse_recent_events_limit(query: Option<&str>, default_limit: usize, max_limit: usize) -> usize {
let Some(query) = query else {
return default_limit;
};
for pair in query.split('&') {
let mut split = pair.splitn(2, '=');
if split.next() == Some("limit")
&& let Some(raw) = split.next()
&& let Ok(parsed) = raw.parse::<usize>()
{
return parsed.clamp(1, max_limit);
}
}
default_limit
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

534
src/api/runtime_min.rs Normal file
View File

@ -0,0 +1,534 @@
use std::collections::BTreeSet;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::Serialize;
use crate::config::ProxyConfig;
use super::ApiShared;
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
#[derive(Serialize)]
pub(super) struct SecurityWhitelistData {
pub(super) generated_at_epoch_secs: u64,
pub(super) enabled: bool,
pub(super) entries_total: usize,
pub(super) entries: Vec<String>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateGenerationData {
pub(super) active_generation: u64,
pub(super) warm_generation: u64,
pub(super) pending_hardswap_generation: u64,
pub(super) pending_hardswap_age_secs: Option<u64>,
pub(super) draining_generations: Vec<u64>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateHardswapData {
pub(super) enabled: bool,
pub(super) pending: bool,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateWriterContourData {
pub(super) warm: usize,
pub(super) active: usize,
pub(super) draining: usize,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateWriterHealthData {
pub(super) healthy: usize,
pub(super) degraded: usize,
pub(super) draining: usize,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateWriterData {
pub(super) total: usize,
pub(super) alive_non_draining: usize,
pub(super) draining: usize,
pub(super) degraded: usize,
pub(super) contour: RuntimeMePoolStateWriterContourData,
pub(super) health: RuntimeMePoolStateWriterHealthData,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateRefillDcData {
pub(super) dc: i16,
pub(super) family: &'static str,
pub(super) inflight: usize,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateRefillData {
pub(super) inflight_endpoints_total: usize,
pub(super) inflight_dc_total: usize,
pub(super) by_dc: Vec<RuntimeMePoolStateRefillDcData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStatePayload {
pub(super) generations: RuntimeMePoolStateGenerationData,
pub(super) hardswap: RuntimeMePoolStateHardswapData,
pub(super) writers: RuntimeMePoolStateWriterData,
pub(super) refill: RuntimeMePoolStateRefillData,
}
#[derive(Serialize)]
pub(super) struct RuntimeMePoolStateData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) data: Option<RuntimeMePoolStatePayload>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityCountersData {
pub(super) idle_close_by_peer_total: u64,
pub(super) reader_eof_total: u64,
pub(super) kdf_drift_total: u64,
pub(super) kdf_port_only_drift_total: u64,
pub(super) reconnect_attempt_total: u64,
pub(super) reconnect_success_total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityRouteDropData {
pub(super) no_conn_total: u64,
pub(super) channel_closed_total: u64,
pub(super) queue_full_total: u64,
pub(super) queue_full_base_total: u64,
pub(super) queue_full_high_total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityDcRttData {
pub(super) dc: i16,
pub(super) rtt_ema_ms: Option<f64>,
pub(super) alive_writers: usize,
pub(super) required_writers: usize,
pub(super) coverage_pct: f64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityPayload {
pub(super) counters: RuntimeMeQualityCountersData,
pub(super) route_drops: RuntimeMeQualityRouteDropData,
pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) data: Option<RuntimeMeQualityPayload>,
}
#[derive(Serialize)]
pub(super) struct RuntimeUpstreamQualityPolicyData {
pub(super) connect_retry_attempts: u32,
pub(super) connect_retry_backoff_ms: u64,
pub(super) connect_budget_ms: u64,
pub(super) unhealthy_fail_threshold: u32,
pub(super) connect_failfast_hard_errors: bool,
}
#[derive(Serialize)]
pub(super) struct RuntimeUpstreamQualityCountersData {
pub(super) connect_attempt_total: u64,
pub(super) connect_success_total: u64,
pub(super) connect_fail_total: u64,
pub(super) connect_failfast_hard_error_total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeUpstreamQualitySummaryData {
pub(super) configured_total: usize,
pub(super) healthy_total: usize,
pub(super) unhealthy_total: usize,
pub(super) direct_total: usize,
pub(super) socks4_total: usize,
pub(super) socks5_total: usize,
}
#[derive(Serialize)]
pub(super) struct RuntimeUpstreamQualityDcData {
pub(super) dc: i16,
pub(super) latency_ema_ms: Option<f64>,
pub(super) ip_preference: &'static str,
}
#[derive(Serialize)]
pub(super) struct RuntimeUpstreamQualityUpstreamData {
pub(super) upstream_id: usize,
pub(super) route_kind: &'static str,
pub(super) address: String,
pub(super) weight: u16,
pub(super) scopes: String,
pub(super) healthy: bool,
pub(super) fails: u32,
pub(super) last_check_age_secs: u64,
pub(super) effective_latency_ms: Option<f64>,
pub(super) dc: Vec<RuntimeUpstreamQualityDcData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeUpstreamQualityData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
pub(super) policy: RuntimeUpstreamQualityPolicyData,
pub(super) counters: RuntimeUpstreamQualityCountersData,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) summary: Option<RuntimeUpstreamQualitySummaryData>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) upstreams: Option<Vec<RuntimeUpstreamQualityUpstreamData>>,
}
#[derive(Serialize)]
pub(super) struct RuntimeNatStunReflectionData {
pub(super) addr: String,
pub(super) age_secs: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeNatStunFlagsData {
pub(super) nat_probe_enabled: bool,
pub(super) nat_probe_disabled_runtime: bool,
pub(super) nat_probe_attempts: u8,
}
#[derive(Serialize)]
pub(super) struct RuntimeNatStunServersData {
pub(super) configured: Vec<String>,
pub(super) live: Vec<String>,
pub(super) live_total: usize,
}
#[derive(Serialize)]
pub(super) struct RuntimeNatStunReflectionBlockData {
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) v4: Option<RuntimeNatStunReflectionData>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) v6: Option<RuntimeNatStunReflectionData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeNatStunPayload {
pub(super) flags: RuntimeNatStunFlagsData,
pub(super) servers: RuntimeNatStunServersData,
pub(super) reflection: RuntimeNatStunReflectionBlockData,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) stun_backoff_remaining_ms: Option<u64>,
}
#[derive(Serialize)]
pub(super) struct RuntimeNatStunData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) data: Option<RuntimeNatStunPayload>,
}
pub(super) fn build_security_whitelist_data(cfg: &ProxyConfig) -> SecurityWhitelistData {
let entries = cfg
.server
.api
.whitelist
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
SecurityWhitelistData {
generated_at_epoch_secs: now_epoch_secs(),
enabled: !entries.is_empty(),
entries_total: entries.len(),
entries,
}
}
pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> RuntimeMePoolStateData {
let now_epoch_secs = now_epoch_secs();
let Some(pool) = &shared.me_pool else {
return RuntimeMePoolStateData {
enabled: false,
reason: Some(SOURCE_UNAVAILABLE_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
};
let status = pool.api_status_snapshot().await;
let runtime = pool.api_runtime_snapshot().await;
let refill = pool.api_refill_snapshot().await;
let mut draining_generations = BTreeSet::<u64>::new();
let mut contour_warm = 0usize;
let mut contour_active = 0usize;
let mut contour_draining = 0usize;
let mut draining = 0usize;
let mut degraded = 0usize;
let mut healthy = 0usize;
for writer in &status.writers {
if writer.draining {
draining_generations.insert(writer.generation);
draining += 1;
}
if writer.degraded && !writer.draining {
degraded += 1;
}
if !writer.degraded && !writer.draining {
healthy += 1;
}
match writer.state {
"warm" => contour_warm += 1,
"active" => contour_active += 1,
_ => contour_draining += 1,
}
}
RuntimeMePoolStateData {
enabled: true,
reason: None,
generated_at_epoch_secs: status.generated_at_epoch_secs,
data: Some(RuntimeMePoolStatePayload {
generations: RuntimeMePoolStateGenerationData {
active_generation: runtime.active_generation,
warm_generation: runtime.warm_generation,
pending_hardswap_generation: runtime.pending_hardswap_generation,
pending_hardswap_age_secs: runtime.pending_hardswap_age_secs,
draining_generations: draining_generations.into_iter().collect(),
},
hardswap: RuntimeMePoolStateHardswapData {
enabled: runtime.hardswap_enabled,
pending: runtime.pending_hardswap_generation != 0,
},
writers: RuntimeMePoolStateWriterData {
total: status.writers.len(),
alive_non_draining: status.writers.len().saturating_sub(draining),
draining,
degraded,
contour: RuntimeMePoolStateWriterContourData {
warm: contour_warm,
active: contour_active,
draining: contour_draining,
},
health: RuntimeMePoolStateWriterHealthData {
healthy,
degraded,
draining,
},
},
refill: RuntimeMePoolStateRefillData {
inflight_endpoints_total: refill.inflight_endpoints_total,
inflight_dc_total: refill.inflight_dc_total,
by_dc: refill
.by_dc
.into_iter()
.map(|entry| RuntimeMePoolStateRefillDcData {
dc: entry.dc,
family: entry.family,
inflight: entry.inflight,
})
.collect(),
},
}),
}
}
pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> RuntimeMeQualityData {
let now_epoch_secs = now_epoch_secs();
let Some(pool) = &shared.me_pool else {
return RuntimeMeQualityData {
enabled: false,
reason: Some(SOURCE_UNAVAILABLE_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
};
let status = pool.api_status_snapshot().await;
RuntimeMeQualityData {
enabled: true,
reason: None,
generated_at_epoch_secs: status.generated_at_epoch_secs,
data: Some(RuntimeMeQualityPayload {
counters: RuntimeMeQualityCountersData {
idle_close_by_peer_total: shared.stats.get_me_idle_close_by_peer_total(),
reader_eof_total: shared.stats.get_me_reader_eof_total(),
kdf_drift_total: shared.stats.get_me_kdf_drift_total(),
kdf_port_only_drift_total: shared.stats.get_me_kdf_port_only_drift_total(),
reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(),
reconnect_success_total: shared.stats.get_me_reconnect_success(),
},
route_drops: RuntimeMeQualityRouteDropData {
no_conn_total: shared.stats.get_me_route_drop_no_conn(),
channel_closed_total: shared.stats.get_me_route_drop_channel_closed(),
queue_full_total: shared.stats.get_me_route_drop_queue_full(),
queue_full_base_total: shared.stats.get_me_route_drop_queue_full_base(),
queue_full_high_total: shared.stats.get_me_route_drop_queue_full_high(),
},
dc_rtt: status
.dcs
.into_iter()
.map(|dc| RuntimeMeQualityDcRttData {
dc: dc.dc,
rtt_ema_ms: dc.rtt_ms,
alive_writers: dc.alive_writers,
required_writers: dc.required_writers,
coverage_pct: dc.coverage_pct,
})
.collect(),
}),
}
}
pub(super) async fn build_runtime_upstream_quality_data(
shared: &ApiShared,
) -> RuntimeUpstreamQualityData {
let generated_at_epoch_secs = now_epoch_secs();
let policy = shared.upstream_manager.api_policy_snapshot();
let counters = RuntimeUpstreamQualityCountersData {
connect_attempt_total: shared.stats.get_upstream_connect_attempt_total(),
connect_success_total: shared.stats.get_upstream_connect_success_total(),
connect_fail_total: shared.stats.get_upstream_connect_fail_total(),
connect_failfast_hard_error_total: shared.stats.get_upstream_connect_failfast_hard_error_total(),
};
let Some(snapshot) = shared.upstream_manager.try_api_snapshot() else {
return RuntimeUpstreamQualityData {
enabled: false,
reason: Some(SOURCE_UNAVAILABLE_REASON),
generated_at_epoch_secs,
policy: RuntimeUpstreamQualityPolicyData {
connect_retry_attempts: policy.connect_retry_attempts,
connect_retry_backoff_ms: policy.connect_retry_backoff_ms,
connect_budget_ms: policy.connect_budget_ms,
unhealthy_fail_threshold: policy.unhealthy_fail_threshold,
connect_failfast_hard_errors: policy.connect_failfast_hard_errors,
},
counters,
summary: None,
upstreams: None,
};
};
RuntimeUpstreamQualityData {
enabled: true,
reason: None,
generated_at_epoch_secs,
policy: RuntimeUpstreamQualityPolicyData {
connect_retry_attempts: policy.connect_retry_attempts,
connect_retry_backoff_ms: policy.connect_retry_backoff_ms,
connect_budget_ms: policy.connect_budget_ms,
unhealthy_fail_threshold: policy.unhealthy_fail_threshold,
connect_failfast_hard_errors: policy.connect_failfast_hard_errors,
},
counters,
summary: Some(RuntimeUpstreamQualitySummaryData {
configured_total: snapshot.summary.configured_total,
healthy_total: snapshot.summary.healthy_total,
unhealthy_total: snapshot.summary.unhealthy_total,
direct_total: snapshot.summary.direct_total,
socks4_total: snapshot.summary.socks4_total,
socks5_total: snapshot.summary.socks5_total,
}),
upstreams: Some(
snapshot
.upstreams
.into_iter()
.map(|upstream| RuntimeUpstreamQualityUpstreamData {
upstream_id: upstream.upstream_id,
route_kind: match upstream.route_kind {
crate::transport::UpstreamRouteKind::Direct => "direct",
crate::transport::UpstreamRouteKind::Socks4 => "socks4",
crate::transport::UpstreamRouteKind::Socks5 => "socks5",
},
address: upstream.address,
weight: upstream.weight,
scopes: upstream.scopes,
healthy: upstream.healthy,
fails: upstream.fails,
last_check_age_secs: upstream.last_check_age_secs,
effective_latency_ms: upstream.effective_latency_ms,
dc: upstream
.dc
.into_iter()
.map(|dc| RuntimeUpstreamQualityDcData {
dc: dc.dc,
latency_ema_ms: dc.latency_ema_ms,
ip_preference: match dc.ip_preference {
crate::transport::upstream::IpPreference::Unknown => "unknown",
crate::transport::upstream::IpPreference::PreferV6 => "prefer_v6",
crate::transport::upstream::IpPreference::PreferV4 => "prefer_v4",
crate::transport::upstream::IpPreference::BothWork => "both_work",
crate::transport::upstream::IpPreference::Unavailable => "unavailable",
},
})
.collect(),
})
.collect(),
),
}
}
pub(super) async fn build_runtime_nat_stun_data(shared: &ApiShared) -> RuntimeNatStunData {
let now_epoch_secs = now_epoch_secs();
let Some(pool) = &shared.me_pool else {
return RuntimeNatStunData {
enabled: false,
reason: Some(SOURCE_UNAVAILABLE_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
};
let snapshot = pool.api_nat_stun_snapshot().await;
RuntimeNatStunData {
enabled: true,
reason: None,
generated_at_epoch_secs: now_epoch_secs,
data: Some(RuntimeNatStunPayload {
flags: RuntimeNatStunFlagsData {
nat_probe_enabled: snapshot.nat_probe_enabled,
nat_probe_disabled_runtime: snapshot.nat_probe_disabled_runtime,
nat_probe_attempts: snapshot.nat_probe_attempts,
},
servers: RuntimeNatStunServersData {
configured: snapshot.configured_servers,
live: snapshot.live_servers.clone(),
live_total: snapshot.live_servers.len(),
},
reflection: RuntimeNatStunReflectionBlockData {
v4: snapshot.reflection_v4.map(|entry| RuntimeNatStunReflectionData {
addr: entry.addr.to_string(),
age_secs: entry.age_secs,
}),
v6: snapshot.reflection_v6.map(|entry| RuntimeNatStunReflectionData {
addr: entry.addr.to_string(),
age_secs: entry.age_secs,
}),
},
stun_backoff_remaining_ms: snapshot.stun_backoff_remaining_ms,
}),
}
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

66
src/api/runtime_watch.rs Normal file
View File

@ -0,0 +1,66 @@
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
use crate::config::ProxyConfig;
use super::ApiRuntimeState;
use super::events::ApiEventStore;
pub(super) fn spawn_runtime_watchers(
config_rx: watch::Receiver<Arc<ProxyConfig>>,
admission_rx: watch::Receiver<bool>,
runtime_state: Arc<ApiRuntimeState>,
runtime_events: Arc<ApiEventStore>,
) {
let mut config_rx_reload = config_rx;
let runtime_state_reload = runtime_state.clone();
let runtime_events_reload = runtime_events.clone();
tokio::spawn(async move {
loop {
if config_rx_reload.changed().await.is_err() {
break;
}
runtime_state_reload
.config_reload_count
.fetch_add(1, Ordering::Relaxed);
runtime_state_reload
.last_config_reload_epoch_secs
.store(now_epoch_secs(), Ordering::Relaxed);
runtime_events_reload.record("config.reload.applied", "config receiver updated");
}
});
let mut admission_rx_watch = admission_rx;
tokio::spawn(async move {
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
runtime_events.record(
"admission.state",
format!("accepting_new_connections={}", *admission_rx_watch.borrow()),
);
loop {
if admission_rx_watch.changed().await.is_err() {
break;
}
let admission_open = *admission_rx_watch.borrow();
runtime_state
.admission_open
.store(admission_open, Ordering::Relaxed);
runtime_events.record(
"admission.state",
format!("accepting_new_connections={}", admission_open),
);
}
});
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

View File

@ -114,6 +114,11 @@ pub(crate) fn default_api_minimal_runtime_cache_ttl_ms() -> u64 {
1000 1000
} }
pub(crate) fn default_api_runtime_edge_enabled() -> bool { false }
pub(crate) fn default_api_runtime_edge_cache_ttl_ms() -> u64 { 1000 }
pub(crate) fn default_api_runtime_edge_top_n() -> usize { 10 }
pub(crate) fn default_api_runtime_edge_events_capacity() -> usize { 256 }
pub(crate) fn default_proxy_protocol_header_timeout_ms() -> u64 { pub(crate) fn default_proxy_protocol_header_timeout_ms() -> u64 {
500 500
} }

View File

@ -312,6 +312,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|| old.server.api.minimal_runtime_enabled != new.server.api.minimal_runtime_enabled || old.server.api.minimal_runtime_enabled != new.server.api.minimal_runtime_enabled
|| old.server.api.minimal_runtime_cache_ttl_ms || old.server.api.minimal_runtime_cache_ttl_ms
!= new.server.api.minimal_runtime_cache_ttl_ms != new.server.api.minimal_runtime_cache_ttl_ms
|| old.server.api.runtime_edge_enabled != new.server.api.runtime_edge_enabled
|| old.server.api.runtime_edge_cache_ttl_ms
!= new.server.api.runtime_edge_cache_ttl_ms
|| old.server.api.runtime_edge_top_n != new.server.api.runtime_edge_top_n
|| old.server.api.runtime_edge_events_capacity
!= new.server.api.runtime_edge_events_capacity
|| old.server.api.read_only != new.server.api.read_only || old.server.api.read_only != new.server.api.read_only
{ {
warned = true; warned = true;

View File

@ -462,6 +462,24 @@ impl ProxyConfig {
)); ));
} }
if config.server.api.runtime_edge_cache_ttl_ms > 60_000 {
return Err(ProxyError::Config(
"server.api.runtime_edge_cache_ttl_ms must be within [0, 60000]".to_string(),
));
}
if !(1..=1000).contains(&config.server.api.runtime_edge_top_n) {
return Err(ProxyError::Config(
"server.api.runtime_edge_top_n must be within [1, 1000]".to_string(),
));
}
if !(16..=4096).contains(&config.server.api.runtime_edge_events_capacity) {
return Err(ProxyError::Config(
"server.api.runtime_edge_events_capacity must be within [16, 4096]".to_string(),
));
}
if config.server.api.listen.parse::<SocketAddr>().is_err() { if config.server.api.listen.parse::<SocketAddr>().is_err() {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"server.api.listen must be in IP:PORT format".to_string(), "server.api.listen must be in IP:PORT format".to_string(),
@ -802,6 +820,22 @@ mod tests {
cfg.server.api.minimal_runtime_cache_ttl_ms, cfg.server.api.minimal_runtime_cache_ttl_ms,
default_api_minimal_runtime_cache_ttl_ms() default_api_minimal_runtime_cache_ttl_ms()
); );
assert_eq!(
cfg.server.api.runtime_edge_enabled,
default_api_runtime_edge_enabled()
);
assert_eq!(
cfg.server.api.runtime_edge_cache_ttl_ms,
default_api_runtime_edge_cache_ttl_ms()
);
assert_eq!(
cfg.server.api.runtime_edge_top_n,
default_api_runtime_edge_top_n()
);
assert_eq!(
cfg.server.api.runtime_edge_events_capacity,
default_api_runtime_edge_events_capacity()
);
assert_eq!(cfg.access.users, default_access_users()); assert_eq!(cfg.access.users, default_access_users());
assert_eq!( assert_eq!(
cfg.access.user_max_unique_ips_mode, cfg.access.user_max_unique_ips_mode,
@ -918,6 +952,22 @@ mod tests {
server.api.minimal_runtime_cache_ttl_ms, server.api.minimal_runtime_cache_ttl_ms,
default_api_minimal_runtime_cache_ttl_ms() default_api_minimal_runtime_cache_ttl_ms()
); );
assert_eq!(
server.api.runtime_edge_enabled,
default_api_runtime_edge_enabled()
);
assert_eq!(
server.api.runtime_edge_cache_ttl_ms,
default_api_runtime_edge_cache_ttl_ms()
);
assert_eq!(
server.api.runtime_edge_top_n,
default_api_runtime_edge_top_n()
);
assert_eq!(
server.api.runtime_edge_events_capacity,
default_api_runtime_edge_events_capacity()
);
let access = AccessConfig::default(); let access = AccessConfig::default();
assert_eq!(access.users, default_access_users()); assert_eq!(access.users, default_access_users());
@ -1565,6 +1615,72 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn api_runtime_edge_cache_ttl_out_of_range_is_rejected() {
let toml = r#"
[server.api]
enabled = true
listen = "127.0.0.1:9091"
runtime_edge_cache_ttl_ms = 70000
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_api_runtime_edge_cache_ttl_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("server.api.runtime_edge_cache_ttl_ms must be within [0, 60000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn api_runtime_edge_top_n_out_of_range_is_rejected() {
let toml = r#"
[server.api]
enabled = true
listen = "127.0.0.1:9091"
runtime_edge_top_n = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_api_runtime_edge_top_n_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("server.api.runtime_edge_top_n must be within [1, 1000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn api_runtime_edge_events_capacity_out_of_range_is_rejected() {
let toml = r#"
[server.api]
enabled = true
listen = "127.0.0.1:9091"
runtime_edge_events_capacity = 8
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_api_runtime_edge_events_capacity_invalid_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("server.api.runtime_edge_events_capacity must be within [16, 4096]"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn force_close_bumped_when_below_drain_ttl() { fn force_close_bumped_when_below_drain_ttl() {
let toml = r#" let toml = r#"

View File

@ -918,6 +918,22 @@ pub struct ApiConfig {
#[serde(default = "default_api_minimal_runtime_cache_ttl_ms")] #[serde(default = "default_api_minimal_runtime_cache_ttl_ms")]
pub minimal_runtime_cache_ttl_ms: u64, pub minimal_runtime_cache_ttl_ms: u64,
/// Enables runtime edge endpoints with optional cached aggregation.
#[serde(default = "default_api_runtime_edge_enabled")]
pub runtime_edge_enabled: bool,
/// Cache TTL for runtime edge aggregation payloads in milliseconds.
#[serde(default = "default_api_runtime_edge_cache_ttl_ms")]
pub runtime_edge_cache_ttl_ms: u64,
/// Top-N limit for edge connection leaderboard payloads.
#[serde(default = "default_api_runtime_edge_top_n")]
pub runtime_edge_top_n: usize,
/// Ring-buffer capacity for runtime edge control-plane events.
#[serde(default = "default_api_runtime_edge_events_capacity")]
pub runtime_edge_events_capacity: usize,
/// Read-only mode: mutating endpoints are rejected. /// Read-only mode: mutating endpoints are rejected.
#[serde(default)] #[serde(default)]
pub read_only: bool, pub read_only: bool,
@ -933,6 +949,10 @@ impl Default for ApiConfig {
request_body_limit_bytes: default_api_request_body_limit_bytes(), request_body_limit_bytes: default_api_request_body_limit_bytes(),
minimal_runtime_enabled: default_api_minimal_runtime_enabled(), minimal_runtime_enabled: default_api_minimal_runtime_enabled(),
minimal_runtime_cache_ttl_ms: default_api_minimal_runtime_cache_ttl_ms(), minimal_runtime_cache_ttl_ms: default_api_minimal_runtime_cache_ttl_ms(),
runtime_edge_enabled: default_api_runtime_edge_enabled(),
runtime_edge_cache_ttl_ms: default_api_runtime_edge_cache_ttl_ms(),
runtime_edge_top_n: default_api_runtime_edge_top_n(),
runtime_edge_events_capacity: default_api_runtime_edge_events_capacity(),
read_only: false, read_only: false,
} }
} }

View File

@ -57,6 +57,7 @@ where
stats.increment_user_connects(user); stats.increment_user_connects(user);
stats.increment_user_curr_connects(user); stats.increment_user_curr_connects(user);
stats.increment_current_connections_direct();
let relay_result = relay_bidirectional( let relay_result = relay_bidirectional(
client_reader, client_reader,
@ -69,6 +70,7 @@ where
) )
.await; .await;
stats.decrement_current_connections_direct();
stats.decrement_user_curr_connects(user); stats.decrement_user_curr_connects(user);
match &relay_result { match &relay_result {

View File

@ -237,6 +237,7 @@ where
stats.increment_user_connects(&user); stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user); stats.increment_user_curr_connects(&user);
stats.increment_current_connections_me();
// Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable) // Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
let user_tag: Option<Vec<u8>> = config let user_tag: Option<Vec<u8>> = config
@ -466,6 +467,7 @@ where
"ME relay cleanup" "ME relay cleanup"
); );
me_pool.registry().unregister(conn_id).await; me_pool.registry().unregister(conn_id).await;
stats.decrement_current_connections_me();
stats.decrement_user_curr_connects(&user); stats.decrement_user_curr_connects(&user);
result result
} }

View File

@ -25,6 +25,8 @@ use self::telemetry::TelemetryPolicy;
pub struct Stats { pub struct Stats {
connects_all: AtomicU64, connects_all: AtomicU64,
connects_bad: AtomicU64, connects_bad: AtomicU64,
current_connections_direct: AtomicU64,
current_connections_me: AtomicU64,
handshake_timeouts: AtomicU64, handshake_timeouts: AtomicU64,
upstream_connect_attempt_total: AtomicU64, upstream_connect_attempt_total: AtomicU64,
upstream_connect_success_total: AtomicU64, upstream_connect_success_total: AtomicU64,
@ -150,6 +152,24 @@ impl Stats {
self.telemetry_me_level().allows_debug() self.telemetry_me_level().allows_debug()
} }
fn decrement_atomic_saturating(counter: &AtomicU64) {
let mut current = counter.load(Ordering::Relaxed);
loop {
if current == 0 {
break;
}
match counter.compare_exchange_weak(
current,
current - 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) { pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) {
self.telemetry_core_enabled self.telemetry_core_enabled
.store(policy.core_enabled, Ordering::Relaxed); .store(policy.core_enabled, Ordering::Relaxed);
@ -177,6 +197,18 @@ impl Stats {
self.connects_bad.fetch_add(1, Ordering::Relaxed); self.connects_bad.fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_current_connections_direct(&self) {
self.current_connections_direct.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_current_connections_direct(&self) {
Self::decrement_atomic_saturating(&self.current_connections_direct);
}
pub fn increment_current_connections_me(&self) {
self.current_connections_me.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_current_connections_me(&self) {
Self::decrement_atomic_saturating(&self.current_connections_me);
}
pub fn increment_handshake_timeouts(&self) { pub fn increment_handshake_timeouts(&self) {
if self.telemetry_core_enabled() { if self.telemetry_core_enabled() {
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
@ -646,6 +678,16 @@ impl Stats {
} }
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_current_connections_direct(&self) -> u64 {
self.current_connections_direct.load(Ordering::Relaxed)
}
pub fn get_current_connections_me(&self) -> u64 {
self.current_connections_me.load(Ordering::Relaxed)
}
pub fn get_current_connections_total(&self) -> u64 {
self.get_current_connections_direct()
.saturating_add(self.get_current_connections_me())
}
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) } pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) }

View File

@ -10,6 +10,7 @@ mod pool_init;
mod pool_nat; mod pool_nat;
mod pool_refill; mod pool_refill;
mod pool_reinit; mod pool_reinit;
mod pool_runtime_api;
mod pool_writer; mod pool_writer;
mod ping; mod ping;
mod reader; mod reader;

View File

@ -0,0 +1,128 @@
use std::collections::HashMap;
use std::time::Instant;
use super::pool::{MePool, RefillDcKey};
use crate::network::IpFamily;
#[derive(Clone, Debug)]
pub(crate) struct MeApiRefillDcSnapshot {
pub dc: i16,
pub family: &'static str,
pub inflight: usize,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiRefillSnapshot {
pub inflight_endpoints_total: usize,
pub inflight_dc_total: usize,
pub by_dc: Vec<MeApiRefillDcSnapshot>,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiNatReflectionSnapshot {
pub addr: std::net::SocketAddr,
pub age_secs: u64,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiNatStunSnapshot {
pub nat_probe_enabled: bool,
pub nat_probe_disabled_runtime: bool,
pub nat_probe_attempts: u8,
pub configured_servers: Vec<String>,
pub live_servers: Vec<String>,
pub reflection_v4: Option<MeApiNatReflectionSnapshot>,
pub reflection_v6: Option<MeApiNatReflectionSnapshot>,
pub stun_backoff_remaining_ms: Option<u64>,
}
impl MePool {
pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot {
let inflight_endpoints_total = self.refill_inflight.lock().await.len();
let inflight_dc_keys = self
.refill_inflight_dc
.lock()
.await
.iter()
.copied()
.collect::<Vec<RefillDcKey>>();
let mut by_dc_map = HashMap::<(i16, &'static str), usize>::new();
for key in inflight_dc_keys {
let family = match key.family {
IpFamily::V4 => "v4",
IpFamily::V6 => "v6",
};
let dc = key.dc as i16;
*by_dc_map.entry((dc, family)).or_insert(0) += 1;
}
let mut by_dc = by_dc_map
.into_iter()
.map(|((dc, family), inflight)| MeApiRefillDcSnapshot {
dc,
family,
inflight,
})
.collect::<Vec<_>>();
by_dc.sort_by_key(|entry| (entry.dc, entry.family));
MeApiRefillSnapshot {
inflight_endpoints_total,
inflight_dc_total: by_dc.len(),
by_dc,
}
}
pub(crate) async fn api_nat_stun_snapshot(&self) -> MeApiNatStunSnapshot {
let now = Instant::now();
let mut configured_servers = if !self.nat_stun_servers.is_empty() {
self.nat_stun_servers.clone()
} else if let Some(stun) = &self.nat_stun {
if stun.trim().is_empty() {
Vec::new()
} else {
vec![stun.clone()]
}
} else {
Vec::new()
};
configured_servers.sort();
configured_servers.dedup();
let mut live_servers = self.nat_stun_live_servers.read().await.clone();
live_servers.sort();
live_servers.dedup();
let reflection = self.nat_reflection_cache.lock().await;
let reflection_v4 = reflection.v4.map(|(ts, addr)| MeApiNatReflectionSnapshot {
addr,
age_secs: now.saturating_duration_since(ts).as_secs(),
});
let reflection_v6 = reflection.v6.map(|(ts, addr)| MeApiNatReflectionSnapshot {
addr,
age_secs: now.saturating_duration_since(ts).as_secs(),
});
drop(reflection);
let backoff_until = *self.stun_backoff_until.read().await;
let stun_backoff_remaining_ms = backoff_until.and_then(|until| {
(until > now).then_some(until.duration_since(now).as_millis() as u64)
});
MeApiNatStunSnapshot {
nat_probe_enabled: self.nat_probe,
nat_probe_disabled_runtime: self
.nat_probe_disabled
.load(std::sync::atomic::Ordering::Relaxed),
nat_probe_attempts: self
.nat_probe_attempts
.load(std::sync::atomic::Ordering::Relaxed),
configured_servers,
live_servers,
reflection_v4,
reflection_v6,
stun_backoff_remaining_ms,
}
}
}

View File

@ -202,6 +202,15 @@ pub struct UpstreamApiSnapshot {
pub upstreams: Vec<UpstreamApiItemSnapshot>, pub upstreams: Vec<UpstreamApiItemSnapshot>,
} }
#[derive(Debug, Clone, Copy)]
pub struct UpstreamApiPolicySnapshot {
pub connect_retry_attempts: u32,
pub connect_retry_backoff_ms: u64,
pub connect_budget_ms: u64,
pub unhealthy_fail_threshold: u32,
pub connect_failfast_hard_errors: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UpstreamEgressInfo { pub struct UpstreamEgressInfo {
pub route_kind: UpstreamRouteKind, pub route_kind: UpstreamRouteKind,
@ -315,6 +324,16 @@ impl UpstreamManager {
Some(UpstreamApiSnapshot { summary, upstreams }) Some(UpstreamApiSnapshot { summary, upstreams })
} }
pub fn api_policy_snapshot(&self) -> UpstreamApiPolicySnapshot {
UpstreamApiPolicySnapshot {
connect_retry_attempts: self.connect_retry_attempts,
connect_retry_backoff_ms: self.connect_retry_backoff.as_millis() as u64,
connect_budget_ms: self.connect_budget.as_millis() as u64,
unhealthy_fail_threshold: self.unhealthy_fail_threshold,
connect_failfast_hard_errors: self.connect_failfast_hard_errors,
}
}
#[cfg(unix)] #[cfg(unix)]
fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec<IpAddr> { fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec<IpAddr> {
use nix::ifaddrs::getifaddrs; use nix::ifaddrs::getifaddrs;