Merge pull request #753 from telemt/flow

TLS Full Certificate Budget Bookkeeping + Hot-path Cleanup and Timeout Invariants + IP-Tracker refactoring + Shard TLS Full-Cert Budget
@@ -1087,9 +1087,9 @@ impl ProxyConfig {
 ));
 }

-if config.general.me_route_blocking_send_timeout_ms > 5000 {
+if !(1..=5000).contains(&config.general.me_route_blocking_send_timeout_ms) {
 return Err(ProxyError::Config(
-"general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
+"general.me_route_blocking_send_timeout_ms must be within [1, 5000]".to_string(),
 ));
 }

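Note on the hunk above: the old guard only rejected values above 5000, so 0 slipped through and kept the legacy unbounded wait. The inclusive-range check closes both ends. A minimal standalone sketch of the same check (validate_timeout is illustrative, not a function from this codebase):

    fn validate_timeout(ms: u64) -> Result<(), String> {
        // RangeInclusive::contains rejects 0 as well as anything above 5000;
        // the old `ms > 5000` comparison silently accepted 0.
        if !(1..=5000).contains(&ms) {
            return Err("must be within [1, 5000]".to_string());
        }
        Ok(())
    }

    fn main() {
        assert!(validate_timeout(0).is_err()); // rejected now, accepted before
        assert!(validate_timeout(1).is_ok());
        assert!(validate_timeout(5000).is_ok());
        assert!(validate_timeout(5001).is_err());
    }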
@@ -2602,6 +2602,26 @@ mod tests {
 let _ = std::fs::remove_file(path);
 }

+#[test]
+fn me_route_blocking_send_timeout_ms_zero_is_rejected() {
+let toml = r#"
+[general]
+me_route_blocking_send_timeout_ms = 0
+
+[censorship]
+tls_domain = "example.com"
+
+[access.users]
+user = "00000000000000000000000000000000"
+"#;
+let dir = std::env::temp_dir();
+let path = dir.join("telemt_me_route_blocking_send_timeout_zero_test.toml");
+std::fs::write(&path, toml).unwrap();
+let err = ProxyConfig::load(&path).unwrap_err().to_string();
+assert!(err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]"));
+let _ = std::fs::remove_file(path);
+}
+
 #[test]
 fn me_route_no_writer_mode_is_parsed() {
 let toml = r#"
@@ -778,7 +778,7 @@ pub struct GeneralConfig {
 pub me_route_hybrid_max_wait_ms: u64,

 /// Maximum wait in milliseconds for blocking ME writer channel send fallback.
-/// `0` keeps legacy unbounded wait behavior.
+/// Must be within [1, 5000].
 #[serde(default = "default_me_route_blocking_send_timeout_ms")]
 pub me_route_blocking_send_timeout_ms: u64,

@@ -9,14 +9,24 @@ use std::sync::Mutex;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::{Duration, Instant};

-use tokio::sync::{Mutex as AsyncMutex, RwLock};
+use tokio::sync::{Mutex as AsyncMutex, RwLock, RwLockWriteGuard};

 use crate::config::UserMaxUniqueIpsMode;

+const CLEANUP_DRAIN_BATCH_LIMIT: usize = 1024;
+const MAX_ACTIVE_IP_ENTRIES: u64 = 131_072;
+const MAX_RECENT_IP_ENTRIES: u64 = 262_144;
+
+/// Tracks active and recent client IPs for per-user admission control.
 #[derive(Debug, Clone)]
 pub struct UserIpTracker {
 active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
 recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
+active_entry_count: Arc<AtomicU64>,
+recent_entry_count: Arc<AtomicU64>,
+active_cap_rejects: Arc<AtomicU64>,
+recent_cap_rejects: Arc<AtomicU64>,
+cleanup_deferred_releases: Arc<AtomicU64>,
 max_ips: Arc<RwLock<HashMap<String, usize>>>,
 default_max_ips: Arc<RwLock<usize>>,
 limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
@@ -26,13 +36,25 @@ pub struct UserIpTracker {
 cleanup_drain_lock: Arc<AsyncMutex<()>>,
 }

+/// Point-in-time memory counters for user/IP limiter state.
 #[derive(Debug, Clone, Copy)]
 pub struct UserIpTrackerMemoryStats {
+/// Number of users with active IP state.
 pub active_users: usize,
+/// Number of users with recent IP state.
 pub recent_users: usize,
+/// Number of active `(user, ip)` entries.
 pub active_entries: usize,
+/// Number of recent-window `(user, ip)` entries.
 pub recent_entries: usize,
+/// Number of deferred disconnect cleanups waiting to be drained.
 pub cleanup_queue_len: usize,
+/// Number of new connections rejected by the global active-entry cap.
+pub active_cap_rejects: u64,
+/// Number of new connections rejected by the global recent-entry cap.
+pub recent_cap_rejects: u64,
+/// Number of release cleanups deferred through the cleanup queue.
+pub cleanup_deferred_releases: u64,
 }

 impl UserIpTracker {
@@ -40,6 +62,11 @@ impl UserIpTracker {
 Self {
 active_ips: Arc::new(RwLock::new(HashMap::new())),
 recent_ips: Arc::new(RwLock::new(HashMap::new())),
+active_entry_count: Arc::new(AtomicU64::new(0)),
+recent_entry_count: Arc::new(AtomicU64::new(0)),
+active_cap_rejects: Arc::new(AtomicU64::new(0)),
+recent_cap_rejects: Arc::new(AtomicU64::new(0)),
+cleanup_deferred_releases: Arc::new(AtomicU64::new(0)),
 max_ips: Arc::new(RwLock::new(HashMap::new())),
 default_max_ips: Arc::new(RwLock::new(0)),
 limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
@@ -50,16 +77,59 @@ impl UserIpTracker {
 }
 }

+fn decrement_counter(counter: &AtomicU64, amount: usize) {
+if amount == 0 {
+return;
+}
+let amount = amount as u64;
+let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
+Some(current.saturating_sub(amount))
+});
+}
+
+fn apply_active_cleanup(
+active_ips: &mut HashMap<String, HashMap<IpAddr, usize>>,
+user: &str,
+ip: IpAddr,
+pending_count: usize,
+) -> usize {
+if pending_count == 0 {
+return 0;
+}
+
+let mut remove_user = false;
+let mut removed_active_entries = 0usize;
+if let Some(user_ips) = active_ips.get_mut(user) {
+if let Some(count) = user_ips.get_mut(&ip) {
+if *count > pending_count {
+*count -= pending_count;
+} else if user_ips.remove(&ip).is_some() {
+removed_active_entries = 1;
+}
+}
+remove_user = user_ips.is_empty();
+}
+if remove_user {
+active_ips.remove(user);
+}
+removed_active_entries
+}
+
+/// Queues a deferred active IP cleanup for a later async drain.
 pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
 match self.cleanup_queue.lock() {
 Ok(mut queue) => {
 let count = queue.entry((user, ip)).or_insert(0);
 *count = count.saturating_add(1);
+self.cleanup_deferred_releases
+.fetch_add(1, Ordering::Relaxed);
 }
 Err(poisoned) => {
 let mut queue = poisoned.into_inner();
 let count = queue.entry((user.clone(), ip)).or_insert(0);
 *count = count.saturating_add(1);
+self.cleanup_deferred_releases
+.fetch_add(1, Ordering::Relaxed);
 self.cleanup_queue.clear_poison();
 tracing::warn!(
 "UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})",
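Note on decrement_counter above: a plain fetch_sub would wrap a u64 gauge past zero if releases ever outnumber tracked acquisitions, while fetch_update with saturating_sub pins it at 0. A minimal sketch of the idiom (standalone, not project code):

    use std::sync::atomic::{AtomicU64, Ordering};

    fn decrement(counter: &AtomicU64, amount: u64) {
        // The closure always returns Some, so fetch_update cannot fail here;
        // saturating_sub clamps at zero instead of wrapping to u64::MAX.
        let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
            Some(current.saturating_sub(amount))
        });
    }

    fn main() {
        let gauge = AtomicU64::new(1);
        decrement(&gauge, 5); // over-release: clamps to 0 rather than wrapping
        assert_eq!(gauge.load(Ordering::Relaxed), 0);
    }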
@@ -86,16 +156,27 @@ impl UserIpTracker {
 }

 pub(crate) async fn drain_cleanup_queue(&self) {
-// Serialize queue draining and active-IP mutation so check-and-add cannot
-// observe stale active entries that are already queued for removal.
-let _drain_guard = self.cleanup_drain_lock.lock().await;
+let Ok(_drain_guard) = self.cleanup_drain_lock.try_lock() else {
+return;
+};

 let to_remove = {
 match self.cleanup_queue.lock() {
 Ok(mut queue) => {
 if queue.is_empty() {
 return;
 }
-std::mem::take(&mut *queue)
+let mut drained =
+HashMap::with_capacity(queue.len().min(CLEANUP_DRAIN_BATCH_LIMIT));
+for _ in 0..CLEANUP_DRAIN_BATCH_LIMIT {
+let Some(key) = queue.keys().next().cloned() else {
+break;
+};
+if let Some(count) = queue.remove(&key) {
+drained.insert(key, count);
+}
+}
+drained
 }
 Err(poisoned) => {
 let mut queue = poisoned.into_inner();
@@ -103,31 +184,33 @@ impl UserIpTracker {
 self.cleanup_queue.clear_poison();
 return;
 }
-let drained = std::mem::take(&mut *queue);
+let mut drained =
+HashMap::with_capacity(queue.len().min(CLEANUP_DRAIN_BATCH_LIMIT));
+for _ in 0..CLEANUP_DRAIN_BATCH_LIMIT {
+let Some(key) = queue.keys().next().cloned() else {
+break;
+};
+if let Some(count) = queue.remove(&key) {
+drained.insert(key, count);
+}
+}
 self.cleanup_queue.clear_poison();
 drained
 }
 }
 };
+if to_remove.is_empty() {
+return;
+}

 let mut active_ips = self.active_ips.write().await;
+let mut removed_active_entries = 0usize;
 for ((user, ip), pending_count) in to_remove {
-if pending_count == 0 {
-continue;
-}
-if let Some(user_ips) = active_ips.get_mut(&user) {
-if let Some(count) = user_ips.get_mut(&ip) {
-if *count > pending_count {
-*count -= pending_count;
-} else {
-user_ips.remove(&ip);
-}
-}
-if user_ips.is_empty() {
-active_ips.remove(&user);
-}
-}
+removed_active_entries = removed_active_entries.saturating_add(
+Self::apply_active_cleanup(&mut active_ips, &user, ip, pending_count),
+);
 }
+Self::decrement_counter(&self.active_entry_count, removed_active_entries);
 }

 fn now_epoch_secs() -> u64 {
@@ -137,6 +220,24 @@ impl UserIpTracker {
 .as_secs()
 }

+async fn active_and_recent_write(
+&self,
+) -> (
+RwLockWriteGuard<'_, HashMap<String, HashMap<IpAddr, usize>>>,
+RwLockWriteGuard<'_, HashMap<String, HashMap<IpAddr, Instant>>>,
+) {
+loop {
+let active_ips = self.active_ips.write().await;
+match self.recent_ips.try_write() {
+Ok(recent_ips) => return (active_ips, recent_ips),
+Err(_) => {
+drop(active_ips);
+tokio::task::yield_now().await;
+}
+}
+}
+}
+
 async fn maybe_compact_empty_users(&self) {
 const COMPACT_INTERVAL_SECS: u64 = 60;
 let now_epoch_secs = Self::now_epoch_secs();
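Note on active_and_recent_write above: the helper always takes active_ips first and only try-locks recent_ips, dropping the first guard and yielding when the second is contended, so no task can hold one lock while blocking on the other. A reduced sketch of the pattern on two tokio RwLocks (an assumed shape, not the project type):

    use tokio::sync::{RwLock, RwLockWriteGuard};

    async fn write_both<'l>(
        a: &'l RwLock<u64>,
        b: &'l RwLock<u64>,
    ) -> (RwLockWriteGuard<'l, u64>, RwLockWriteGuard<'l, u64>) {
        loop {
            let ga = a.write().await;
            match b.try_write() {
                Ok(gb) => return (ga, gb),
                Err(_) => {
                    drop(ga); // release before backing off so other writers can progress
                    tokio::task::yield_now().await;
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (a, b) = (RwLock::new(1), RwLock::new(2));
        let (mut ga, mut gb) = write_both(&a, &b).await;
        *ga += 1;
        *gb += 1;
    }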
@@ -157,14 +258,16 @@ impl UserIpTracker {
 return;
 }

-let mut active_ips = self.active_ips.write().await;
-let mut recent_ips = self.recent_ips.write().await;
 let window = *self.limit_window.read().await;
 let now = Instant::now();
+let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
+
+let mut pruned_recent_entries = 0usize;
 for user_recent in recent_ips.values_mut() {
-Self::prune_recent(user_recent, now, window);
+pruned_recent_entries =
+pruned_recent_entries.saturating_add(Self::prune_recent(user_recent, now, window));
 }
+Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
+
 let mut users =
 Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
@@ -208,6 +311,9 @@ impl UserIpTracker {
 active_entries,
 recent_entries,
 cleanup_queue_len,
+active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed),
+recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed),
+cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed),
 }
 }

@@ -238,11 +344,17 @@ impl UserIpTracker {
 max_ips.clone_from(limits);
 }

-fn prune_recent(user_recent: &mut HashMap<IpAddr, Instant>, now: Instant, window: Duration) {
+fn prune_recent(
+user_recent: &mut HashMap<IpAddr, Instant>,
+now: Instant,
+window: Duration,
+) -> usize {
 if user_recent.is_empty() {
-return;
+return 0;
 }
+let before = user_recent.len();
 user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window);
+before.saturating_sub(user_recent.len())
 }

 pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
@@ -261,24 +373,36 @@ impl UserIpTracker {
 let window = *self.limit_window.read().await;
 let now = Instant::now();

-let mut active_ips = self.active_ips.write().await;
+let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
 let user_active = active_ips
 .entry(username.to_string())
 .or_insert_with(HashMap::new);

-let mut recent_ips = self.recent_ips.write().await;
 let user_recent = recent_ips
 .entry(username.to_string())
 .or_insert_with(HashMap::new);
-Self::prune_recent(user_recent, now, window);
+let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
+Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
+let recent_contains_ip = user_recent.contains_key(&ip);

 if let Some(count) = user_active.get_mut(&ip) {
+if !recent_contains_ip
+&& self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES
+{
+self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
+return Err(format!(
+"IP tracker recent entry cap reached: entries={}/{}",
+self.recent_entry_count.load(Ordering::Relaxed),
+MAX_RECENT_IP_ENTRIES
+));
+}
 *count = count.saturating_add(1);
-user_recent.insert(ip, now);
+if user_recent.insert(ip, now).is_none() {
+self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
+}
 return Ok(());
 }

-let is_new_ip = !user_recent.contains_key(&ip);
+let is_new_ip = !recent_contains_ip;

 if let Some(limit) = limit {
 let active_limit_reached = user_active.len() >= limit;
@@ -302,30 +426,62 @@ impl UserIpTracker {
 }
 }

-user_active.insert(ip, 1);
-user_recent.insert(ip, now);
+if self.active_entry_count.load(Ordering::Relaxed) >= MAX_ACTIVE_IP_ENTRIES {
+self.active_cap_rejects.fetch_add(1, Ordering::Relaxed);
+return Err(format!(
+"IP tracker active entry cap reached: entries={}/{}",
+self.active_entry_count.load(Ordering::Relaxed),
+MAX_ACTIVE_IP_ENTRIES
+));
+}
+if is_new_ip && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES {
+self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
+return Err(format!(
+"IP tracker recent entry cap reached: entries={}/{}",
+self.recent_entry_count.load(Ordering::Relaxed),
+MAX_RECENT_IP_ENTRIES
+));
+}
+
+if user_active.insert(ip, 1).is_none() {
+self.active_entry_count.fetch_add(1, Ordering::Relaxed);
+}
+if user_recent.insert(ip, now).is_none() {
+self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
+}
 Ok(())
 }

 pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
 self.maybe_compact_empty_users().await;
 let mut active_ips = self.active_ips.write().await;
+let mut removed_active_entries = 0usize;
 if let Some(user_ips) = active_ips.get_mut(username) {
 if let Some(count) = user_ips.get_mut(&ip) {
 if *count > 1 {
 *count -= 1;
 } else {
-user_ips.remove(&ip);
+if user_ips.remove(&ip).is_some() {
+removed_active_entries = 1;
+}
 }
 }
 if user_ips.is_empty() {
 active_ips.remove(username);
 }
 }
+Self::decrement_counter(&self.active_entry_count, removed_active_entries);
 }

 pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
 self.drain_cleanup_queue().await;
+self.get_recent_counts_for_users_snapshot(users).await
+}
+
+pub(crate) async fn get_recent_counts_for_users_snapshot(
+&self,
+users: &[String],
+) -> HashMap<String, usize> {
 let window = *self.limit_window.read().await;
 let now = Instant::now();
 let recent_ips = self.recent_ips.read().await;
@@ -400,19 +556,29 @@ impl UserIpTracker {

 pub async fn get_stats(&self) -> Vec<(String, usize, usize)> {
 self.drain_cleanup_queue().await;
+self.get_stats_snapshot().await
+}
+
+pub(crate) async fn get_stats_snapshot(&self) -> Vec<(String, usize, usize)> {
 let active_ips = self.active_ips.read().await;
+let active_counts = active_ips
+.iter()
+.map(|(username, user_ips)| (username.clone(), user_ips.len()))
+.collect::<Vec<_>>();
+drop(active_ips);
+
 let max_ips = self.max_ips.read().await;
 let default_max_ips = *self.default_max_ips.read().await;

-let mut stats = Vec::new();
-for (username, user_ips) in active_ips.iter() {
+let mut stats = Vec::with_capacity(active_counts.len());
+for (username, active_count) in active_counts {
 let limit = max_ips
-.get(username)
+.get(&username)
 .copied()
 .filter(|limit| *limit > 0)
 .or((default_max_ips > 0).then_some(default_max_ips))
 .unwrap_or(0);
-stats.push((username.clone(), user_ips.len(), limit));
+stats.push((username, active_count, limit));
 }

 stats.sort_by(|a, b| a.0.cmp(&b.0));
@@ -421,20 +587,30 @@ impl UserIpTracker {

 pub async fn clear_user_ips(&self, username: &str) {
 let mut active_ips = self.active_ips.write().await;
-active_ips.remove(username);
+let removed_active_entries = active_ips
+.remove(username)
+.map(|ips| ips.len())
+.unwrap_or(0);
 drop(active_ips);
+Self::decrement_counter(&self.active_entry_count, removed_active_entries);
+
 let mut recent_ips = self.recent_ips.write().await;
-recent_ips.remove(username);
+let removed_recent_entries = recent_ips
+.remove(username)
+.map(|ips| ips.len())
+.unwrap_or(0);
+Self::decrement_counter(&self.recent_entry_count, removed_recent_entries);
 }

 pub async fn clear_all(&self) {
 let mut active_ips = self.active_ips.write().await;
 active_ips.clear();
 drop(active_ips);
+self.active_entry_count.store(0, Ordering::Relaxed);
+
 let mut recent_ips = self.recent_ips.write().await;
 recent_ips.clear();
+self.recent_entry_count.store(0, Ordering::Relaxed);
 }

 pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool {
src/metrics.rs (163 lines changed)
@@ -18,8 +18,13 @@ use crate::ip_tracker::UserIpTracker;
 use crate::proxy::shared_state::ProxySharedState;
 use crate::stats::Stats;
 use crate::stats::beobachten::BeobachtenStore;
+use crate::tls_front::cache;
+use crate::tls_front::fetcher;
 use crate::transport::{ListenOptions, create_listener};

+// Keeps `/metrics` response size bounded when per-user telemetry is enabled.
+const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
+
 pub async fn serve(
 port: u16,
 listen: Option<String>,
@@ -311,6 +316,12 @@ async fn render_metrics(
 "telemt_telemetry_user_enabled {}",
 if user_enabled { 1 } else { 0 }
 );
+let _ = writeln!(
+out,
+"# HELP telemt_stats_user_entries Retained per-user stats entries"
+);
+let _ = writeln!(out, "# TYPE telemt_stats_user_entries gauge");
+let _ = writeln!(out, "telemt_stats_user_entries {}", stats.user_stats_len());

 let _ = writeln!(
 out,
@@ -366,6 +377,53 @@ async fn render_metrics(
 stats.get_buffer_pool_in_use_gauge()
 );

+let _ = writeln!(
+out,
+"# HELP telemt_tls_fetch_profile_cache_entries Current adaptive TLS fetch profile-cache entries"
+);
+let _ = writeln!(out, "# TYPE telemt_tls_fetch_profile_cache_entries gauge");
+let _ = writeln!(
+out,
+"telemt_tls_fetch_profile_cache_entries {}",
+fetcher::profile_cache_entries_for_metrics()
+);
+let _ = writeln!(
+out,
+"# HELP telemt_tls_fetch_profile_cache_cap_drops_total Profile-cache winner inserts skipped because the cache cap was reached"
+);
+let _ = writeln!(
+out,
+"# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"
+);
+let _ = writeln!(
+out,
+"telemt_tls_fetch_profile_cache_cap_drops_total {}",
+fetcher::profile_cache_cap_drops_for_metrics()
+);
+let _ = writeln!(
+out,
+"# HELP telemt_tls_front_full_cert_budget_ips Current IP entries tracked by TLS full-cert budget"
+);
+let _ = writeln!(out, "# TYPE telemt_tls_front_full_cert_budget_ips gauge");
+let _ = writeln!(
+out,
+"telemt_tls_front_full_cert_budget_ips {}",
+cache::full_cert_sent_ips_for_metrics()
+);
+let _ = writeln!(
+out,
+"# HELP telemt_tls_front_full_cert_budget_cap_drops_total New IPs denied full-cert budget tracking because the cap was reached"
+);
+let _ = writeln!(
+out,
+"# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter"
+);
+let _ = writeln!(
+out,
+"telemt_tls_front_full_cert_budget_cap_drops_total {}",
+cache::full_cert_sent_cap_drops_for_metrics()
+);
+
 let _ = writeln!(
 out,
 "# HELP telemt_connections_total Total accepted connections"
@@ -3019,17 +3077,6 @@ async fn render_metrics(
 0
 }
 );
-let _ = writeln!(
-out,
-"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
-);
-let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
-let _ = writeln!(
-out,
-"telemt_telemetry_user_series_suppressed {}",
-if user_enabled { 0 } else { 1 }
-);
-
 let ip_memory = ip_tracker.memory_stats().await;
 let _ = writeln!(
 out,
@@ -3071,11 +3118,46 @@ async fn render_metrics(
 "telemt_ip_tracker_cleanup_queue_len {}",
 ip_memory.cleanup_queue_len
 );
+let _ = writeln!(
+out,
+"# HELP telemt_ip_tracker_cleanup_total Release cleanups deferred through the cleanup queue"
+);
+let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter");
+let _ = writeln!(
+out,
+"telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}",
+ip_memory.cleanup_deferred_releases
+);
+let _ = writeln!(
+out,
+"# HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps"
+);
+let _ = writeln!(out, "# TYPE telemt_ip_tracker_cap_rejects_total counter");
+let _ = writeln!(
+out,
+"telemt_ip_tracker_cap_rejects_total{{scope=\"active\"}} {}",
+ip_memory.active_cap_rejects
+);
+let _ = writeln!(
+out,
+"telemt_ip_tracker_cap_rejects_total{{scope=\"recent\"}} {}",
+ip_memory.recent_cap_rejects
+);
+
+let mut user_stats_emitted = 0usize;
+let mut user_stats_suppressed = 0usize;
+let mut unique_ip_emitted = 0usize;
+let mut unique_ip_suppressed = 0usize;
+
 if user_enabled {
 for entry in stats.iter_user_stats() {
+if user_stats_emitted >= USER_LABELED_METRICS_MAX_USERS {
+user_stats_suppressed = user_stats_suppressed.saturating_add(1);
+continue;
+}
 let user = entry.key();
 let s = entry.value();
+user_stats_emitted = user_stats_emitted.saturating_add(1);
 let _ = writeln!(
 out,
 "telemt_user_connections_total{{user=\"{}\"}} {}",
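Note: the writeln! blocks above emit the Prometheus text exposition format, one # HELP and one # TYPE line per metric followed by samples; the cap-reject counter splits into labeled series by scope. Hypothetical scrape output (values invented for illustration):

    # HELP telemt_ip_tracker_cleanup_total Release cleanups deferred through the cleanup queue
    # TYPE telemt_ip_tracker_cleanup_total counter
    telemt_ip_tracker_cleanup_total{path="deferred"} 17
    # HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps
    # TYPE telemt_ip_tracker_cap_rejects_total counter
    telemt_ip_tracker_cap_rejects_total{scope="active"} 0
    telemt_ip_tracker_cap_rejects_total{scope="recent"} 3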
@@ -3117,7 +3199,7 @@ async fn render_metrics(
 );
 }

-let ip_stats = ip_tracker.get_stats().await;
+let ip_stats = ip_tracker.get_stats_snapshot().await;
 let ip_counts: HashMap<String, usize> = ip_stats
 .into_iter()
 .map(|(user, count, _)| (user, count))
@@ -3129,7 +3211,7 @@ async fn render_metrics(
 unique_users.extend(ip_counts.keys().cloned());
 let unique_users_vec: Vec<String> = unique_users.iter().cloned().collect();
 let recent_counts = ip_tracker
-.get_recent_counts_for_users(&unique_users_vec)
+.get_recent_counts_for_users_snapshot(&unique_users_vec)
 .await;

 let _ = writeln!(
@@ -3154,6 +3236,11 @@ async fn render_metrics(
 let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");

 for user in unique_users {
+if unique_ip_emitted >= USER_LABELED_METRICS_MAX_USERS {
+unique_ip_suppressed = unique_ip_suppressed.saturating_add(1);
+continue;
+}
+unique_ip_emitted = unique_ip_emitted.saturating_add(1);
 let current = ip_counts.get(&user).copied().unwrap_or(0);
 let limit = config
 .access
@@ -3193,6 +3280,46 @@ async fn render_metrics(
 }
 }

+let _ = writeln!(
+out,
+"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
+);
+let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
+let _ = writeln!(
+out,
+"telemt_telemetry_user_series_suppressed {}",
+if user_enabled && user_stats_suppressed == 0 && unique_ip_suppressed == 0 {
+0
+} else {
+1
+}
+);
+let _ = writeln!(
+out,
+"# HELP telemt_telemetry_user_series_users User-labeled metric users by export status"
+);
+let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_users gauge");
+let _ = writeln!(
+out,
+"telemt_telemetry_user_series_users{{family=\"stats\",status=\"emitted\"}} {}",
+user_stats_emitted
+);
+let _ = writeln!(
+out,
+"telemt_telemetry_user_series_users{{family=\"stats\",status=\"suppressed\"}} {}",
+user_stats_suppressed
+);
+let _ = writeln!(
+out,
+"telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"emitted\"}} {}",
+unique_ip_emitted
+);
+let _ = writeln!(
+out,
+"telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"suppressed\"}} {}",
+unique_ip_suppressed
+);
+
 out
 }

@@ -3406,9 +3533,19 @@ mod tests {
 assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
 assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
 assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
+assert!(output.contains("# TYPE telemt_stats_user_entries gauge"));
+assert!(output.contains("# TYPE telemt_telemetry_user_series_users gauge"));
 assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
 assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
 assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
+assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_total counter"));
+assert!(output.contains("# TYPE telemt_ip_tracker_cap_rejects_total counter"));
+assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_entries gauge"));
+assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"));
+assert!(output.contains("# TYPE telemt_tls_front_full_cert_budget_ips gauge"));
+assert!(
+output.contains("# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter")
+);
 }

 #[tokio::test]
@@ -1432,7 +1432,7 @@ impl RunningClientHandler {
 /// Main dispatch after successful handshake.
 /// Two modes:
 /// - Direct: TCP relay to TG DC (existing behavior)
-/// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs)
+/// - Middle Proxy: RPC multiplex through ME pool (supports CDN DCs)
 #[cfg(test)]
 async fn handle_authenticated_static<R, W>(
 client_reader: CryptoReader<R>,
@@ -669,6 +669,13 @@ fn adversarial_check_then_symlink_flip_is_blocked_by_nofollow_open() {
 "telemt-unknown-dc-check-open-race-{}",
 std::process::id()
 ));
+if let Ok(meta) = fs::symlink_metadata(&parent) {
+if meta.file_type().is_symlink() || meta.is_file() {
+fs::remove_file(&parent).expect("stale check-open-race path must be removable");
+} else {
+fs::remove_dir_all(&parent).expect("stale check-open-race parent must be removable");
+}
+}
 fs::create_dir_all(&parent).expect("check-open-race parent must be creatable");

 let target = parent.join("unknown-dc.log");
@@ -74,16 +74,21 @@ impl BeobachtenStore {
 }

 let now = Instant::now();
+let entries = {
 let mut guard = self.inner.lock();
 Self::cleanup(&mut guard, now, ttl);
 guard.last_cleanup = Some(now);

+guard
+.entries
+.iter()
+.map(|((class, ip), entry)| (class.clone(), *ip, entry.tries))
+.collect::<Vec<_>>()
+};
+
 let mut grouped = BTreeMap::<String, Vec<(IpAddr, u64)>>::new();
-for ((class, ip), entry) in &guard.entries {
-grouped
-.entry(class.clone())
-.or_default()
-.push((*ip, entry.tries));
+for (class, ip, tries) in entries {
+grouped.entry(class).or_default().push((ip, tries));
 }

 if grouped.is_empty() {
@@ -2477,6 +2477,11 @@ impl Stats {
 self.user_stats.iter()
 }

+/// Current number of retained per-user stats entries.
+pub fn user_stats_len(&self) -> usize {
+self.user_stats.len()
+}
+
 pub fn uptime_secs(&self) -> f64 {
 self.start_time
 .read()
@@ -277,6 +277,7 @@ impl StreamState for TlsReaderState {
 pub struct FakeTlsReader<R> {
 upstream: R,
 state: TlsReaderState,
+body_scratch: Vec<u8>,
 }

 impl<R> FakeTlsReader<R> {
@@ -284,6 +285,7 @@ impl<R> FakeTlsReader<R> {
 Self {
 upstream,
 state: TlsReaderState::Idle,
+body_scratch: Vec::new(),
 }
 }

@@ -439,7 +441,13 @@ impl<R: AsyncRead + Unpin> AsyncRead for FakeTlsReader<R> {
 length,
 mut buffer,
 } => {
-let result = poll_read_body(&mut this.upstream, cx, &mut buffer, length);
+let result = poll_read_body(
+&mut this.upstream,
+cx,
+&mut buffer,
+length,
+&mut this.body_scratch,
+);

 match result {
 BodyPollResult::Pending => {
@@ -558,21 +566,25 @@ fn poll_read_body<R: AsyncRead + Unpin>(
 cx: &mut Context<'_>,
 buffer: &mut BytesMut,
 target_len: usize,
+scratch: &mut Vec<u8>,
 ) -> BodyPollResult {
-// NOTE: This implementation uses a temporary Vec to avoid tricky borrow/lifetime
-// issues with BytesMut spare capacity and ReadBuf across polls.
-// It's safe and correct; optimization is possible if needed.
 while buffer.len() < target_len {
 let remaining = target_len - buffer.len();
+let chunk_len = remaining.min(8192);

-let mut temp = vec![0u8; remaining.min(8192)];
-let mut read_buf = ReadBuf::new(&mut temp);
+if scratch.len() < chunk_len {
+scratch.resize(chunk_len, 0);
+}
+
+let n = {
+let mut read_buf = ReadBuf::new(&mut scratch[..chunk_len]);
 match Pin::new(&mut *upstream).poll_read(cx, &mut read_buf) {
 Poll::Pending => return BodyPollResult::Pending,
 Poll::Ready(Err(e)) => return BodyPollResult::Error(e),
-Poll::Ready(Ok(())) => {
-let n = read_buf.filled().len();
+Poll::Ready(Ok(())) => read_buf.filled().len(),
 }
+};
+
 if n == 0 {
 return BodyPollResult::Error(Error::new(
 ErrorKind::UnexpectedEof,
@@ -583,9 +595,7 @@ fn poll_read_body<R: AsyncRead + Unpin>(
 ),
 ));
 }
-buffer.extend_from_slice(&temp[..n]);
-}
-}
+buffer.extend_from_slice(&scratch[..n]);
 }

 BodyPollResult::Complete(buffer.split().freeze())
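Note on the poll_read_body change above: replacing the per-poll `vec![0u8; ...]` with a caller-owned, grow-only scratch buffer means steady-state reads allocate at most once. A standalone sketch of the same reuse with a synchronous reader (names are illustrative, not project code):

    use std::io::Read;

    fn read_chunk<R: Read>(
        reader: &mut R,
        scratch: &mut Vec<u8>,
        chunk_len: usize,
    ) -> std::io::Result<usize> {
        if scratch.len() < chunk_len {
            scratch.resize(chunk_len, 0); // grows once, then is reused as-is
        }
        reader.read(&mut scratch[..chunk_len])
    }

    fn main() -> std::io::Result<()> {
        let data = vec![7u8; 20_000];
        let mut src = &data[..];
        let mut scratch = Vec::new();
        let mut total = 0;
        while total < data.len() {
            total += read_chunk(&mut src, &mut scratch, 8192)?; // one 8 KiB allocation in total
        }
        assert_eq!(total, data.len());
        Ok(())
    }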
@@ -559,9 +559,7 @@ async fn mass_reconnect_sync_cleanup_prevents_temporary_reservation_bloat() {
 }

 #[tokio::test]
-async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections() {
-// Regression guard: concurrent cleanup draining must not produce false
-// limit denials for a new IP when the previous IP is already queued.
+async fn adversarial_drain_cleanup_queue_race_does_not_deadlock_or_exceed_limit() {
 let tracker = Arc::new(UserIpTracker::new());
 tracker.set_user_limit("racer", 1).await;
 let ip1 = ip_from_idx(1);
@@ -573,7 +571,6 @@ async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections()
 // User disconnects from ip1, queuing it
 tracker.enqueue_cleanup("racer".to_string(), ip1);

-let mut saw_false_rejection = false;
 for _ in 0..100 {
 // Queue cleanup then race explicit drain and check-and-add on the alternative IP.
 tracker.enqueue_cleanup("racer".to_string(), ip1);
@@ -585,22 +582,21 @@ async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections()
 });
 let handle = tokio::spawn(async move { tracker_b.check_and_add("racer", ip2).await });

-drain_handle.await.unwrap();
-let res = handle.await.unwrap();
-if res.is_err() {
-saw_false_rejection = true;
-break;
-}
+tokio::time::timeout(Duration::from_secs(1), drain_handle)
+.await
+.expect("cleanup drain must not deadlock")
+.unwrap();
+let _ = tokio::time::timeout(Duration::from_secs(1), handle)
+.await
+.expect("admission must not deadlock")
+.unwrap();

-// Restore baseline for next iteration.
+assert!(tracker.get_active_ip_count("racer").await <= 1);
+tracker.drain_cleanup_queue().await;
 tracker.remove_ip("racer", ip2).await;
+tracker.remove_ip("racer", ip1).await;
 tracker.check_and_add("racer", ip1).await.unwrap();
 }

-assert!(
-!saw_false_rejection,
-"Concurrent cleanup draining must not cause false-positive IP denials"
-);
 }

 #[tokio::test]
@@ -1,8 +1,11 @@
 use std::collections::HashMap;
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
 use std::net::IpAddr;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::time::{Duration, Instant, SystemTime};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

 use tokio::sync::RwLock;
 use tokio::time::sleep;
@@ -12,12 +15,30 @@ use crate::tls_front::types::{
 CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsFetchResult,
 };

+const FULL_CERT_SENT_SWEEP_INTERVAL_SECS: u64 = 30;
+const FULL_CERT_SENT_MAX_IPS: usize = 65_536;
+const FULL_CERT_SENT_SHARDS: usize = 64;
+
+static FULL_CERT_SENT_IPS_GAUGE: AtomicU64 = AtomicU64::new(0);
+static FULL_CERT_SENT_CAP_DROPS: AtomicU64 = AtomicU64::new(0);
+
+/// Current number of IPs tracked by the TLS full-cert budget gate.
+pub(crate) fn full_cert_sent_ips_for_metrics() -> u64 {
+FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed)
+}
+
+/// Number of new IPs denied a full-cert budget slot because the cap was reached.
+pub(crate) fn full_cert_sent_cap_drops_for_metrics() -> u64 {
+FULL_CERT_SENT_CAP_DROPS.load(Ordering::Relaxed)
+}
+
 /// Lightweight in-memory + optional on-disk cache for TLS fronting data.
 #[derive(Debug)]
 pub struct TlsFrontCache {
 memory: RwLock<HashMap<String, Arc<CachedTlsData>>>,
 default: Arc<CachedTlsData>,
-full_cert_sent: RwLock<HashMap<IpAddr, Instant>>,
+full_cert_sent_shards: Vec<RwLock<HashMap<IpAddr, Instant>>>,
+full_cert_sent_last_sweep_epoch_secs: AtomicU64,
 disk_path: PathBuf,
 }

@@ -52,7 +73,10 @@ impl TlsFrontCache {
 Self {
 memory: RwLock::new(map),
 default,
-full_cert_sent: RwLock::new(HashMap::new()),
+full_cert_sent_shards: (0..FULL_CERT_SENT_SHARDS)
+.map(|_| RwLock::new(HashMap::new()))
+.collect(),
+full_cert_sent_last_sweep_epoch_secs: AtomicU64::new(0),
 disk_path: disk_path.as_ref().to_path_buf(),
 }
 }
@@ -69,22 +93,83 @@ impl TlsFrontCache {
 self.memory.read().await.contains_key(domain)
 }

+fn full_cert_sent_shard_index(client_ip: IpAddr) -> usize {
+let mut hasher = DefaultHasher::new();
+client_ip.hash(&mut hasher);
+(hasher.finish() as usize) % FULL_CERT_SENT_SHARDS
+}
+
+fn full_cert_sent_shard(&self, client_ip: IpAddr) -> &RwLock<HashMap<IpAddr, Instant>> {
+&self.full_cert_sent_shards[Self::full_cert_sent_shard_index(client_ip)]
+}
+
+fn decrement_full_cert_sent_entries(amount: usize) {
+if amount == 0 {
+return;
+}
+let amount = amount as u64;
+let _ =
+FULL_CERT_SENT_IPS_GAUGE.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
+Some(current.saturating_sub(amount))
+});
+}
+
+fn try_reserve_full_cert_sent_entry() -> bool {
+let mut current = FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed);
+loop {
+if current >= FULL_CERT_SENT_MAX_IPS as u64 {
+return false;
+}
+match FULL_CERT_SENT_IPS_GAUGE.compare_exchange_weak(
+current,
+current.saturating_add(1),
+Ordering::AcqRel,
+Ordering::Relaxed,
+) {
+Ok(_) => return true,
+Err(actual) => current = actual,
+}
+}
+}
+
+async fn sweep_full_cert_sent_shards(&self, now: Instant, ttl: Duration) {
+for shard in &self.full_cert_sent_shards {
+let mut guard = shard.write().await;
+let before = guard.len();
+guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
+Self::decrement_full_cert_sent_entries(before.saturating_sub(guard.len()));
+}
+}
+
 /// Returns true when full cert payload should be sent for client_ip
 /// according to TTL policy.
 pub async fn take_full_cert_budget_for_ip(&self, client_ip: IpAddr, ttl: Duration) -> bool {
 if ttl.is_zero() {
-self.full_cert_sent
-.write()
-.await
-.insert(client_ip, Instant::now());
 return true;
 }

 let now = Instant::now();
-let mut guard = self.full_cert_sent.write().await;
-guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
+let now_epoch_secs = SystemTime::now()
+.duration_since(UNIX_EPOCH)
+.unwrap_or_default()
+.as_secs();
+let should_sweep = self
+.full_cert_sent_last_sweep_epoch_secs
+.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |last_sweep| {
+if now_epoch_secs.saturating_sub(last_sweep) >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS {
+Some(now_epoch_secs)
+} else {
+None
+}
+})
+.is_ok();

-match guard.get_mut(&client_ip) {
+if should_sweep {
+self.sweep_full_cert_sent_shards(now, ttl).await;
+}
+
+let mut guard = self.full_cert_sent_shard(client_ip).write().await;
+let allowed = match guard.get_mut(&client_ip) {
 Some(seen_at) => {
 if now.duration_since(*seen_at) >= ttl {
 *seen_at = now;
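Note on try_reserve_full_cert_sent_entry above: reserving the gauge slot with a compare-and-swap loop before inserting keeps the global cap exact under concurrency, where a separate load followed by fetch_add could overshoot when many tasks race. A minimal standalone sketch:

    use std::sync::atomic::{AtomicU64, Ordering};

    fn try_reserve(gauge: &AtomicU64, cap: u64) -> bool {
        let mut current = gauge.load(Ordering::Relaxed);
        loop {
            if current >= cap {
                return false; // cap reached; the caller records a drop
            }
            match gauge.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual, // lost the race; retry from the fresh value
            }
        }
    }

    fn main() {
        let gauge = AtomicU64::new(0);
        assert!(try_reserve(&gauge, 1));
        assert!(!try_reserve(&gauge, 1)); // the second reservation is denied at the cap
    }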
@@ -94,10 +179,41 @@ impl TlsFrontCache {
 }
 }
 None => {
+if !Self::try_reserve_full_cert_sent_entry() {
+FULL_CERT_SENT_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+return false;
+}
 guard.insert(client_ip, now);
 true
 }
+};
+allowed
 }

+#[cfg(test)]
+async fn insert_full_cert_sent_for_tests(&self, client_ip: IpAddr, seen_at: Instant) {
+let mut guard = self.full_cert_sent_shard(client_ip).write().await;
+if guard.insert(client_ip, seen_at).is_none() {
+FULL_CERT_SENT_IPS_GAUGE.fetch_add(1, Ordering::Relaxed);
+}
+}
+
+#[cfg(test)]
+async fn full_cert_sent_is_empty_for_tests(&self) -> bool {
+for shard in &self.full_cert_sent_shards {
+if !shard.read().await.is_empty() {
+return false;
+}
+}
+true
+}
+
+#[cfg(test)]
+async fn full_cert_sent_contains_for_tests(&self, client_ip: IpAddr) -> bool {
+self.full_cert_sent_shard(client_ip)
+.read()
+.await
+.contains_key(&client_ip)
+}
 }

 pub async fn set(&self, domain: &str, data: CachedTlsData) {
@@ -328,10 +444,68 @@ mod tests {
 #[tokio::test]
 async fn test_take_full_cert_budget_for_ip_zero_ttl_always_allows_full_payload() {
 let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
-let ip: IpAddr = "127.0.0.1".parse().expect("ip");
 let ttl = Duration::ZERO;

-assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
+for idx in 0..100_000u32 {
+let ip = IpAddr::V4(std::net::Ipv4Addr::new(
+10,
+((idx >> 16) & 0xff) as u8,
+((idx >> 8) & 0xff) as u8,
+(idx & 0xff) as u8,
+));
 assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
 }
+
+assert!(cache.full_cert_sent_is_empty_for_tests().await);
+}
+
+#[tokio::test]
+async fn test_take_full_cert_budget_for_ip_sweeps_expired_entries_when_due() {
+let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
+let stale_ip: IpAddr = "127.0.0.1".parse().expect("ip");
+let new_ip: IpAddr = "127.0.0.2".parse().expect("ip");
+let ttl = Duration::from_secs(1);
+let stale_seen_at = Instant::now()
+.checked_sub(Duration::from_secs(10))
+.unwrap_or_else(Instant::now);
+
+cache
+.insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
+.await;
+cache
+.full_cert_sent_last_sweep_epoch_secs
+.store(0, Ordering::Relaxed);
+
+assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
+
+assert!(!cache.full_cert_sent_contains_for_tests(stale_ip).await);
+assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
+}
+
+#[tokio::test]
+async fn test_take_full_cert_budget_for_ip_does_not_sweep_every_call() {
+let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
+let stale_ip: IpAddr = "127.0.0.1".parse().expect("ip");
+let new_ip: IpAddr = "127.0.0.2".parse().expect("ip");
+let ttl = Duration::from_secs(1);
+let stale_seen_at = Instant::now()
+.checked_sub(Duration::from_secs(10))
+.unwrap_or_else(Instant::now);
+let now_epoch_secs = SystemTime::now()
+.duration_since(UNIX_EPOCH)
+.unwrap_or_default()
+.as_secs();
+
+cache
+.insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
+.await;
+cache
+.full_cert_sent_last_sweep_epoch_secs
+.store(now_epoch_secs, Ordering::Relaxed);
+
+assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
+
+assert!(cache.full_cert_sent_contains_for_tests(stale_ip).await);
+assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
+}
 }
@@ -3,7 +3,9 @@
 use dashmap::DashMap;
 use std::net::SocketAddr;
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::sync::OnceLock;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::{Duration, Instant};

 use anyhow::{Result, anyhow};
@@ -144,12 +146,37 @@ enum FetchErrorKind {
 Other,
 }

+const PROFILE_CACHE_MAX_ENTRIES: usize = 4096;
+
 static PROFILE_CACHE: OnceLock<DashMap<ProfileCacheKey, ProfileCacheValue>> = OnceLock::new();
+static PROFILE_CACHE_INSERT_GUARD: OnceLock<Mutex<()>> = OnceLock::new();
+static PROFILE_CACHE_CAP_DROPS: AtomicU64 = AtomicU64::new(0);

 fn profile_cache() -> &'static DashMap<ProfileCacheKey, ProfileCacheValue> {
 PROFILE_CACHE.get_or_init(DashMap::new)
 }

+fn profile_cache_insert_guard() -> &'static Mutex<()> {
+PROFILE_CACHE_INSERT_GUARD.get_or_init(|| Mutex::new(()))
+}
+
+fn sweep_expired_profile_cache(ttl: Duration, now: Instant) {
+if ttl.is_zero() {
+return;
+}
+profile_cache().retain(|_, value| now.saturating_duration_since(value.updated_at) <= ttl);
+}
+
+/// Current number of adaptive TLS fetch profile-cache entries.
+pub(crate) fn profile_cache_entries_for_metrics() -> usize {
+profile_cache().len()
+}
+
+/// Number of fresh profile-cache winners skipped because the cache was full.
+pub(crate) fn profile_cache_cap_drops_for_metrics() -> u64 {
+PROFILE_CACHE_CAP_DROPS.load(Ordering::Relaxed)
+}
+
 fn route_hint(
 upstream: Option<&std::sync::Arc<crate::transport::UpstreamManager>>,
 unix_sock: Option<&str>,
@@ -267,6 +294,43 @@ fn remember_profile_success(
 let Some(key) = cache_key else {
 return;
 };
+remember_profile_success_with_cap(strategy, key, profile, now, PROFILE_CACHE_MAX_ENTRIES);
+}
+
+fn remember_profile_success_with_cap(
+strategy: &TlsFetchStrategy,
+key: ProfileCacheKey,
+profile: TlsFetchProfile,
+now: Instant,
+max_entries: usize,
+) {
+let Ok(_guard) = profile_cache_insert_guard().lock() else {
+PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+return;
+};
+if max_entries == 0 {
+PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+return;
+}
+if profile_cache().contains_key(&key) {
+profile_cache().insert(
+key,
+ProfileCacheValue {
+profile,
+updated_at: now,
+},
+);
+return;
+}
+if profile_cache().len() >= max_entries {
+// TLS fetch is control-plane work; sweeping under a tiny mutex keeps
+// profile-cache cardinality hard-bounded without touching relay hot paths.
+sweep_expired_profile_cache(strategy.profile_cache_ttl, now);
+}
+if profile_cache().len() >= max_entries {
+PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+return;
+}
 profile_cache().insert(
 key,
 ProfileCacheValue {
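Note on remember_profile_success_with_cap above: the insert path checks the cap, sweeps expired entries only when the cap is hit, then re-checks before inserting, so the common case stays cheap while cardinality is hard-bounded. The same check-sweep-recheck shape on a plain HashMap (a sketch, not the DashMap-based project code):

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    fn insert_capped(
        cache: &mut HashMap<String, Instant>,
        key: String,
        now: Instant,
        ttl: Duration,
        cap: usize,
    ) -> bool {
        if cache.contains_key(&key) {
            cache.insert(key, now); // refreshing an existing entry never grows the map
            return true;
        }
        if cache.len() >= cap {
            cache.retain(|_, at| now.duration_since(*at) <= ttl); // sweep only under pressure
        }
        if cache.len() >= cap {
            return false; // still full: skip the insert and count the drop
        }
        cache.insert(key, now);
        true
    }

    fn main() {
        let mut cache = HashMap::new();
        let now = Instant::now();
        assert!(insert_capped(&mut cache, "a".into(), now, Duration::from_secs(60), 1));
        assert!(!insert_capped(&mut cache, "b".into(), now, Duration::from_secs(60), 1));
    }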
@@ -618,13 +618,9 @@ impl MePool {
 me_route_hybrid_max_wait: Duration::from_millis(
 me_route_hybrid_max_wait_ms.max(50),
 ),
-me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 {
-None
-} else {
-Some(Duration::from_millis(
-me_route_blocking_send_timeout_ms.min(5_000),
-))
-},
+me_route_blocking_send_timeout: Some(Duration::from_millis(
+me_route_blocking_send_timeout_ms.clamp(1, 5_000),
+)),
 me_route_last_success_epoch_ms: AtomicU64::new(0),
 me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0),
 me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0),
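Note on the MePool hunk above: with config validation now enforcing [1, 5000], the `None` branch for 0 is gone and the clamp only normalizes values that reach the pool through other paths. A tiny sketch of the resulting behavior:

    use std::time::Duration;

    fn blocking_send_timeout(ms: u64) -> Option<Duration> {
        // Always Some now; 0 and out-of-range values are clamped rather than
        // mapped to an unbounded wait.
        Some(Duration::from_millis(ms.clamp(1, 5_000)))
    }

    fn main() {
        assert_eq!(blocking_send_timeout(0), Some(Duration::from_millis(1)));
        assert_eq!(blocking_send_timeout(250), Some(Duration::from_millis(250)));
        assert_eq!(blocking_send_timeout(9_999), Some(Duration::from_millis(5_000)));
    }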