mirror of
https://github.com/telemt/telemt.git
synced 2026-04-18 19:14:09 +03:00
API Zero
Added new endpoints: - GET /v1/system/info - GET /v1/runtime/gates - GET /v1/limits/effective - GET /v1/security/posture Added API runtime state without impacting the hot path: - config_reload_count - last_config_reload_epoch_secs - admission_open - process_started_at_epoch_secs Added background watcher tasks in api::serve: - configuration reload tracking - admission gate state tracking
This commit is contained in:
@@ -2,7 +2,8 @@ use std::convert::Infallible;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::body::{Bytes, Incoming};
|
||||
@@ -25,6 +26,7 @@ use crate::transport::UpstreamManager;
|
||||
mod config_store;
|
||||
mod model;
|
||||
mod runtime_stats;
|
||||
mod runtime_zero;
|
||||
mod users;
|
||||
|
||||
use config_store::{current_revision, parse_if_match};
|
||||
@@ -36,8 +38,19 @@ use runtime_stats::{
|
||||
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
|
||||
build_upstreams_data, build_zero_all_data,
|
||||
};
|
||||
use runtime_zero::{
|
||||
build_limits_effective_data, build_runtime_gates_data, build_security_posture_data,
|
||||
build_system_info_data,
|
||||
};
|
||||
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
|
||||
|
||||
pub(super) struct ApiRuntimeState {
|
||||
pub(super) process_started_at_epoch_secs: u64,
|
||||
pub(super) config_reload_count: AtomicU64,
|
||||
pub(super) last_config_reload_epoch_secs: AtomicU64,
|
||||
pub(super) admission_open: AtomicBool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(super) struct ApiShared {
|
||||
pub(super) stats: Arc<Stats>,
|
||||
@@ -50,6 +63,7 @@ pub(super) struct ApiShared {
|
||||
pub(super) mutation_lock: Arc<Mutex<()>>,
|
||||
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
|
||||
pub(super) request_id: Arc<AtomicU64>,
|
||||
pub(super) runtime_state: Arc<ApiRuntimeState>,
|
||||
}
|
||||
|
||||
impl ApiShared {
|
||||
@@ -65,9 +79,11 @@ pub async fn serve(
|
||||
me_pool: Option<Arc<MePool>>,
|
||||
upstream_manager: Arc<UpstreamManager>,
|
||||
config_rx: watch::Receiver<Arc<ProxyConfig>>,
|
||||
admission_rx: watch::Receiver<bool>,
|
||||
config_path: PathBuf,
|
||||
startup_detected_ip_v4: Option<IpAddr>,
|
||||
startup_detected_ip_v6: Option<IpAddr>,
|
||||
process_started_at_epoch_secs: u64,
|
||||
) {
|
||||
let listener = match TcpListener::bind(listen).await {
|
||||
Ok(listener) => listener,
|
||||
@@ -83,6 +99,13 @@ pub async fn serve(
|
||||
|
||||
info!("API endpoint: http://{}/v1/*", listen);
|
||||
|
||||
let runtime_state = Arc::new(ApiRuntimeState {
|
||||
process_started_at_epoch_secs,
|
||||
config_reload_count: AtomicU64::new(0),
|
||||
last_config_reload_epoch_secs: AtomicU64::new(0),
|
||||
admission_open: AtomicBool::new(*admission_rx.borrow()),
|
||||
});
|
||||
|
||||
let shared = Arc::new(ApiShared {
|
||||
stats,
|
||||
ip_tracker,
|
||||
@@ -94,6 +117,38 @@ pub async fn serve(
|
||||
mutation_lock: Arc::new(Mutex::new(())),
|
||||
minimal_cache: Arc::new(Mutex::new(None)),
|
||||
request_id: Arc::new(AtomicU64::new(1)),
|
||||
runtime_state: runtime_state.clone(),
|
||||
});
|
||||
|
||||
let mut config_rx_reload = config_rx.clone();
|
||||
let runtime_state_reload = runtime_state.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if config_rx_reload.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
runtime_state_reload
|
||||
.config_reload_count
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
runtime_state_reload
|
||||
.last_config_reload_epoch_secs
|
||||
.store(now_epoch_secs(), Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
let mut admission_rx_watch = admission_rx.clone();
|
||||
tokio::spawn(async move {
|
||||
runtime_state
|
||||
.admission_open
|
||||
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
|
||||
loop {
|
||||
if admission_rx_watch.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
runtime_state
|
||||
.admission_open
|
||||
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
@@ -189,6 +244,26 @@ async fn handle(
|
||||
};
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/system/info") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_system_info_data(shared.as_ref(), cfg.as_ref(), &revision);
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/runtime/gates") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref());
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/limits/effective") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_limits_effective_data(cfg.as_ref());
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/security/posture") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = build_security_posture_data(cfg.as_ref());
|
||||
Ok(success_response(StatusCode::OK, data, revision))
|
||||
}
|
||||
("GET", "/v1/stats/summary") => {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let data = SummaryData {
|
||||
@@ -441,3 +516,10 @@ async fn read_body_with_limit(body: Incoming, limit: usize) -> Result<Vec<u8>, A
|
||||
}
|
||||
Ok(collected)
|
||||
}
|
||||
|
||||
fn now_epoch_secs() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user