mirror of https://github.com/telemt/telemt.git
Runtime API on Edge
This commit is contained in:
parent
2d98ebf3c3
commit
da89415961
|
|
@ -0,0 +1,294 @@
|
|||
use std::cmp::Reverse;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
|
||||
use super::ApiShared;
|
||||
use super::events::ApiEventRecord;
|
||||
|
||||
const FEATURE_DISABLED_REASON: &str = "feature_disabled";
|
||||
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
|
||||
const EVENTS_DEFAULT_LIMIT: usize = 50;
|
||||
const EVENTS_MAX_LIMIT: usize = 1000;
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionUserData {
|
||||
pub(super) username: String,
|
||||
pub(super) current_connections: u64,
|
||||
pub(super) total_octets: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionTotalsData {
|
||||
pub(super) current_connections: u64,
|
||||
pub(super) current_connections_me: u64,
|
||||
pub(super) current_connections_direct: u64,
|
||||
pub(super) active_users: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionTopData {
|
||||
pub(super) limit: usize,
|
||||
pub(super) by_connections: Vec<RuntimeEdgeConnectionUserData>,
|
||||
pub(super) by_throughput: Vec<RuntimeEdgeConnectionUserData>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionCacheData {
|
||||
pub(super) ttl_ms: u64,
|
||||
pub(super) served_from_cache: bool,
|
||||
pub(super) stale_cache_used: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionTelemetryData {
|
||||
pub(super) user_enabled: bool,
|
||||
pub(super) throughput_is_cumulative: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionsSummaryPayload {
|
||||
pub(super) cache: RuntimeEdgeConnectionCacheData,
|
||||
pub(super) totals: RuntimeEdgeConnectionTotalsData,
|
||||
pub(super) top: RuntimeEdgeConnectionTopData,
|
||||
pub(super) telemetry: RuntimeEdgeConnectionTelemetryData,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(super) struct RuntimeEdgeConnectionsSummaryData {
|
||||
pub(super) enabled: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) reason: Option<&'static str>,
|
||||
pub(super) generated_at_epoch_secs: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) data: Option<RuntimeEdgeConnectionsSummaryPayload>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct EdgeConnectionsCacheEntry {
|
||||
pub(super) expires_at: Instant,
|
||||
pub(super) payload: RuntimeEdgeConnectionsSummaryPayload,
|
||||
pub(super) generated_at_epoch_secs: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(super) struct RuntimeEdgeEventsPayload {
|
||||
pub(super) capacity: usize,
|
||||
pub(super) dropped_total: u64,
|
||||
pub(super) events: Vec<ApiEventRecord>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(super) struct RuntimeEdgeEventsData {
|
||||
pub(super) enabled: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) reason: Option<&'static str>,
|
||||
pub(super) generated_at_epoch_secs: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) data: Option<RuntimeEdgeEventsPayload>,
|
||||
}
|
||||
|
||||
pub(super) async fn build_runtime_connections_summary_data(
|
||||
shared: &ApiShared,
|
||||
cfg: &ProxyConfig,
|
||||
) -> RuntimeEdgeConnectionsSummaryData {
|
||||
let now_epoch_secs = now_epoch_secs();
|
||||
let api_cfg = &cfg.server.api;
|
||||
if !api_cfg.runtime_edge_enabled {
|
||||
return RuntimeEdgeConnectionsSummaryData {
|
||||
enabled: false,
|
||||
reason: Some(FEATURE_DISABLED_REASON),
|
||||
generated_at_epoch_secs: now_epoch_secs,
|
||||
data: None,
|
||||
};
|
||||
}
|
||||
|
||||
let (generated_at_epoch_secs, payload) = match get_connections_payload_cached(
|
||||
shared,
|
||||
api_cfg.runtime_edge_cache_ttl_ms,
|
||||
api_cfg.runtime_edge_top_n,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return RuntimeEdgeConnectionsSummaryData {
|
||||
enabled: true,
|
||||
reason: Some(SOURCE_UNAVAILABLE_REASON),
|
||||
generated_at_epoch_secs: now_epoch_secs,
|
||||
data: None,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
RuntimeEdgeConnectionsSummaryData {
|
||||
enabled: true,
|
||||
reason: None,
|
||||
generated_at_epoch_secs,
|
||||
data: Some(payload),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn build_runtime_events_recent_data(
|
||||
shared: &ApiShared,
|
||||
cfg: &ProxyConfig,
|
||||
query: Option<&str>,
|
||||
) -> RuntimeEdgeEventsData {
|
||||
let now_epoch_secs = now_epoch_secs();
|
||||
let api_cfg = &cfg.server.api;
|
||||
if !api_cfg.runtime_edge_enabled {
|
||||
return RuntimeEdgeEventsData {
|
||||
enabled: false,
|
||||
reason: Some(FEATURE_DISABLED_REASON),
|
||||
generated_at_epoch_secs: now_epoch_secs,
|
||||
data: None,
|
||||
};
|
||||
}
|
||||
|
||||
let limit = parse_recent_events_limit(query, EVENTS_DEFAULT_LIMIT, EVENTS_MAX_LIMIT);
|
||||
let snapshot = shared.runtime_events.snapshot(limit);
|
||||
|
||||
RuntimeEdgeEventsData {
|
||||
enabled: true,
|
||||
reason: None,
|
||||
generated_at_epoch_secs: now_epoch_secs,
|
||||
data: Some(RuntimeEdgeEventsPayload {
|
||||
capacity: snapshot.capacity,
|
||||
dropped_total: snapshot.dropped_total,
|
||||
events: snapshot.events,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_connections_payload_cached(
|
||||
shared: &ApiShared,
|
||||
cache_ttl_ms: u64,
|
||||
top_n: usize,
|
||||
) -> Option<(u64, RuntimeEdgeConnectionsSummaryPayload)> {
|
||||
if cache_ttl_ms > 0 {
|
||||
let now = Instant::now();
|
||||
let cached = shared.runtime_edge_connections_cache.lock().await.clone();
|
||||
if let Some(entry) = cached
|
||||
&& now < entry.expires_at
|
||||
{
|
||||
let mut payload = entry.payload;
|
||||
payload.cache.served_from_cache = true;
|
||||
payload.cache.stale_cache_used = false;
|
||||
return Some((entry.generated_at_epoch_secs, payload));
|
||||
}
|
||||
}
|
||||
|
||||
let Ok(_guard) = shared.runtime_edge_recompute_lock.try_lock() else {
|
||||
let cached = shared.runtime_edge_connections_cache.lock().await.clone();
|
||||
if let Some(entry) = cached {
|
||||
let mut payload = entry.payload;
|
||||
payload.cache.served_from_cache = true;
|
||||
payload.cache.stale_cache_used = true;
|
||||
return Some((entry.generated_at_epoch_secs, payload));
|
||||
}
|
||||
return None;
|
||||
};
|
||||
|
||||
let generated_at_epoch_secs = now_epoch_secs();
|
||||
let payload = recompute_connections_payload(shared, cache_ttl_ms, top_n).await;
|
||||
|
||||
if cache_ttl_ms > 0 {
|
||||
let entry = EdgeConnectionsCacheEntry {
|
||||
expires_at: Instant::now() + Duration::from_millis(cache_ttl_ms),
|
||||
payload: payload.clone(),
|
||||
generated_at_epoch_secs,
|
||||
};
|
||||
*shared.runtime_edge_connections_cache.lock().await = Some(entry);
|
||||
}
|
||||
|
||||
Some((generated_at_epoch_secs, payload))
|
||||
}
|
||||
|
||||
async fn recompute_connections_payload(
|
||||
shared: &ApiShared,
|
||||
cache_ttl_ms: u64,
|
||||
top_n: usize,
|
||||
) -> RuntimeEdgeConnectionsSummaryPayload {
|
||||
let mut rows = Vec::<RuntimeEdgeConnectionUserData>::new();
|
||||
let mut active_users = 0usize;
|
||||
for entry in shared.stats.iter_user_stats() {
|
||||
let user_stats = entry.value();
|
||||
let current_connections = user_stats
|
||||
.curr_connects
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let total_octets = user_stats
|
||||
.octets_from_client
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
.saturating_add(
|
||||
user_stats
|
||||
.octets_to_client
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
);
|
||||
if current_connections > 0 {
|
||||
active_users = active_users.saturating_add(1);
|
||||
}
|
||||
rows.push(RuntimeEdgeConnectionUserData {
|
||||
username: entry.key().clone(),
|
||||
current_connections,
|
||||
total_octets,
|
||||
});
|
||||
}
|
||||
|
||||
let limit = top_n.max(1);
|
||||
let mut by_connections = rows.clone();
|
||||
by_connections.sort_by_key(|row| (Reverse(row.current_connections), row.username.clone()));
|
||||
by_connections.truncate(limit);
|
||||
|
||||
let mut by_throughput = rows;
|
||||
by_throughput.sort_by_key(|row| (Reverse(row.total_octets), row.username.clone()));
|
||||
by_throughput.truncate(limit);
|
||||
|
||||
let telemetry = shared.stats.telemetry_policy();
|
||||
RuntimeEdgeConnectionsSummaryPayload {
|
||||
cache: RuntimeEdgeConnectionCacheData {
|
||||
ttl_ms: cache_ttl_ms,
|
||||
served_from_cache: false,
|
||||
stale_cache_used: false,
|
||||
},
|
||||
totals: RuntimeEdgeConnectionTotalsData {
|
||||
current_connections: shared.stats.get_current_connections_total(),
|
||||
current_connections_me: shared.stats.get_current_connections_me(),
|
||||
current_connections_direct: shared.stats.get_current_connections_direct(),
|
||||
active_users,
|
||||
},
|
||||
top: RuntimeEdgeConnectionTopData {
|
||||
limit,
|
||||
by_connections,
|
||||
by_throughput,
|
||||
},
|
||||
telemetry: RuntimeEdgeConnectionTelemetryData {
|
||||
user_enabled: telemetry.user_enabled,
|
||||
throughput_is_cumulative: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_recent_events_limit(query: Option<&str>, default_limit: usize, max_limit: usize) -> usize {
|
||||
let Some(query) = query else {
|
||||
return default_limit;
|
||||
};
|
||||
for pair in query.split('&') {
|
||||
let mut split = pair.splitn(2, '=');
|
||||
if split.next() == Some("limit")
|
||||
&& let Some(raw) = split.next()
|
||||
&& let Ok(parsed) = raw.parse::<usize>()
|
||||
{
|
||||
return parsed.clamp(1, max_limit);
|
||||
}
|
||||
}
|
||||
default_limit
|
||||
}
|
||||
|
||||
fn now_epoch_secs() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
}
|
||||
Loading…
Reference in New Issue