mirror of https://github.com/telemt/telemt.git
API Events + API as module
This commit is contained in:
parent
d87196c105
commit
4221230969
|
|
@ -0,0 +1,90 @@
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize)]
|
||||||
|
pub(super) struct ApiEventRecord {
|
||||||
|
pub(super) seq: u64,
|
||||||
|
pub(super) ts_epoch_secs: u64,
|
||||||
|
pub(super) event_type: String,
|
||||||
|
pub(super) context: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize)]
|
||||||
|
pub(super) struct ApiEventSnapshot {
|
||||||
|
pub(super) capacity: usize,
|
||||||
|
pub(super) dropped_total: u64,
|
||||||
|
pub(super) events: Vec<ApiEventRecord>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ApiEventsInner {
|
||||||
|
capacity: usize,
|
||||||
|
dropped_total: u64,
|
||||||
|
next_seq: u64,
|
||||||
|
events: VecDeque<ApiEventRecord>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded ring-buffer for control-plane API/runtime events.
|
||||||
|
pub(crate) struct ApiEventStore {
|
||||||
|
inner: Mutex<ApiEventsInner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ApiEventStore {
|
||||||
|
pub(super) fn new(capacity: usize) -> Self {
|
||||||
|
let bounded = capacity.max(16);
|
||||||
|
Self {
|
||||||
|
inner: Mutex::new(ApiEventsInner {
|
||||||
|
capacity: bounded,
|
||||||
|
dropped_total: 0,
|
||||||
|
next_seq: 1,
|
||||||
|
events: VecDeque::with_capacity(bounded),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn record(&self, event_type: &str, context: impl Into<String>) {
|
||||||
|
let now_epoch_secs = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs();
|
||||||
|
let mut context = context.into();
|
||||||
|
if context.len() > 256 {
|
||||||
|
context.truncate(256);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut guard = self.inner.lock().expect("api event store mutex poisoned");
|
||||||
|
if guard.events.len() == guard.capacity {
|
||||||
|
guard.events.pop_front();
|
||||||
|
guard.dropped_total = guard.dropped_total.saturating_add(1);
|
||||||
|
}
|
||||||
|
let seq = guard.next_seq;
|
||||||
|
guard.next_seq = guard.next_seq.saturating_add(1);
|
||||||
|
guard.events.push_back(ApiEventRecord {
|
||||||
|
seq,
|
||||||
|
ts_epoch_secs: now_epoch_secs,
|
||||||
|
event_type: event_type.to_string(),
|
||||||
|
context,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn snapshot(&self, limit: usize) -> ApiEventSnapshot {
|
||||||
|
let guard = self.inner.lock().expect("api event store mutex poisoned");
|
||||||
|
let bounded_limit = limit.clamp(1, guard.capacity.max(1));
|
||||||
|
let mut items: Vec<ApiEventRecord> = guard
|
||||||
|
.events
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.take(bounded_limit)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
items.reverse();
|
||||||
|
|
||||||
|
ApiEventSnapshot {
|
||||||
|
capacity: guard.capacity,
|
||||||
|
dropped_total: guard.dropped_total,
|
||||||
|
events: items,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
262
src/api/mod.rs
262
src/api/mod.rs
|
|
@ -3,16 +3,13 @@ use std::net::{IpAddr, SocketAddr};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
|
||||||
|
|
||||||
use http_body_util::{BodyExt, Full};
|
use http_body_util::Full;
|
||||||
use hyper::body::{Bytes, Incoming};
|
use hyper::body::{Bytes, Incoming};
|
||||||
use hyper::header::AUTHORIZATION;
|
use hyper::header::AUTHORIZATION;
|
||||||
use hyper::server::conn::http1;
|
use hyper::server::conn::http1;
|
||||||
use hyper::service::service_fn;
|
use hyper::service::service_fn;
|
||||||
use hyper::{Method, Request, Response, StatusCode};
|
use hyper::{Method, Request, Response, StatusCode};
|
||||||
use serde::Serialize;
|
|
||||||
use serde::de::DeserializeOwned;
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::sync::{Mutex, watch};
|
use tokio::sync::{Mutex, watch};
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
@ -24,15 +21,29 @@ use crate::transport::middle_proxy::MePool;
|
||||||
use crate::transport::UpstreamManager;
|
use crate::transport::UpstreamManager;
|
||||||
|
|
||||||
mod config_store;
|
mod config_store;
|
||||||
|
mod events;
|
||||||
|
mod http_utils;
|
||||||
mod model;
|
mod model;
|
||||||
|
mod runtime_edge;
|
||||||
|
mod runtime_min;
|
||||||
mod runtime_stats;
|
mod runtime_stats;
|
||||||
|
mod runtime_watch;
|
||||||
mod runtime_zero;
|
mod runtime_zero;
|
||||||
mod users;
|
mod users;
|
||||||
|
|
||||||
use config_store::{current_revision, parse_if_match};
|
use config_store::{current_revision, parse_if_match};
|
||||||
|
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
||||||
|
use events::ApiEventStore;
|
||||||
use model::{
|
use model::{
|
||||||
ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest,
|
ApiFailure, CreateUserRequest, HealthData, PatchUserRequest, RotateSecretRequest, SummaryData,
|
||||||
RotateSecretRequest, SuccessResponse, SummaryData,
|
};
|
||||||
|
use runtime_edge::{
|
||||||
|
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
||||||
|
build_runtime_events_recent_data,
|
||||||
|
};
|
||||||
|
use runtime_min::{
|
||||||
|
build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data,
|
||||||
|
build_runtime_upstream_quality_data, build_security_whitelist_data,
|
||||||
};
|
};
|
||||||
use runtime_stats::{
|
use runtime_stats::{
|
||||||
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
|
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
|
||||||
|
|
@ -42,6 +53,7 @@ use runtime_zero::{
|
||||||
build_limits_effective_data, build_runtime_gates_data, build_security_posture_data,
|
build_limits_effective_data, build_runtime_gates_data, build_security_posture_data,
|
||||||
build_system_info_data,
|
build_system_info_data,
|
||||||
};
|
};
|
||||||
|
use runtime_watch::spawn_runtime_watchers;
|
||||||
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
|
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
|
||||||
|
|
||||||
pub(super) struct ApiRuntimeState {
|
pub(super) struct ApiRuntimeState {
|
||||||
|
|
@ -62,6 +74,9 @@ pub(super) struct ApiShared {
|
||||||
pub(super) startup_detected_ip_v6: Option<IpAddr>,
|
pub(super) startup_detected_ip_v6: Option<IpAddr>,
|
||||||
pub(super) mutation_lock: Arc<Mutex<()>>,
|
pub(super) mutation_lock: Arc<Mutex<()>>,
|
||||||
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
|
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
|
||||||
|
pub(super) runtime_edge_connections_cache: Arc<Mutex<Option<EdgeConnectionsCacheEntry>>>,
|
||||||
|
pub(super) runtime_edge_recompute_lock: Arc<Mutex<()>>,
|
||||||
|
pub(super) runtime_events: Arc<ApiEventStore>,
|
||||||
pub(super) request_id: Arc<AtomicU64>,
|
pub(super) request_id: Arc<AtomicU64>,
|
||||||
pub(super) runtime_state: Arc<ApiRuntimeState>,
|
pub(super) runtime_state: Arc<ApiRuntimeState>,
|
||||||
}
|
}
|
||||||
|
|
@ -116,40 +131,21 @@ pub async fn serve(
|
||||||
startup_detected_ip_v6,
|
startup_detected_ip_v6,
|
||||||
mutation_lock: Arc::new(Mutex::new(())),
|
mutation_lock: Arc::new(Mutex::new(())),
|
||||||
minimal_cache: Arc::new(Mutex::new(None)),
|
minimal_cache: Arc::new(Mutex::new(None)),
|
||||||
|
runtime_edge_connections_cache: Arc::new(Mutex::new(None)),
|
||||||
|
runtime_edge_recompute_lock: Arc::new(Mutex::new(())),
|
||||||
|
runtime_events: Arc::new(ApiEventStore::new(
|
||||||
|
config_rx.borrow().server.api.runtime_edge_events_capacity,
|
||||||
|
)),
|
||||||
request_id: Arc::new(AtomicU64::new(1)),
|
request_id: Arc::new(AtomicU64::new(1)),
|
||||||
runtime_state: runtime_state.clone(),
|
runtime_state: runtime_state.clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut config_rx_reload = config_rx.clone();
|
spawn_runtime_watchers(
|
||||||
let runtime_state_reload = runtime_state.clone();
|
config_rx.clone(),
|
||||||
tokio::spawn(async move {
|
admission_rx.clone(),
|
||||||
loop {
|
runtime_state.clone(),
|
||||||
if config_rx_reload.changed().await.is_err() {
|
shared.runtime_events.clone(),
|
||||||
break;
|
);
|
||||||
}
|
|
||||||
runtime_state_reload
|
|
||||||
.config_reload_count
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
runtime_state_reload
|
|
||||||
.last_config_reload_epoch_secs
|
|
||||||
.store(now_epoch_secs(), Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut admission_rx_watch = admission_rx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
runtime_state
|
|
||||||
.admission_open
|
|
||||||
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
|
|
||||||
loop {
|
|
||||||
if admission_rx_watch.changed().await.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
runtime_state
|
|
||||||
.admission_open
|
|
||||||
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, peer) = match listener.accept().await {
|
let (stream, peer) = match listener.accept().await {
|
||||||
|
|
@ -232,6 +228,7 @@ async fn handle(
|
||||||
|
|
||||||
let method = req.method().clone();
|
let method = req.method().clone();
|
||||||
let path = req.uri().path().to_string();
|
let path = req.uri().path().to_string();
|
||||||
|
let query = req.uri().query().map(str::to_string);
|
||||||
let body_limit = api_cfg.request_body_limit_bytes;
|
let body_limit = api_cfg.request_body_limit_bytes;
|
||||||
|
|
||||||
let result: Result<Response<Full<Bytes>>, ApiFailure> = async {
|
let result: Result<Response<Full<Bytes>>, ApiFailure> = async {
|
||||||
|
|
@ -264,6 +261,11 @@ async fn handle(
|
||||||
let data = build_security_posture_data(cfg.as_ref());
|
let data = build_security_posture_data(cfg.as_ref());
|
||||||
Ok(success_response(StatusCode::OK, data, revision))
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
}
|
}
|
||||||
|
("GET", "/v1/security/whitelist") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_security_whitelist_data(cfg.as_ref());
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
("GET", "/v1/stats/summary") => {
|
("GET", "/v1/stats/summary") => {
|
||||||
let revision = current_revision(&shared.config_path).await?;
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
let data = SummaryData {
|
let data = SummaryData {
|
||||||
|
|
@ -300,6 +302,40 @@ async fn handle(
|
||||||
let data = build_dcs_data(shared.as_ref(), api_cfg).await;
|
let data = build_dcs_data(shared.as_ref(), api_cfg).await;
|
||||||
Ok(success_response(StatusCode::OK, data, revision))
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
}
|
}
|
||||||
|
("GET", "/v1/runtime/me_pool_state") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_runtime_me_pool_state_data(shared.as_ref()).await;
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
|
("GET", "/v1/runtime/me_quality") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_runtime_me_quality_data(shared.as_ref()).await;
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
|
("GET", "/v1/runtime/upstream_quality") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_runtime_upstream_quality_data(shared.as_ref()).await;
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
|
("GET", "/v1/runtime/nat_stun") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_runtime_nat_stun_data(shared.as_ref()).await;
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
|
("GET", "/v1/runtime/connections/summary") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_runtime_connections_summary_data(shared.as_ref(), cfg.as_ref()).await;
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
|
("GET", "/v1/runtime/events/recent") => {
|
||||||
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let data = build_runtime_events_recent_data(
|
||||||
|
shared.as_ref(),
|
||||||
|
cfg.as_ref(),
|
||||||
|
query.as_deref(),
|
||||||
|
);
|
||||||
|
Ok(success_response(StatusCode::OK, data, revision))
|
||||||
|
}
|
||||||
("GET", "/v1/stats/users") | ("GET", "/v1/users") => {
|
("GET", "/v1/stats/users") | ("GET", "/v1/users") => {
|
||||||
let revision = current_revision(&shared.config_path).await?;
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
let users = users_from_config(
|
let users = users_from_config(
|
||||||
|
|
@ -325,7 +361,17 @@ async fn handle(
|
||||||
}
|
}
|
||||||
let expected_revision = parse_if_match(req.headers());
|
let expected_revision = parse_if_match(req.headers());
|
||||||
let body = read_json::<CreateUserRequest>(req.into_body(), body_limit).await?;
|
let body = read_json::<CreateUserRequest>(req.into_body(), body_limit).await?;
|
||||||
let (data, revision) = create_user(body, expected_revision, &shared).await?;
|
let result = create_user(body, expected_revision, &shared).await;
|
||||||
|
let (data, revision) = match result {
|
||||||
|
Ok(ok) => ok,
|
||||||
|
Err(error) => {
|
||||||
|
shared.runtime_events.record("api.user.create.failed", error.code);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
shared
|
||||||
|
.runtime_events
|
||||||
|
.record("api.user.create.ok", format!("username={}", data.user.username));
|
||||||
Ok(success_response(StatusCode::CREATED, data, revision))
|
Ok(success_response(StatusCode::CREATED, data, revision))
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
|
@ -365,8 +411,20 @@ async fn handle(
|
||||||
}
|
}
|
||||||
let expected_revision = parse_if_match(req.headers());
|
let expected_revision = parse_if_match(req.headers());
|
||||||
let body = read_json::<PatchUserRequest>(req.into_body(), body_limit).await?;
|
let body = read_json::<PatchUserRequest>(req.into_body(), body_limit).await?;
|
||||||
let (data, revision) =
|
let result = patch_user(user, body, expected_revision, &shared).await;
|
||||||
patch_user(user, body, expected_revision, &shared).await?;
|
let (data, revision) = match result {
|
||||||
|
Ok(ok) => ok,
|
||||||
|
Err(error) => {
|
||||||
|
shared.runtime_events.record(
|
||||||
|
"api.user.patch.failed",
|
||||||
|
format!("username={} code={}", user, error.code),
|
||||||
|
);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
shared
|
||||||
|
.runtime_events
|
||||||
|
.record("api.user.patch.ok", format!("username={}", data.username));
|
||||||
return Ok(success_response(StatusCode::OK, data, revision));
|
return Ok(success_response(StatusCode::OK, data, revision));
|
||||||
}
|
}
|
||||||
if method == Method::DELETE {
|
if method == Method::DELETE {
|
||||||
|
|
@ -381,8 +439,21 @@ async fn handle(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
let expected_revision = parse_if_match(req.headers());
|
let expected_revision = parse_if_match(req.headers());
|
||||||
let (deleted_user, revision) =
|
let result = delete_user(user, expected_revision, &shared).await;
|
||||||
delete_user(user, expected_revision, &shared).await?;
|
let (deleted_user, revision) = match result {
|
||||||
|
Ok(ok) => ok,
|
||||||
|
Err(error) => {
|
||||||
|
shared.runtime_events.record(
|
||||||
|
"api.user.delete.failed",
|
||||||
|
format!("username={} code={}", user, error.code),
|
||||||
|
);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
shared.runtime_events.record(
|
||||||
|
"api.user.delete.ok",
|
||||||
|
format!("username={}", deleted_user),
|
||||||
|
);
|
||||||
return Ok(success_response(StatusCode::OK, deleted_user, revision));
|
return Ok(success_response(StatusCode::OK, deleted_user, revision));
|
||||||
}
|
}
|
||||||
if method == Method::POST
|
if method == Method::POST
|
||||||
|
|
@ -404,9 +475,27 @@ async fn handle(
|
||||||
let body =
|
let body =
|
||||||
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
|
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
|
||||||
.await?;
|
.await?;
|
||||||
let (data, revision) =
|
let result = rotate_secret(
|
||||||
rotate_secret(base_user, body.unwrap_or_default(), expected_revision, &shared)
|
base_user,
|
||||||
.await?;
|
body.unwrap_or_default(),
|
||||||
|
expected_revision,
|
||||||
|
&shared,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let (data, revision) = match result {
|
||||||
|
Ok(ok) => ok,
|
||||||
|
Err(error) => {
|
||||||
|
shared.runtime_events.record(
|
||||||
|
"api.user.rotate_secret.failed",
|
||||||
|
format!("username={} code={}", base_user, error.code),
|
||||||
|
);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
shared.runtime_events.record(
|
||||||
|
"api.user.rotate_secret.ok",
|
||||||
|
format!("username={}", base_user),
|
||||||
|
);
|
||||||
return Ok(success_response(StatusCode::OK, data, revision));
|
return Ok(success_response(StatusCode::OK, data, revision));
|
||||||
}
|
}
|
||||||
if method == Method::POST {
|
if method == Method::POST {
|
||||||
|
|
@ -438,88 +527,3 @@ async fn handle(
|
||||||
Err(error) => Ok(error_response(request_id, error)),
|
Err(error) => Ok(error_response(request_id, error)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn success_response<T: Serialize>(
|
|
||||||
status: StatusCode,
|
|
||||||
data: T,
|
|
||||||
revision: String,
|
|
||||||
) -> Response<Full<Bytes>> {
|
|
||||||
let payload = SuccessResponse {
|
|
||||||
ok: true,
|
|
||||||
data,
|
|
||||||
revision,
|
|
||||||
};
|
|
||||||
let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{\"ok\":false}".to_vec());
|
|
||||||
Response::builder()
|
|
||||||
.status(status)
|
|
||||||
.header("content-type", "application/json; charset=utf-8")
|
|
||||||
.body(Full::new(Bytes::from(body)))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn error_response(request_id: u64, failure: ApiFailure) -> Response<Full<Bytes>> {
|
|
||||||
let payload = ErrorResponse {
|
|
||||||
ok: false,
|
|
||||||
error: ErrorBody {
|
|
||||||
code: failure.code,
|
|
||||||
message: failure.message,
|
|
||||||
},
|
|
||||||
request_id,
|
|
||||||
};
|
|
||||||
let body = serde_json::to_vec(&payload).unwrap_or_else(|_| {
|
|
||||||
format!(
|
|
||||||
"{{\"ok\":false,\"error\":{{\"code\":\"internal_error\",\"message\":\"serialization failed\"}},\"request_id\":{}}}",
|
|
||||||
request_id
|
|
||||||
)
|
|
||||||
.into_bytes()
|
|
||||||
});
|
|
||||||
Response::builder()
|
|
||||||
.status(failure.status)
|
|
||||||
.header("content-type", "application/json; charset=utf-8")
|
|
||||||
.body(Full::new(Bytes::from(body)))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_json<T: DeserializeOwned>(body: Incoming, limit: usize) -> Result<T, ApiFailure> {
|
|
||||||
let bytes = read_body_with_limit(body, limit).await?;
|
|
||||||
serde_json::from_slice(&bytes).map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_optional_json<T: DeserializeOwned>(
|
|
||||||
body: Incoming,
|
|
||||||
limit: usize,
|
|
||||||
) -> Result<Option<T>, ApiFailure> {
|
|
||||||
let bytes = read_body_with_limit(body, limit).await?;
|
|
||||||
if bytes.is_empty() {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
serde_json::from_slice(&bytes)
|
|
||||||
.map(Some)
|
|
||||||
.map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_body_with_limit(body: Incoming, limit: usize) -> Result<Vec<u8>, ApiFailure> {
|
|
||||||
let mut collected = Vec::new();
|
|
||||||
let mut body = body;
|
|
||||||
while let Some(frame_result) = body.frame().await {
|
|
||||||
let frame = frame_result.map_err(|_| ApiFailure::bad_request("Invalid request body"))?;
|
|
||||||
if let Some(chunk) = frame.data_ref() {
|
|
||||||
if collected.len().saturating_add(chunk.len()) > limit {
|
|
||||||
return Err(ApiFailure::new(
|
|
||||||
StatusCode::PAYLOAD_TOO_LARGE,
|
|
||||||
"payload_too_large",
|
|
||||||
format!("Body exceeds {} bytes", limit),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
collected.extend_from_slice(chunk);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(collected)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn now_epoch_secs() -> u64 {
|
|
||||||
SystemTime::now()
|
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.as_secs()
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue