mirror of
https://github.com/telemt/telemt.git
synced 2026-05-01 01:14:11 +03:00
Shard TLS full-cert budget tracking + Bound user-labeled metrics export cardinality
This commit is contained in:
@@ -22,6 +22,9 @@ use crate::tls_front::cache;
|
||||
use crate::tls_front::fetcher;
|
||||
use crate::transport::{ListenOptions, create_listener};
|
||||
|
||||
// Keeps `/metrics` response size bounded when per-user telemetry is enabled.
|
||||
const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
|
||||
|
||||
pub async fn serve(
|
||||
port: u16,
|
||||
listen: Option<String>,
|
||||
@@ -313,6 +316,12 @@ async fn render_metrics(
|
||||
"telemt_telemetry_user_enabled {}",
|
||||
if user_enabled { 1 } else { 0 }
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_stats_user_entries Retained per-user stats entries"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_stats_user_entries gauge");
|
||||
let _ = writeln!(out, "telemt_stats_user_entries {}", stats.user_stats_len());
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
@@ -3071,17 +3080,6 @@ async fn render_metrics(
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_telemetry_user_series_suppressed {}",
|
||||
if user_enabled { 0 } else { 1 }
|
||||
);
|
||||
|
||||
let ip_memory = ip_tracker.memory_stats().await;
|
||||
let _ = writeln!(
|
||||
out,
|
||||
@@ -3154,10 +3152,20 @@ async fn render_metrics(
|
||||
ip_memory.recent_cap_rejects
|
||||
);
|
||||
|
||||
let mut user_stats_emitted = 0usize;
|
||||
let mut user_stats_suppressed = 0usize;
|
||||
let mut unique_ip_emitted = 0usize;
|
||||
let mut unique_ip_suppressed = 0usize;
|
||||
|
||||
if user_enabled {
|
||||
for entry in stats.iter_user_stats() {
|
||||
if user_stats_emitted >= USER_LABELED_METRICS_MAX_USERS {
|
||||
user_stats_suppressed = user_stats_suppressed.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
let user = entry.key();
|
||||
let s = entry.value();
|
||||
user_stats_emitted = user_stats_emitted.saturating_add(1);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_user_connections_total{{user=\"{}\"}} {}",
|
||||
@@ -3236,6 +3244,11 @@ async fn render_metrics(
|
||||
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
|
||||
|
||||
for user in unique_users {
|
||||
if unique_ip_emitted >= USER_LABELED_METRICS_MAX_USERS {
|
||||
unique_ip_suppressed = unique_ip_suppressed.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
unique_ip_emitted = unique_ip_emitted.saturating_add(1);
|
||||
let current = ip_counts.get(&user).copied().unwrap_or(0);
|
||||
let limit = config
|
||||
.access
|
||||
@@ -3275,6 +3288,46 @@ async fn render_metrics(
|
||||
}
|
||||
}
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_telemetry_user_series_suppressed {}",
|
||||
if user_enabled && user_stats_suppressed == 0 && unique_ip_suppressed == 0 {
|
||||
0
|
||||
} else {
|
||||
1
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_telemetry_user_series_users User-labeled metric users by export status"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_users gauge");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_telemetry_user_series_users{{family=\"stats\",status=\"emitted\"}} {}",
|
||||
user_stats_emitted
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_telemetry_user_series_users{{family=\"stats\",status=\"suppressed\"}} {}",
|
||||
user_stats_suppressed
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"emitted\"}} {}",
|
||||
unique_ip_emitted
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"suppressed\"}} {}",
|
||||
unique_ip_suppressed
|
||||
);
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
@@ -3488,6 +3541,8 @@ mod tests {
|
||||
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
|
||||
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
||||
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
||||
assert!(output.contains("# TYPE telemt_stats_user_entries gauge"));
|
||||
assert!(output.contains("# TYPE telemt_telemetry_user_series_users gauge"));
|
||||
assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
|
||||
assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
|
||||
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
|
||||
|
||||
@@ -1432,8 +1432,8 @@ impl RunningClientHandler {
|
||||
|
||||
/// Main dispatch after successful handshake.
|
||||
/// Two modes:
|
||||
/// - Direct: TCP relay to TG DC (existing behavior)
|
||||
/// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs)
|
||||
/// - Direct: TCP relay to TG DC (existing behavior)
|
||||
/// - Middle Proxy: RPC multiplex through ME pool (supports CDN DCs)
|
||||
#[cfg(test)]
|
||||
async fn handle_authenticated_static<R, W>(
|
||||
client_reader: CryptoReader<R>,
|
||||
|
||||
@@ -2477,6 +2477,11 @@ impl Stats {
|
||||
self.user_stats.iter()
|
||||
}
|
||||
|
||||
/// Current number of retained per-user stats entries.
|
||||
pub fn user_stats_len(&self) -> usize {
|
||||
self.user_stats.len()
|
||||
}
|
||||
|
||||
pub fn uptime_secs(&self) -> f64 {
|
||||
self.start_time
|
||||
.read()
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::net::IpAddr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
@@ -15,6 +17,7 @@ use crate::tls_front::types::{
|
||||
|
||||
const FULL_CERT_SENT_SWEEP_INTERVAL_SECS: u64 = 30;
|
||||
const FULL_CERT_SENT_MAX_IPS: usize = 65_536;
|
||||
const FULL_CERT_SENT_SHARDS: usize = 64;
|
||||
|
||||
static FULL_CERT_SENT_IPS_GAUGE: AtomicU64 = AtomicU64::new(0);
|
||||
static FULL_CERT_SENT_CAP_DROPS: AtomicU64 = AtomicU64::new(0);
|
||||
@@ -34,7 +37,7 @@ pub(crate) fn full_cert_sent_cap_drops_for_metrics() -> u64 {
|
||||
pub struct TlsFrontCache {
|
||||
memory: RwLock<HashMap<String, Arc<CachedTlsData>>>,
|
||||
default: Arc<CachedTlsData>,
|
||||
full_cert_sent: RwLock<HashMap<IpAddr, Instant>>,
|
||||
full_cert_sent_shards: Vec<RwLock<HashMap<IpAddr, Instant>>>,
|
||||
full_cert_sent_last_sweep_epoch_secs: AtomicU64,
|
||||
disk_path: PathBuf,
|
||||
}
|
||||
@@ -70,7 +73,9 @@ impl TlsFrontCache {
|
||||
Self {
|
||||
memory: RwLock::new(map),
|
||||
default,
|
||||
full_cert_sent: RwLock::new(HashMap::new()),
|
||||
full_cert_sent_shards: (0..FULL_CERT_SENT_SHARDS)
|
||||
.map(|_| RwLock::new(HashMap::new()))
|
||||
.collect(),
|
||||
full_cert_sent_last_sweep_epoch_secs: AtomicU64::new(0),
|
||||
disk_path: disk_path.as_ref().to_path_buf(),
|
||||
}
|
||||
@@ -88,6 +93,54 @@ impl TlsFrontCache {
|
||||
self.memory.read().await.contains_key(domain)
|
||||
}
|
||||
|
||||
fn full_cert_sent_shard_index(client_ip: IpAddr) -> usize {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
client_ip.hash(&mut hasher);
|
||||
(hasher.finish() as usize) % FULL_CERT_SENT_SHARDS
|
||||
}
|
||||
|
||||
fn full_cert_sent_shard(&self, client_ip: IpAddr) -> &RwLock<HashMap<IpAddr, Instant>> {
|
||||
&self.full_cert_sent_shards[Self::full_cert_sent_shard_index(client_ip)]
|
||||
}
|
||||
|
||||
fn decrement_full_cert_sent_entries(amount: usize) {
|
||||
if amount == 0 {
|
||||
return;
|
||||
}
|
||||
let amount = amount as u64;
|
||||
let _ =
|
||||
FULL_CERT_SENT_IPS_GAUGE.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
|
||||
Some(current.saturating_sub(amount))
|
||||
});
|
||||
}
|
||||
|
||||
fn try_reserve_full_cert_sent_entry() -> bool {
|
||||
let mut current = FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed);
|
||||
loop {
|
||||
if current >= FULL_CERT_SENT_MAX_IPS as u64 {
|
||||
return false;
|
||||
}
|
||||
match FULL_CERT_SENT_IPS_GAUGE.compare_exchange_weak(
|
||||
current,
|
||||
current.saturating_add(1),
|
||||
Ordering::AcqRel,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => return true,
|
||||
Err(actual) => current = actual,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn sweep_full_cert_sent_shards(&self, now: Instant, ttl: Duration) {
|
||||
for shard in &self.full_cert_sent_shards {
|
||||
let mut guard = shard.write().await;
|
||||
let before = guard.len();
|
||||
guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
|
||||
Self::decrement_full_cert_sent_entries(before.saturating_sub(guard.len()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when full cert payload should be sent for client_ip
|
||||
/// according to TTL policy.
|
||||
pub async fn take_full_cert_budget_for_ip(&self, client_ip: IpAddr, ttl: Duration) -> bool {
|
||||
@@ -113,11 +166,11 @@ impl TlsFrontCache {
|
||||
})
|
||||
.is_ok();
|
||||
|
||||
let mut guard = self.full_cert_sent.write().await;
|
||||
if should_sweep {
|
||||
guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
|
||||
self.sweep_full_cert_sent_shards(now, ttl).await;
|
||||
}
|
||||
|
||||
let mut guard = self.full_cert_sent_shard(client_ip).write().await;
|
||||
let allowed = match guard.get_mut(&client_ip) {
|
||||
Some(seen_at) => {
|
||||
if now.duration_since(*seen_at) >= ttl {
|
||||
@@ -128,19 +181,43 @@ impl TlsFrontCache {
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if guard.len() >= FULL_CERT_SENT_MAX_IPS {
|
||||
if !Self::try_reserve_full_cert_sent_entry() {
|
||||
FULL_CERT_SENT_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
|
||||
FULL_CERT_SENT_IPS_GAUGE.store(guard.len() as u64, Ordering::Relaxed);
|
||||
return false;
|
||||
}
|
||||
guard.insert(client_ip, now);
|
||||
true
|
||||
}
|
||||
};
|
||||
FULL_CERT_SENT_IPS_GAUGE.store(guard.len() as u64, Ordering::Relaxed);
|
||||
allowed
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn insert_full_cert_sent_for_tests(&self, client_ip: IpAddr, seen_at: Instant) {
|
||||
let mut guard = self.full_cert_sent_shard(client_ip).write().await;
|
||||
if guard.insert(client_ip, seen_at).is_none() {
|
||||
FULL_CERT_SENT_IPS_GAUGE.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn full_cert_sent_is_empty_for_tests(&self) -> bool {
|
||||
for shard in &self.full_cert_sent_shards {
|
||||
if !shard.read().await.is_empty() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn full_cert_sent_contains_for_tests(&self, client_ip: IpAddr) -> bool {
|
||||
self.full_cert_sent_shard(client_ip)
|
||||
.read()
|
||||
.await
|
||||
.contains_key(&client_ip)
|
||||
}
|
||||
|
||||
pub async fn set(&self, domain: &str, data: CachedTlsData) {
|
||||
let mut guard = self.memory.write().await;
|
||||
guard.insert(domain.to_string(), Arc::new(data));
|
||||
@@ -381,7 +458,7 @@ mod tests {
|
||||
assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
|
||||
}
|
||||
|
||||
assert!(cache.full_cert_sent.read().await.is_empty());
|
||||
assert!(cache.full_cert_sent_is_empty_for_tests().await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -395,19 +472,16 @@ mod tests {
|
||||
.unwrap_or_else(Instant::now);
|
||||
|
||||
cache
|
||||
.full_cert_sent
|
||||
.write()
|
||||
.await
|
||||
.insert(stale_ip, stale_seen_at);
|
||||
.insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
|
||||
.await;
|
||||
cache
|
||||
.full_cert_sent_last_sweep_epoch_secs
|
||||
.store(0, Ordering::Relaxed);
|
||||
|
||||
assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
|
||||
|
||||
let guard = cache.full_cert_sent.read().await;
|
||||
assert!(!guard.contains_key(&stale_ip));
|
||||
assert!(guard.contains_key(&new_ip));
|
||||
assert!(!cache.full_cert_sent_contains_for_tests(stale_ip).await);
|
||||
assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -425,18 +499,15 @@ mod tests {
|
||||
.as_secs();
|
||||
|
||||
cache
|
||||
.full_cert_sent
|
||||
.write()
|
||||
.await
|
||||
.insert(stale_ip, stale_seen_at);
|
||||
.insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
|
||||
.await;
|
||||
cache
|
||||
.full_cert_sent_last_sweep_epoch_secs
|
||||
.store(now_epoch_secs, Ordering::Relaxed);
|
||||
|
||||
assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
|
||||
|
||||
let guard = cache.full_cert_sent.read().await;
|
||||
assert!(guard.contains_key(&stale_ip));
|
||||
assert!(guard.contains_key(&new_ip));
|
||||
assert!(cache.full_cert_sent_contains_for_tests(stale_ip).await);
|
||||
assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user