// Mirror of https://github.com/telemt/telemt.git
// Synced 2026-06-12 22:11:43 +03:00
// 423 lines, 13 KiB, Rust
use std::cmp::Reverse;
|
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|
|
|
use serde::Serialize;
|
|
|
|
use crate::config::ProxyConfig;
|
|
|
|
use super::ApiShared;
|
|
use super::events::ApiEventRecord;
|
|
|
|
/// `reason` reported by the builders in this file when `runtime_edge_enabled` is off.
const FEATURE_DISABLED_REASON: &str = "feature_disabled";
/// `reason` reported when no connections payload (fresh or cached) could be obtained.
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
/// Number of recent events returned when the query carries no usable `limit`.
const EVENTS_DEFAULT_LIMIT: usize = 50;
/// Upper bound applied to a client-supplied `limit` for recent events.
const EVENTS_MAX_LIMIT: usize = 1_000;
/// Upper bound applied to a client-supplied `limit` for TLS fingerprint rows.
const TLS_FINGERPRINTS_MAX_LIMIT: usize = 1_000;
/// Largest retention window, in minutes (24 hours), used for TLS fingerprint snapshots.
const RUNTIME_EDGE_RETENTION_MAX_MINUTES: u64 = 60 * 24;
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionUserData {
|
|
pub(super) username: String,
|
|
pub(super) current_connections: u64,
|
|
pub(super) total_octets: u64,
|
|
}
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionTotalsData {
|
|
pub(super) current_connections: u64,
|
|
pub(super) current_connections_me: u64,
|
|
pub(super) current_connections_direct: u64,
|
|
pub(super) active_users: usize,
|
|
}
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionTopData {
|
|
pub(super) limit: usize,
|
|
pub(super) by_connections: Vec<RuntimeEdgeConnectionUserData>,
|
|
pub(super) by_throughput: Vec<RuntimeEdgeConnectionUserData>,
|
|
}
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionCacheData {
|
|
pub(super) ttl_ms: u64,
|
|
pub(super) served_from_cache: bool,
|
|
pub(super) stale_cache_used: bool,
|
|
}
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionTelemetryData {
|
|
pub(super) user_enabled: bool,
|
|
pub(super) throughput_is_cumulative: bool,
|
|
}
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionsSummaryPayload {
|
|
pub(super) cache: RuntimeEdgeConnectionCacheData,
|
|
pub(super) totals: RuntimeEdgeConnectionTotalsData,
|
|
pub(super) top: RuntimeEdgeConnectionTopData,
|
|
pub(super) telemetry: RuntimeEdgeConnectionTelemetryData,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub(super) struct RuntimeEdgeConnectionsSummaryData {
|
|
pub(super) enabled: bool,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) reason: Option<&'static str>,
|
|
pub(super) generated_at_epoch_secs: u64,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) data: Option<RuntimeEdgeConnectionsSummaryPayload>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub(crate) struct EdgeConnectionsCacheEntry {
|
|
pub(super) expires_at: Instant,
|
|
pub(super) payload: RuntimeEdgeConnectionsSummaryPayload,
|
|
pub(super) generated_at_epoch_secs: u64,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub(super) struct RuntimeEdgeEventsPayload {
|
|
pub(super) capacity: usize,
|
|
pub(super) dropped_total: u64,
|
|
pub(super) events: Vec<ApiEventRecord>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub(super) struct RuntimeEdgeEventsData {
|
|
pub(super) enabled: bool,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) reason: Option<&'static str>,
|
|
pub(super) generated_at_epoch_secs: u64,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) data: Option<RuntimeEdgeEventsPayload>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub(super) struct RuntimeEdgeTlsFingerprintRow {
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) scope: Option<String>,
|
|
pub(super) ja3: String,
|
|
pub(super) ja3_raw: String,
|
|
pub(super) ja4: String,
|
|
pub(super) ja4_raw: String,
|
|
pub(super) total: u64,
|
|
pub(super) auth_success: u64,
|
|
pub(super) bad_or_probe: u64,
|
|
pub(super) first_seen_epoch_secs: u64,
|
|
pub(super) last_seen_epoch_secs: u64,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub(super) struct RuntimeEdgeTlsFingerprintsPayload {
|
|
pub(super) limit: usize,
|
|
pub(super) retention_secs: u64,
|
|
pub(super) capacity: usize,
|
|
pub(super) dropped_total: u64,
|
|
pub(super) parse_error_total: u64,
|
|
pub(super) by_fingerprint: Vec<RuntimeEdgeTlsFingerprintRow>,
|
|
pub(super) by_ip: Vec<RuntimeEdgeTlsFingerprintRow>,
|
|
pub(super) by_cidr: Vec<RuntimeEdgeTlsFingerprintRow>,
|
|
pub(super) by_user: Vec<RuntimeEdgeTlsFingerprintRow>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub(super) struct RuntimeEdgeTlsFingerprintsData {
|
|
pub(super) enabled: bool,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) reason: Option<&'static str>,
|
|
pub(super) generated_at_epoch_secs: u64,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub(super) data: Option<RuntimeEdgeTlsFingerprintsPayload>,
|
|
}
|
|
|
|
pub(super) async fn build_runtime_connections_summary_data(
|
|
shared: &ApiShared,
|
|
cfg: &ProxyConfig,
|
|
) -> RuntimeEdgeConnectionsSummaryData {
|
|
let now_epoch_secs = now_epoch_secs();
|
|
let api_cfg = &cfg.server.api;
|
|
if !api_cfg.runtime_edge_enabled {
|
|
return RuntimeEdgeConnectionsSummaryData {
|
|
enabled: false,
|
|
reason: Some(FEATURE_DISABLED_REASON),
|
|
generated_at_epoch_secs: now_epoch_secs,
|
|
data: None,
|
|
};
|
|
}
|
|
|
|
let (generated_at_epoch_secs, payload) = match get_connections_payload_cached(
|
|
shared,
|
|
api_cfg.runtime_edge_cache_ttl_ms,
|
|
api_cfg.runtime_edge_top_n,
|
|
)
|
|
.await
|
|
{
|
|
Some(v) => v,
|
|
None => {
|
|
return RuntimeEdgeConnectionsSummaryData {
|
|
enabled: true,
|
|
reason: Some(SOURCE_UNAVAILABLE_REASON),
|
|
generated_at_epoch_secs: now_epoch_secs,
|
|
data: None,
|
|
};
|
|
}
|
|
};
|
|
|
|
RuntimeEdgeConnectionsSummaryData {
|
|
enabled: true,
|
|
reason: None,
|
|
generated_at_epoch_secs,
|
|
data: Some(payload),
|
|
}
|
|
}
|
|
|
|
pub(super) fn build_runtime_events_recent_data(
|
|
shared: &ApiShared,
|
|
cfg: &ProxyConfig,
|
|
query: Option<&str>,
|
|
) -> RuntimeEdgeEventsData {
|
|
let now_epoch_secs = now_epoch_secs();
|
|
let api_cfg = &cfg.server.api;
|
|
if !api_cfg.runtime_edge_enabled {
|
|
return RuntimeEdgeEventsData {
|
|
enabled: false,
|
|
reason: Some(FEATURE_DISABLED_REASON),
|
|
generated_at_epoch_secs: now_epoch_secs,
|
|
data: None,
|
|
};
|
|
}
|
|
|
|
let limit = parse_recent_events_limit(query, EVENTS_DEFAULT_LIMIT, EVENTS_MAX_LIMIT);
|
|
let snapshot = shared.runtime_events.snapshot(limit);
|
|
|
|
RuntimeEdgeEventsData {
|
|
enabled: true,
|
|
reason: None,
|
|
generated_at_epoch_secs: now_epoch_secs,
|
|
data: Some(RuntimeEdgeEventsPayload {
|
|
capacity: snapshot.capacity,
|
|
dropped_total: snapshot.dropped_total,
|
|
events: snapshot.events,
|
|
}),
|
|
}
|
|
}
|
|
|
|
pub(super) fn build_runtime_tls_fingerprints_data(
|
|
shared: &ApiShared,
|
|
cfg: &ProxyConfig,
|
|
query: Option<&str>,
|
|
) -> RuntimeEdgeTlsFingerprintsData {
|
|
let now_epoch_secs = now_epoch_secs();
|
|
let api_cfg = &cfg.server.api;
|
|
if !api_cfg.runtime_edge_enabled {
|
|
return RuntimeEdgeTlsFingerprintsData {
|
|
enabled: false,
|
|
reason: Some(FEATURE_DISABLED_REASON),
|
|
generated_at_epoch_secs: now_epoch_secs,
|
|
data: None,
|
|
};
|
|
}
|
|
|
|
let limit = parse_recent_events_limit(
|
|
query,
|
|
api_cfg.runtime_edge_top_n.max(1),
|
|
TLS_FINGERPRINTS_MAX_LIMIT,
|
|
);
|
|
let snapshot = shared
|
|
.stats
|
|
.tls_fingerprint_snapshot(runtime_edge_retention(cfg), limit);
|
|
|
|
RuntimeEdgeTlsFingerprintsData {
|
|
enabled: true,
|
|
reason: None,
|
|
generated_at_epoch_secs: now_epoch_secs,
|
|
data: Some(RuntimeEdgeTlsFingerprintsPayload {
|
|
limit,
|
|
retention_secs: snapshot.retention_secs,
|
|
capacity: snapshot.capacity,
|
|
dropped_total: snapshot.dropped_total,
|
|
parse_error_total: snapshot.parse_error_total,
|
|
by_fingerprint: snapshot
|
|
.by_fingerprint
|
|
.into_iter()
|
|
.map(runtime_tls_fingerprint_row)
|
|
.collect(),
|
|
by_ip: snapshot
|
|
.by_ip
|
|
.into_iter()
|
|
.map(runtime_tls_fingerprint_row)
|
|
.collect(),
|
|
by_cidr: snapshot
|
|
.by_cidr
|
|
.into_iter()
|
|
.map(runtime_tls_fingerprint_row)
|
|
.collect(),
|
|
by_user: snapshot
|
|
.by_user
|
|
.into_iter()
|
|
.map(runtime_tls_fingerprint_row)
|
|
.collect(),
|
|
}),
|
|
}
|
|
}
|
|
|
|
async fn get_connections_payload_cached(
|
|
shared: &ApiShared,
|
|
cache_ttl_ms: u64,
|
|
top_n: usize,
|
|
) -> Option<(u64, RuntimeEdgeConnectionsSummaryPayload)> {
|
|
if cache_ttl_ms > 0 {
|
|
let now = Instant::now();
|
|
let cached = shared.runtime_edge_connections_cache.lock().await.clone();
|
|
if let Some(entry) = cached
|
|
&& now < entry.expires_at
|
|
{
|
|
let mut payload = entry.payload;
|
|
payload.cache.served_from_cache = true;
|
|
payload.cache.stale_cache_used = false;
|
|
return Some((entry.generated_at_epoch_secs, payload));
|
|
}
|
|
}
|
|
|
|
let Ok(_guard) = shared.runtime_edge_recompute_lock.try_lock() else {
|
|
let cached = shared.runtime_edge_connections_cache.lock().await.clone();
|
|
if let Some(entry) = cached {
|
|
let mut payload = entry.payload;
|
|
payload.cache.served_from_cache = true;
|
|
payload.cache.stale_cache_used = true;
|
|
return Some((entry.generated_at_epoch_secs, payload));
|
|
}
|
|
return None;
|
|
};
|
|
|
|
let generated_at_epoch_secs = now_epoch_secs();
|
|
let payload = recompute_connections_payload(shared, cache_ttl_ms, top_n).await;
|
|
|
|
if cache_ttl_ms > 0 {
|
|
let entry = EdgeConnectionsCacheEntry {
|
|
expires_at: Instant::now() + Duration::from_millis(cache_ttl_ms),
|
|
payload: payload.clone(),
|
|
generated_at_epoch_secs,
|
|
};
|
|
*shared.runtime_edge_connections_cache.lock().await = Some(entry);
|
|
}
|
|
|
|
Some((generated_at_epoch_secs, payload))
|
|
}
|
|
|
|
async fn recompute_connections_payload(
|
|
shared: &ApiShared,
|
|
cache_ttl_ms: u64,
|
|
top_n: usize,
|
|
) -> RuntimeEdgeConnectionsSummaryPayload {
|
|
let mut rows = Vec::<RuntimeEdgeConnectionUserData>::new();
|
|
let mut active_users = 0usize;
|
|
for entry in shared.stats.iter_user_stats() {
|
|
let user_stats = entry.value();
|
|
let current_connections = user_stats
|
|
.curr_connects
|
|
.load(std::sync::atomic::Ordering::Relaxed);
|
|
let total_octets = user_stats
|
|
.octets_from_client
|
|
.load(std::sync::atomic::Ordering::Relaxed)
|
|
.saturating_add(
|
|
user_stats
|
|
.octets_to_client
|
|
.load(std::sync::atomic::Ordering::Relaxed),
|
|
);
|
|
if current_connections > 0 {
|
|
active_users = active_users.saturating_add(1);
|
|
}
|
|
rows.push(RuntimeEdgeConnectionUserData {
|
|
username: entry.key().clone(),
|
|
current_connections,
|
|
total_octets,
|
|
});
|
|
}
|
|
|
|
let limit = top_n.max(1);
|
|
let mut by_connections = rows.clone();
|
|
by_connections.sort_by_key(|row| (Reverse(row.current_connections), row.username.clone()));
|
|
by_connections.truncate(limit);
|
|
|
|
let mut by_throughput = rows;
|
|
by_throughput.sort_by_key(|row| (Reverse(row.total_octets), row.username.clone()));
|
|
by_throughput.truncate(limit);
|
|
|
|
let telemetry = shared.stats.telemetry_policy();
|
|
RuntimeEdgeConnectionsSummaryPayload {
|
|
cache: RuntimeEdgeConnectionCacheData {
|
|
ttl_ms: cache_ttl_ms,
|
|
served_from_cache: false,
|
|
stale_cache_used: false,
|
|
},
|
|
totals: RuntimeEdgeConnectionTotalsData {
|
|
current_connections: shared.stats.get_current_connections_total(),
|
|
current_connections_me: shared.stats.get_current_connections_me(),
|
|
current_connections_direct: shared.stats.get_current_connections_direct(),
|
|
active_users,
|
|
},
|
|
top: RuntimeEdgeConnectionTopData {
|
|
limit,
|
|
by_connections,
|
|
by_throughput,
|
|
},
|
|
telemetry: RuntimeEdgeConnectionTelemetryData {
|
|
user_enabled: telemetry.user_enabled,
|
|
throughput_is_cumulative: true,
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Extracts the `limit` parameter from a raw query string.
///
/// `query` is split on `&` into `key=value` pairs. The first pair whose key is
/// exactly `limit` and whose value parses as `usize` wins; pairs with an
/// unparsable value are skipped so a later valid `limit` can still apply. No
/// percent-decoding is performed.
///
/// Returns the parsed value clamped to `1..=max_limit`. When `query` is `None`
/// or holds no usable `limit`, `default_limit` is returned unchanged (it is not
/// clamped).
///
/// A `max_limit` of 0 is treated as 1 so the clamp bounds stay ordered;
/// `Ord::clamp` panics when its minimum exceeds its maximum.
fn parse_recent_events_limit(query: Option<&str>, default_limit: usize, max_limit: usize) -> usize {
    let upper_bound = max_limit.max(1);
    query
        .into_iter()
        .flat_map(|raw_query| raw_query.split('&'))
        .filter_map(|pair| pair.split_once('='))
        .filter(|(key, _)| *key == "limit")
        .find_map(|(_, raw)| raw.parse::<usize>().ok())
        .map_or(default_limit, |parsed| parsed.clamp(1, upper_bound))
}
|
|
|
|
fn runtime_edge_retention(cfg: &ProxyConfig) -> Duration {
|
|
let minutes = cfg
|
|
.general
|
|
.beobachten_minutes
|
|
.clamp(1, RUNTIME_EDGE_RETENTION_MAX_MINUTES);
|
|
Duration::from_secs(minutes.saturating_mul(60))
|
|
}
|
|
|
|
fn runtime_tls_fingerprint_row(
|
|
row: crate::stats::TlsFingerprintSnapshotRow,
|
|
) -> RuntimeEdgeTlsFingerprintRow {
|
|
RuntimeEdgeTlsFingerprintRow {
|
|
scope: if row.scope_key.is_empty() {
|
|
None
|
|
} else {
|
|
Some(row.scope_key)
|
|
},
|
|
ja3: row.ja3,
|
|
ja3_raw: row.ja3_raw,
|
|
ja4: row.ja4,
|
|
ja4_raw: row.ja4_raw,
|
|
total: row.total,
|
|
auth_success: row.auth_success,
|
|
bad_or_probe: row.bad_or_probe,
|
|
first_seen_epoch_secs: row.first_seen_epoch_secs,
|
|
last_seen_epoch_secs: row.last_seen_epoch_secs,
|
|
}
|
|
}
|
|
|
|
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
|