Compare commits

..

16 Commits

Author SHA1 Message Date
Alexey
d67c37afd7 Merge pull request #762 from astronaut808/feature/tls-front-profile-health
Add TLS Front Profile Health metrics
2026-05-05 15:23:01 +03:00
astronaut808
9f9ca9f270 Add TLS front profile health metrics 2026-05-03 18:07:24 +05:00
Alexey
cdd2239047 Merge pull request #758 from mammuthus/feature/metrics-bad-class-export-dashboard
Add class-based error metrics and dashboard panels
2026-05-02 00:46:53 +03:00
mamuthus
a7a2f4ab27 Adjust General metrics dashboard layout 2026-05-01 19:19:00 +00:00
mamuthus
9dae14aa66 Add class-based error metrics and dashboard panels 2026-05-01 18:26:32 +00:00
Alexey
e50026e776 Update README.md 2026-04-30 19:41:40 +03:00
Alexey
7106f38fae Update Cargo.lock 2026-04-30 11:38:33 +03:00
Alexey
2a694470d5 Update Cargo.toml 2026-04-30 11:37:18 +03:00
Alexey
b98cd37211 TLS Full Certificate Budget Bookkeeping + Hot-path Cleanup and Timeout Invariants + IP-Tracker refactoring + Shard TLS Full-Cert Budget: merge pull request #753 from telemt/flow
TLS Full Certificate Budget Bookkeeping + Hot-path Cleanup and Timeout Invariants + IP-Tracker refactoring + Shard TLS Full-Cert Budget
2026-04-30 11:36:30 +03:00
Alexey
8b62965978 Stabilize unknown-DC symlink race test setup 2026-04-30 11:11:04 +03:00
Alexey
d46bda9880 Preserve synchronous IP cleanup queue contract + Rustfmt 2026-04-30 11:05:18 +03:00
Alexey
c3de07db6a Shard TLS full-cert budget tracking + Bound user-labeled metrics export cardinality 2026-04-30 11:01:10 +03:00
Alexey
61f9af7ffc Reduce Lock-free IP-Tracker Cleanup backlog 2026-04-30 10:51:04 +03:00
Alexey
1f90e28871 Cap scanner-sensitive Caches and IP-Tracker Cardinality 2026-04-30 10:43:27 +03:00
Alexey
876b74ebf7 Hot-path Cleanup and Timeout Invariants 2026-04-29 23:16:11 +03:00
Alexey
b34e1d71ae TLS Full Certificate Budget Bookkeeping 2026-04-29 23:00:25 +03:00
21 changed files with 7789 additions and 5073 deletions

2
Cargo.lock generated
View File

@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "telemt"
version = "3.4.9"
version = "3.4.10"
dependencies = [
"aes",
"anyhow",

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.4.9"
version = "3.4.10"
edition = "2024"
[features]

View File

@@ -1,6 +1,6 @@
# Telemt - MTProxy on Rust + Tokio
[![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon)](https://github.com/telemt/telemt/releases/latest) [![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social)](https://github.com/telemt/telemt/stargazers) [![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social)](https://github.com/telemt/telemt/network/members) [![Telegram](https://img.shields.io/badge/Telegram-Chat-24a1de?logo=telegram&logoColor=24a1de)](https://t.me/telemtrs)
[![Latest Release](https://img.shields.io/github/v/release/telemt/telemt?color=neon)](https://github.com/telemt/telemt/releases/latest) [![Stars](https://img.shields.io/github/stars/telemt/telemt?style=social)](https://github.com/telemt/telemt/stargazers) [![Forks](https://img.shields.io/github/forks/telemt/telemt?style=social)](https://github.com/telemt/telemt/network/members)
[🇷🇺 README на русском](https://github.com/telemt/telemt/blob/main/README.ru.md)
@@ -14,7 +14,7 @@
<p align="center">
<a href="https://t.me/telemtrs">
<img src="/docs/assets/telegram_button.svg" width="150"/>
<img src="https://github.com/user-attachments/assets/30b7e7b9-974a-4e3d-aab6-b58a85de4507" width="240"/>
</a>
</p>

View File

@@ -128,7 +128,48 @@ Recommended for cleaner testing:
Persisted cache artifacts are useful, but they are not required if packet captures already demonstrate the runtime result.
### 4. Capture a direct-origin trace
### 4. Check TLS-front profile health metrics
If the metrics endpoint is enabled, check the TLS-front profile health before packet-capture validation:
```bash
curl -s http://127.0.0.1:9999/metrics | grep -E 'telemt_tls_front_profile|telemt_tls_fetch_profile_cache|telemt_tls_front_full_cert'
```
The profile-health metrics expose the runtime state of configured TLS front domains:
- `telemt_tls_front_profile_domains` shows configured, emitted, and suppressed domain series.
- `telemt_tls_front_profile_info` shows profile source and feature flags per domain.
- `telemt_tls_front_profile_age_seconds` shows cached profile age.
- `telemt_tls_front_profile_app_data_records` shows cached AppData record count.
- `telemt_tls_front_profile_ticket_records` shows cached ticket-like tail record count.
- `telemt_tls_front_profile_change_cipher_spec_records` shows cached ChangeCipherSpec count.
- `telemt_tls_front_profile_app_data_bytes` shows total cached AppData bytes.
Interpretation:
- `source="merged"` or `source="raw"` means real TLS profile data is being used.
- `source="default"` or `is_default="true"` means the domain currently uses the synthetic default fallback.
- `has_cert_payload="true"` means certificate payload data is available for TLS emulation.
- Non-zero AppData/ticket/CCS counters show captured server-flight shape.
Example healthy output:
```text
telemt_tls_front_profile_domains{status="configured"} 1
telemt_tls_front_profile_domains{status="emitted"} 1
telemt_tls_front_profile_domains{status="suppressed"} 0
telemt_tls_front_profile_info{domain="itunes.apple.com",source="merged",is_default="false",has_cert_info="true",has_cert_payload="true"} 1
telemt_tls_front_profile_age_seconds{domain="itunes.apple.com"} 20
telemt_tls_front_profile_app_data_records{domain="itunes.apple.com"} 3
telemt_tls_front_profile_ticket_records{domain="itunes.apple.com"} 1
telemt_tls_front_profile_change_cipher_spec_records{domain="itunes.apple.com"} 1
telemt_tls_front_profile_app_data_bytes{domain="itunes.apple.com"} 5240
```
These metrics do not prove byte-level origin equivalence. They are an operational health signal that the configured domain is backed by real cached profile data instead of default fallback data.
### 5. Capture a direct-origin trace
From a separate client host, connect directly to the origin:
@@ -142,7 +183,7 @@ Capture with:
sudo tcpdump -i any -w origin-direct.pcap host ORIGIN_IP and port 443
```
### 5. Capture a Telemt FakeTLS success-path trace
### 6. Capture a Telemt FakeTLS success-path trace
Now connect to Telemt with a real Telegram client through an `ee` proxy link that targets the Telemt instance.
@@ -154,7 +195,7 @@ Capture with:
sudo tcpdump -i any -w telemt-emulated.pcap host TELEMT_IP and port 443
```
### 6. Decode TLS record structure
### 7. Decode TLS record structure
Use `tshark` to print record-level structure:
@@ -182,7 +223,7 @@ Focus on the server flight after ClientHello:
- `20` = ChangeCipherSpec
- `23` = ApplicationData
### 7. Build a comparison table
### 8. Build a comparison table
A compact table like the following is usually enough:

View File

@@ -126,9 +126,50 @@ openssl s_client -connect ORIGIN_IP:443 -servername YOUR_DOMAIN </dev/null
2. Дайте ему получить TLS front profile data для выбранного домена.
3. Если `tls_front_dir` хранится persistently, убедитесь, что TLS front cache заполнен.
Persisted cache artifacts полезны, но не обязательны, если packet capture уже показывают runtime result.
Сохранённые артефакты кэша полезны, но не обязательны, если packet capture уже показывает результат в runtime.
### 4. Снять direct-origin trace
### 4. Проверить метрики состояния TLS-front profile
Если endpoint метрик включён, перед проверкой через packet capture можно быстро проверить состояние TLS-front profile:
```bash
curl -s http://127.0.0.1:9999/metrics | grep -E 'telemt_tls_front_profile|telemt_tls_fetch_profile_cache|telemt_tls_front_full_cert'
```
Метрики состояния профиля показывают runtime-состояние настроенных TLS-front доменов:
- `telemt_tls_front_profile_domains` показывает количество настроенных, экспортируемых и скрытых из-за лимита доменов.
- `telemt_tls_front_profile_info` показывает источник профиля и флаги доступных данных по каждому домену.
- `telemt_tls_front_profile_age_seconds` показывает возраст закешированного профиля.
- `telemt_tls_front_profile_app_data_records` показывает количество закешированных AppData records.
- `telemt_tls_front_profile_ticket_records` показывает количество закешированных ticket-like tail records.
- `telemt_tls_front_profile_change_cipher_spec_records` показывает закешированное количество ChangeCipherSpec records.
- `telemt_tls_front_profile_app_data_bytes` показывает общий размер закешированных AppData bytes.
Интерпретация:
- `source="merged"` или `source="raw"` означает, что используются реальные данные TLS-профиля.
- `source="default"` или `is_default="true"` означает, что домен сейчас работает на synthetic default fallback.
- `has_cert_payload="true"` означает, что certificate payload доступен для TLS emulation.
- Ненулевые AppData/ticket/CCS counters показывают захваченную форму server flight.
Пример здорового состояния:
```text
telemt_tls_front_profile_domains{status="configured"} 1
telemt_tls_front_profile_domains{status="emitted"} 1
telemt_tls_front_profile_domains{status="suppressed"} 0
telemt_tls_front_profile_info{domain="itunes.apple.com",source="merged",is_default="false",has_cert_info="true",has_cert_payload="true"} 1
telemt_tls_front_profile_age_seconds{domain="itunes.apple.com"} 20
telemt_tls_front_profile_app_data_records{domain="itunes.apple.com"} 3
telemt_tls_front_profile_ticket_records{domain="itunes.apple.com"} 1
telemt_tls_front_profile_change_cipher_spec_records{domain="itunes.apple.com"} 1
telemt_tls_front_profile_app_data_bytes{domain="itunes.apple.com"} 5240
```
Эти метрики не доказывают побайтную эквивалентность с origin. Это эксплуатационный сигнал состояния: настроенный домен действительно основан на реальных закешированных данных профиля, а не на default fallback.
### 5. Снять direct-origin trace
С отдельной клиентской машины подключитесь напрямую к origin:
@@ -142,7 +183,7 @@ Capture:
sudo tcpdump -i any -w origin-direct.pcap host ORIGIN_IP and port 443
```
### 5. Снять Telemt FakeTLS success-path trace
### 6. Снять Telemt FakeTLS success-path trace
Теперь подключитесь к Telemt через реальный Telegram client с `ee` proxy link, который указывает на Telemt instance.
@@ -154,7 +195,7 @@ Capture:
sudo tcpdump -i any -w telemt-emulated.pcap host TELEMT_IP and port 443
```
### 6. Декодировать структуру TLS records
### 7. Декодировать структуру TLS records
Используйте `tshark`, чтобы вывести record-level structure:
@@ -182,7 +223,7 @@ tshark -r telemt-emulated.pcap -Y "tls.record" -T fields \
- `20` = ChangeCipherSpec
- `23` = ApplicationData
### 7. Собрать сравнительную таблицу
### 8. Собрать сравнительную таблицу
Обычно достаточно короткой таблицы такого вида:

View File

@@ -1087,9 +1087,9 @@ impl ProxyConfig {
));
}
if config.general.me_route_blocking_send_timeout_ms > 5000 {
if !(1..=5000).contains(&config.general.me_route_blocking_send_timeout_ms) {
return Err(ProxyError::Config(
"general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
"general.me_route_blocking_send_timeout_ms must be within [1, 5000]".to_string(),
));
}
@@ -2602,6 +2602,26 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_blocking_send_timeout_ms_zero_is_rejected() {
let toml = r#"
[general]
me_route_blocking_send_timeout_ms = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_blocking_send_timeout_zero_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_no_writer_mode_is_parsed() {
let toml = r#"

View File

@@ -778,7 +778,7 @@ pub struct GeneralConfig {
pub me_route_hybrid_max_wait_ms: u64,
/// Maximum wait in milliseconds for blocking ME writer channel send fallback.
/// `0` keeps legacy unbounded wait behavior.
/// Must be within [1, 5000].
#[serde(default = "default_me_route_blocking_send_timeout_ms")]
pub me_route_blocking_send_timeout_ms: u64,

View File

@@ -9,14 +9,24 @@ use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{Mutex as AsyncMutex, RwLock};
use tokio::sync::{Mutex as AsyncMutex, RwLock, RwLockWriteGuard};
use crate::config::UserMaxUniqueIpsMode;
const CLEANUP_DRAIN_BATCH_LIMIT: usize = 1024;
const MAX_ACTIVE_IP_ENTRIES: u64 = 131_072;
const MAX_RECENT_IP_ENTRIES: u64 = 262_144;
/// Tracks active and recent client IPs for per-user admission control.
#[derive(Debug, Clone)]
pub struct UserIpTracker {
active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
active_entry_count: Arc<AtomicU64>,
recent_entry_count: Arc<AtomicU64>,
active_cap_rejects: Arc<AtomicU64>,
recent_cap_rejects: Arc<AtomicU64>,
cleanup_deferred_releases: Arc<AtomicU64>,
max_ips: Arc<RwLock<HashMap<String, usize>>>,
default_max_ips: Arc<RwLock<usize>>,
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
@@ -26,13 +36,25 @@ pub struct UserIpTracker {
cleanup_drain_lock: Arc<AsyncMutex<()>>,
}
/// Point-in-time memory counters for user/IP limiter state.
#[derive(Debug, Clone, Copy)]
pub struct UserIpTrackerMemoryStats {
/// Number of users with active IP state.
pub active_users: usize,
/// Number of users with recent IP state.
pub recent_users: usize,
/// Number of active `(user, ip)` entries.
pub active_entries: usize,
/// Number of recent-window `(user, ip)` entries.
pub recent_entries: usize,
/// Number of deferred disconnect cleanups waiting to be drained.
pub cleanup_queue_len: usize,
/// Number of new connections rejected by the global active-entry cap.
pub active_cap_rejects: u64,
/// Number of new connections rejected by the global recent-entry cap.
pub recent_cap_rejects: u64,
/// Number of release cleanups deferred through the cleanup queue.
pub cleanup_deferred_releases: u64,
}
impl UserIpTracker {
@@ -40,6 +62,11 @@ impl UserIpTracker {
Self {
active_ips: Arc::new(RwLock::new(HashMap::new())),
recent_ips: Arc::new(RwLock::new(HashMap::new())),
active_entry_count: Arc::new(AtomicU64::new(0)),
recent_entry_count: Arc::new(AtomicU64::new(0)),
active_cap_rejects: Arc::new(AtomicU64::new(0)),
recent_cap_rejects: Arc::new(AtomicU64::new(0)),
cleanup_deferred_releases: Arc::new(AtomicU64::new(0)),
max_ips: Arc::new(RwLock::new(HashMap::new())),
default_max_ips: Arc::new(RwLock::new(0)),
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
@@ -50,16 +77,59 @@ impl UserIpTracker {
}
}
fn decrement_counter(counter: &AtomicU64, amount: usize) {
if amount == 0 {
return;
}
let amount = amount as u64;
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
Some(current.saturating_sub(amount))
});
}
fn apply_active_cleanup(
active_ips: &mut HashMap<String, HashMap<IpAddr, usize>>,
user: &str,
ip: IpAddr,
pending_count: usize,
) -> usize {
if pending_count == 0 {
return 0;
}
let mut remove_user = false;
let mut removed_active_entries = 0usize;
if let Some(user_ips) = active_ips.get_mut(user) {
if let Some(count) = user_ips.get_mut(&ip) {
if *count > pending_count {
*count -= pending_count;
} else if user_ips.remove(&ip).is_some() {
removed_active_entries = 1;
}
}
remove_user = user_ips.is_empty();
}
if remove_user {
active_ips.remove(user);
}
removed_active_entries
}
/// Queues a deferred active IP cleanup for a later async drain.
pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
match self.cleanup_queue.lock() {
Ok(mut queue) => {
let count = queue.entry((user, ip)).or_insert(0);
*count = count.saturating_add(1);
self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed);
}
Err(poisoned) => {
let mut queue = poisoned.into_inner();
let count = queue.entry((user.clone(), ip)).or_insert(0);
*count = count.saturating_add(1);
self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed);
self.cleanup_queue.clear_poison();
tracing::warn!(
"UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})",
@@ -86,16 +156,27 @@ impl UserIpTracker {
}
pub(crate) async fn drain_cleanup_queue(&self) {
// Serialize queue draining and active-IP mutation so check-and-add cannot
// observe stale active entries that are already queued for removal.
let _drain_guard = self.cleanup_drain_lock.lock().await;
let Ok(_drain_guard) = self.cleanup_drain_lock.try_lock() else {
return;
};
let to_remove = {
match self.cleanup_queue.lock() {
Ok(mut queue) => {
if queue.is_empty() {
return;
}
std::mem::take(&mut *queue)
let mut drained =
HashMap::with_capacity(queue.len().min(CLEANUP_DRAIN_BATCH_LIMIT));
for _ in 0..CLEANUP_DRAIN_BATCH_LIMIT {
let Some(key) = queue.keys().next().cloned() else {
break;
};
if let Some(count) = queue.remove(&key) {
drained.insert(key, count);
}
}
drained
}
Err(poisoned) => {
let mut queue = poisoned.into_inner();
@@ -103,31 +184,33 @@ impl UserIpTracker {
self.cleanup_queue.clear_poison();
return;
}
let drained = std::mem::take(&mut *queue);
let mut drained =
HashMap::with_capacity(queue.len().min(CLEANUP_DRAIN_BATCH_LIMIT));
for _ in 0..CLEANUP_DRAIN_BATCH_LIMIT {
let Some(key) = queue.keys().next().cloned() else {
break;
};
if let Some(count) = queue.remove(&key) {
drained.insert(key, count);
}
}
self.cleanup_queue.clear_poison();
drained
}
}
};
if to_remove.is_empty() {
return;
}
let mut active_ips = self.active_ips.write().await;
let mut removed_active_entries = 0usize;
for ((user, ip), pending_count) in to_remove {
if pending_count == 0 {
continue;
}
if let Some(user_ips) = active_ips.get_mut(&user) {
if let Some(count) = user_ips.get_mut(&ip) {
if *count > pending_count {
*count -= pending_count;
} else {
user_ips.remove(&ip);
}
}
if user_ips.is_empty() {
active_ips.remove(&user);
}
}
removed_active_entries = removed_active_entries.saturating_add(
Self::apply_active_cleanup(&mut active_ips, &user, ip, pending_count),
);
}
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
}
fn now_epoch_secs() -> u64 {
@@ -137,6 +220,24 @@ impl UserIpTracker {
.as_secs()
}
async fn active_and_recent_write(
&self,
) -> (
RwLockWriteGuard<'_, HashMap<String, HashMap<IpAddr, usize>>>,
RwLockWriteGuard<'_, HashMap<String, HashMap<IpAddr, Instant>>>,
) {
loop {
let active_ips = self.active_ips.write().await;
match self.recent_ips.try_write() {
Ok(recent_ips) => return (active_ips, recent_ips),
Err(_) => {
drop(active_ips);
tokio::task::yield_now().await;
}
}
}
}
async fn maybe_compact_empty_users(&self) {
const COMPACT_INTERVAL_SECS: u64 = 60;
let now_epoch_secs = Self::now_epoch_secs();
@@ -157,14 +258,16 @@ impl UserIpTracker {
return;
}
let mut active_ips = self.active_ips.write().await;
let mut recent_ips = self.recent_ips.write().await;
let window = *self.limit_window.read().await;
let now = Instant::now();
let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
let mut pruned_recent_entries = 0usize;
for user_recent in recent_ips.values_mut() {
Self::prune_recent(user_recent, now, window);
pruned_recent_entries =
pruned_recent_entries.saturating_add(Self::prune_recent(user_recent, now, window));
}
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
let mut users =
Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
@@ -208,6 +311,9 @@ impl UserIpTracker {
active_entries,
recent_entries,
cleanup_queue_len,
active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed),
recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed),
cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed),
}
}
@@ -238,11 +344,17 @@ impl UserIpTracker {
max_ips.clone_from(limits);
}
fn prune_recent(user_recent: &mut HashMap<IpAddr, Instant>, now: Instant, window: Duration) {
fn prune_recent(
user_recent: &mut HashMap<IpAddr, Instant>,
now: Instant,
window: Duration,
) -> usize {
if user_recent.is_empty() {
return;
return 0;
}
let before = user_recent.len();
user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window);
before.saturating_sub(user_recent.len())
}
pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
@@ -261,24 +373,36 @@ impl UserIpTracker {
let window = *self.limit_window.read().await;
let now = Instant::now();
let mut active_ips = self.active_ips.write().await;
let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
let user_active = active_ips
.entry(username.to_string())
.or_insert_with(HashMap::new);
let mut recent_ips = self.recent_ips.write().await;
let user_recent = recent_ips
.entry(username.to_string())
.or_insert_with(HashMap::new);
Self::prune_recent(user_recent, now, window);
let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
let recent_contains_ip = user_recent.contains_key(&ip);
if let Some(count) = user_active.get_mut(&ip) {
if !recent_contains_ip
&& self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES
{
self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
return Err(format!(
"IP tracker recent entry cap reached: entries={}/{}",
self.recent_entry_count.load(Ordering::Relaxed),
MAX_RECENT_IP_ENTRIES
));
}
*count = count.saturating_add(1);
user_recent.insert(ip, now);
if user_recent.insert(ip, now).is_none() {
self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
}
return Ok(());
}
let is_new_ip = !user_recent.contains_key(&ip);
let is_new_ip = !recent_contains_ip;
if let Some(limit) = limit {
let active_limit_reached = user_active.len() >= limit;
@@ -302,30 +426,62 @@ impl UserIpTracker {
}
}
user_active.insert(ip, 1);
user_recent.insert(ip, now);
if self.active_entry_count.load(Ordering::Relaxed) >= MAX_ACTIVE_IP_ENTRIES {
self.active_cap_rejects.fetch_add(1, Ordering::Relaxed);
return Err(format!(
"IP tracker active entry cap reached: entries={}/{}",
self.active_entry_count.load(Ordering::Relaxed),
MAX_ACTIVE_IP_ENTRIES
));
}
if is_new_ip && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES {
self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
return Err(format!(
"IP tracker recent entry cap reached: entries={}/{}",
self.recent_entry_count.load(Ordering::Relaxed),
MAX_RECENT_IP_ENTRIES
));
}
if user_active.insert(ip, 1).is_none() {
self.active_entry_count.fetch_add(1, Ordering::Relaxed);
}
if user_recent.insert(ip, now).is_none() {
self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
}
Ok(())
}
pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
self.maybe_compact_empty_users().await;
let mut active_ips = self.active_ips.write().await;
let mut removed_active_entries = 0usize;
if let Some(user_ips) = active_ips.get_mut(username) {
if let Some(count) = user_ips.get_mut(&ip) {
if *count > 1 {
*count -= 1;
} else {
user_ips.remove(&ip);
if user_ips.remove(&ip).is_some() {
removed_active_entries = 1;
}
}
}
if user_ips.is_empty() {
active_ips.remove(username);
}
}
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
}
pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
self.drain_cleanup_queue().await;
self.get_recent_counts_for_users_snapshot(users).await
}
pub(crate) async fn get_recent_counts_for_users_snapshot(
&self,
users: &[String],
) -> HashMap<String, usize> {
let window = *self.limit_window.read().await;
let now = Instant::now();
let recent_ips = self.recent_ips.read().await;
@@ -400,19 +556,29 @@ impl UserIpTracker {
pub async fn get_stats(&self) -> Vec<(String, usize, usize)> {
self.drain_cleanup_queue().await;
self.get_stats_snapshot().await
}
pub(crate) async fn get_stats_snapshot(&self) -> Vec<(String, usize, usize)> {
let active_ips = self.active_ips.read().await;
let active_counts = active_ips
.iter()
.map(|(username, user_ips)| (username.clone(), user_ips.len()))
.collect::<Vec<_>>();
drop(active_ips);
let max_ips = self.max_ips.read().await;
let default_max_ips = *self.default_max_ips.read().await;
let mut stats = Vec::new();
for (username, user_ips) in active_ips.iter() {
let mut stats = Vec::with_capacity(active_counts.len());
for (username, active_count) in active_counts {
let limit = max_ips
.get(username)
.get(&username)
.copied()
.filter(|limit| *limit > 0)
.or((default_max_ips > 0).then_some(default_max_ips))
.unwrap_or(0);
stats.push((username.clone(), user_ips.len(), limit));
stats.push((username, active_count, limit));
}
stats.sort_by(|a, b| a.0.cmp(&b.0));
@@ -421,20 +587,30 @@ impl UserIpTracker {
pub async fn clear_user_ips(&self, username: &str) {
let mut active_ips = self.active_ips.write().await;
active_ips.remove(username);
let removed_active_entries = active_ips
.remove(username)
.map(|ips| ips.len())
.unwrap_or(0);
drop(active_ips);
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
let mut recent_ips = self.recent_ips.write().await;
recent_ips.remove(username);
let removed_recent_entries = recent_ips
.remove(username)
.map(|ips| ips.len())
.unwrap_or(0);
Self::decrement_counter(&self.recent_entry_count, removed_recent_entries);
}
pub async fn clear_all(&self) {
let mut active_ips = self.active_ips.write().await;
active_ips.clear();
drop(active_ips);
self.active_entry_count.store(0, Ordering::Relaxed);
let mut recent_ips = self.recent_ips.write().await;
recent_ips.clear();
self.recent_entry_count.store(0, Ordering::Relaxed);
}
pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool {

View File

@@ -814,6 +814,7 @@ async fn run_telemt_core(
beobachten.clone(),
shared_state.clone(),
ip_tracker.clone(),
tls_cache.clone(),
config_rx.clone(),
)
.await;

View File

@@ -21,6 +21,7 @@ use crate::startup::{
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::telemetry::TelemetryPolicy;
use crate::stats::{ReplayChecker, Stats};
use crate::tls_front::TlsFrontCache;
use crate::transport::UpstreamManager;
use crate::transport::middle_proxy::{MePool, MeReinitTrigger};
@@ -328,6 +329,7 @@ pub(crate) async fn spawn_metrics_if_configured(
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
tls_cache: Option<Arc<TlsFrontCache>>,
config_rx: watch::Receiver<Arc<ProxyConfig>>,
) {
// metrics_listen takes precedence; fall back to metrics_port for backward compat.
@@ -363,6 +365,7 @@ pub(crate) async fn spawn_metrics_if_configured(
let shared_state = shared_state.clone();
let config_rx_metrics = config_rx.clone();
let ip_tracker_metrics = ip_tracker.clone();
let tls_cache_metrics = tls_cache.clone();
let whitelist = config.server.metrics_whitelist.clone();
let listen_backlog = config.server.listen_backlog;
tokio::spawn(async move {
@@ -374,6 +377,7 @@ pub(crate) async fn spawn_metrics_if_configured(
beobachten,
shared_state,
ip_tracker_metrics,
tls_cache_metrics,
config_rx_metrics,
whitelist,
)

View File

@@ -18,8 +18,16 @@ use crate::ip_tracker::UserIpTracker;
use crate::proxy::shared_state::ProxySharedState;
use crate::stats::Stats;
use crate::stats::beobachten::BeobachtenStore;
use crate::tls_front::TlsFrontCache;
use crate::tls_front::cache;
use crate::tls_front::fetcher;
use crate::transport::{ListenOptions, create_listener};
// Keeps `/metrics` response size bounded when per-user telemetry is enabled.
const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
// Keeps TLS-front per-domain health series bounded for large generated configs.
const TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS: usize = 256;
pub async fn serve(
port: u16,
listen: Option<String>,
@@ -28,6 +36,7 @@ pub async fn serve(
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
tls_cache: Option<Arc<TlsFrontCache>>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>,
) {
@@ -52,6 +61,7 @@ pub async fn serve(
beobachten,
shared_state,
ip_tracker,
tls_cache,
config_rx,
whitelist,
)
@@ -107,6 +117,7 @@ pub async fn serve(
beobachten,
shared_state,
ip_tracker,
tls_cache,
config_rx,
whitelist,
)
@@ -117,6 +128,7 @@ pub async fn serve(
let beobachten_v6 = beobachten.clone();
let shared_state_v6 = shared_state.clone();
let ip_tracker_v6 = ip_tracker.clone();
let tls_cache_v6 = tls_cache.clone();
let config_rx_v6 = config_rx.clone();
let whitelist_v6 = whitelist.clone();
tokio::spawn(async move {
@@ -126,6 +138,7 @@ pub async fn serve(
beobachten_v6,
shared_state_v6,
ip_tracker_v6,
tls_cache_v6,
config_rx_v6,
whitelist_v6,
)
@@ -137,6 +150,7 @@ pub async fn serve(
beobachten,
shared_state,
ip_tracker,
tls_cache,
config_rx,
whitelist,
)
@@ -166,6 +180,7 @@ async fn serve_listener(
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
tls_cache: Option<Arc<TlsFrontCache>>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>,
) {
@@ -187,6 +202,7 @@ async fn serve_listener(
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone();
let tls_cache = tls_cache.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move {
let svc = service_fn(move |req| {
@@ -194,6 +210,7 @@ async fn serve_listener(
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone();
let tls_cache = tls_cache.clone();
let config = config_rx_conn.borrow().clone();
async move {
handle(
@@ -202,6 +219,7 @@ async fn serve_listener(
&beobachten,
&shared_state,
&ip_tracker,
tls_cache.as_deref(),
&config,
)
.await
@@ -223,10 +241,11 @@ async fn handle<B>(
beobachten: &BeobachtenStore,
shared_state: &ProxySharedState,
ip_tracker: &UserIpTracker,
tls_cache: Option<&TlsFrontCache>,
config: &ProxyConfig,
) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() == "/metrics" {
let body = render_metrics(stats, shared_state, config, ip_tracker).await;
let body = render_metrics(stats, shared_state, config, ip_tracker, tls_cache).await;
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
@@ -261,11 +280,151 @@ fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> Stri
beobachten.snapshot_text(ttl)
}
fn tls_front_domains(config: &ProxyConfig) -> Vec<String> {
let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
if !config.censorship.tls_domain.is_empty() {
domains.push(config.censorship.tls_domain.clone());
}
for domain in &config.censorship.tls_domains {
if !domain.is_empty() && !domains.contains(domain) {
domains.push(domain.clone());
}
}
domains
}
fn prometheus_label_value(value: &str) -> String {
value.replace('\\', "\\\\").replace('"', "\\\"")
}
async fn render_tls_front_profile_health(
out: &mut String,
config: &ProxyConfig,
tls_cache: Option<&TlsFrontCache>,
) {
use std::fmt::Write;
let domains = tls_front_domains(config);
let (health, suppressed) = match (config.censorship.tls_emulation, tls_cache) {
(true, Some(cache)) => {
cache
.profile_health_snapshot(&domains, TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS)
.await
}
_ => (Vec::new(), domains.len()),
};
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_domains TLS front configured profile domains by export status"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_domains gauge");
let _ = writeln!(
out,
"telemt_tls_front_profile_domains{{status=\"configured\"}} {}",
domains.len()
);
let _ = writeln!(
out,
"telemt_tls_front_profile_domains{{status=\"emitted\"}} {}",
health.len()
);
let _ = writeln!(
out,
"telemt_tls_front_profile_domains{{status=\"suppressed\"}} {}",
suppressed
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_info TLS front profile source and feature flags per configured domain"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_info gauge");
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_age_seconds Age of cached TLS front profile data per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_age_seconds gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_app_data_records TLS front cached app-data record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_app_data_records gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_ticket_records TLS front cached ticket-like tail record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_ticket_records gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_change_cipher_spec_records TLS front cached ChangeCipherSpec record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_change_cipher_spec_records gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_app_data_bytes TLS front cached total app-data bytes per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_app_data_bytes gauge"
);
for item in health {
let domain = prometheus_label_value(&item.domain);
let _ = writeln!(
out,
"telemt_tls_front_profile_info{{domain=\"{}\",source=\"{}\",is_default=\"{}\",has_cert_info=\"{}\",has_cert_payload=\"{}\"}} 1",
domain,
item.source,
item.is_default,
item.has_cert_info,
item.has_cert_payload
);
let _ = writeln!(
out,
"telemt_tls_front_profile_age_seconds{{domain=\"{}\"}} {}",
domain, item.age_seconds
);
let _ = writeln!(
out,
"telemt_tls_front_profile_app_data_records{{domain=\"{}\"}} {}",
domain, item.app_data_records
);
let _ = writeln!(
out,
"telemt_tls_front_profile_ticket_records{{domain=\"{}\"}} {}",
domain, item.ticket_records
);
let _ = writeln!(
out,
"telemt_tls_front_profile_change_cipher_spec_records{{domain=\"{}\"}} {}",
domain, item.change_cipher_spec_count
);
let _ = writeln!(
out,
"telemt_tls_front_profile_app_data_bytes{{domain=\"{}\"}} {}",
domain, item.total_app_data_len
);
}
}
async fn render_metrics(
stats: &Stats,
shared_state: &ProxySharedState,
config: &ProxyConfig,
ip_tracker: &UserIpTracker,
tls_cache: Option<&TlsFrontCache>,
) -> String {
use std::fmt::Write;
let mut out = String::with_capacity(4096);
@@ -311,6 +470,12 @@ async fn render_metrics(
"telemt_telemetry_user_enabled {}",
if user_enabled { 1 } else { 0 }
);
let _ = writeln!(
out,
"# HELP telemt_stats_user_entries Retained per-user stats entries"
);
let _ = writeln!(out, "# TYPE telemt_stats_user_entries gauge");
let _ = writeln!(out, "telemt_stats_user_entries {}", stats.user_stats_len());
let _ = writeln!(
out,
@@ -366,6 +531,54 @@ async fn render_metrics(
stats.get_buffer_pool_in_use_gauge()
);
let _ = writeln!(
out,
"# HELP telemt_tls_fetch_profile_cache_entries Current adaptive TLS fetch profile-cache entries"
);
let _ = writeln!(out, "# TYPE telemt_tls_fetch_profile_cache_entries gauge");
let _ = writeln!(
out,
"telemt_tls_fetch_profile_cache_entries {}",
fetcher::profile_cache_entries_for_metrics()
);
let _ = writeln!(
out,
"# HELP telemt_tls_fetch_profile_cache_cap_drops_total Profile-cache winner inserts skipped because the cache cap was reached"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"
);
let _ = writeln!(
out,
"telemt_tls_fetch_profile_cache_cap_drops_total {}",
fetcher::profile_cache_cap_drops_for_metrics()
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_full_cert_budget_ips Current IP entries tracked by TLS full-cert budget"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_full_cert_budget_ips gauge");
let _ = writeln!(
out,
"telemt_tls_front_full_cert_budget_ips {}",
cache::full_cert_sent_ips_for_metrics()
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_full_cert_budget_cap_drops_total New IPs denied full-cert budget tracking because the cap was reached"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter"
);
let _ = writeln!(
out,
"telemt_tls_front_full_cert_budget_cap_drops_total {}",
cache::full_cert_sent_cap_drops_for_metrics()
);
render_tls_front_profile_health(&mut out, config, tls_cache).await;
let _ = writeln!(
out,
"# HELP telemt_connections_total Total accepted connections"
@@ -396,6 +609,21 @@ async fn render_metrics(
}
);
let _ = writeln!(
out,
"# HELP telemt_connections_bad_by_class_total Bad/rejected connections by class"
);
let _ = writeln!(out, "# TYPE telemt_connections_bad_by_class_total counter");
if core_enabled {
for (class, total) in stats.get_connects_bad_class_counts() {
let _ = writeln!(
out,
"telemt_connections_bad_by_class_total{{class=\"{}\"}} {}",
class, total
);
}
}
let _ = writeln!(
out,
"# HELP telemt_handshake_timeouts_total Handshake timeouts"
@@ -411,6 +639,24 @@ async fn render_metrics(
}
);
let _ = writeln!(
out,
"# HELP telemt_handshake_failures_by_class_total Handshake failures by class"
);
let _ = writeln!(
out,
"# TYPE telemt_handshake_failures_by_class_total counter"
);
if core_enabled {
for (class, total) in stats.get_handshake_failure_class_counts() {
let _ = writeln!(
out,
"telemt_handshake_failures_by_class_total{{class=\"{}\"}} {}",
class, total
);
}
}
let _ = writeln!(
out,
"# HELP telemt_auth_expensive_checks_total Expensive authentication candidate checks executed during handshake validation"
@@ -3019,17 +3265,6 @@ async fn render_metrics(
0
}
);
let _ = writeln!(
out,
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
);
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
let _ = writeln!(
out,
"telemt_telemetry_user_series_suppressed {}",
if user_enabled { 0 } else { 1 }
);
let ip_memory = ip_tracker.memory_stats().await;
let _ = writeln!(
out,
@@ -3071,11 +3306,46 @@ async fn render_metrics(
"telemt_ip_tracker_cleanup_queue_len {}",
ip_memory.cleanup_queue_len
);
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_cleanup_total Release cleanups deferred through the cleanup queue"
);
let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter");
let _ = writeln!(
out,
"telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}",
ip_memory.cleanup_deferred_releases
);
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps"
);
let _ = writeln!(out, "# TYPE telemt_ip_tracker_cap_rejects_total counter");
let _ = writeln!(
out,
"telemt_ip_tracker_cap_rejects_total{{scope=\"active\"}} {}",
ip_memory.active_cap_rejects
);
let _ = writeln!(
out,
"telemt_ip_tracker_cap_rejects_total{{scope=\"recent\"}} {}",
ip_memory.recent_cap_rejects
);
let mut user_stats_emitted = 0usize;
let mut user_stats_suppressed = 0usize;
let mut unique_ip_emitted = 0usize;
let mut unique_ip_suppressed = 0usize;
if user_enabled {
for entry in stats.iter_user_stats() {
if user_stats_emitted >= USER_LABELED_METRICS_MAX_USERS {
user_stats_suppressed = user_stats_suppressed.saturating_add(1);
continue;
}
let user = entry.key();
let s = entry.value();
user_stats_emitted = user_stats_emitted.saturating_add(1);
let _ = writeln!(
out,
"telemt_user_connections_total{{user=\"{}\"}} {}",
@@ -3117,7 +3387,7 @@ async fn render_metrics(
);
}
let ip_stats = ip_tracker.get_stats().await;
let ip_stats = ip_tracker.get_stats_snapshot().await;
let ip_counts: HashMap<String, usize> = ip_stats
.into_iter()
.map(|(user, count, _)| (user, count))
@@ -3129,7 +3399,7 @@ async fn render_metrics(
unique_users.extend(ip_counts.keys().cloned());
let unique_users_vec: Vec<String> = unique_users.iter().cloned().collect();
let recent_counts = ip_tracker
.get_recent_counts_for_users(&unique_users_vec)
.get_recent_counts_for_users_snapshot(&unique_users_vec)
.await;
let _ = writeln!(
@@ -3154,6 +3424,11 @@ async fn render_metrics(
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
for user in unique_users {
if unique_ip_emitted >= USER_LABELED_METRICS_MAX_USERS {
unique_ip_suppressed = unique_ip_suppressed.saturating_add(1);
continue;
}
unique_ip_emitted = unique_ip_emitted.saturating_add(1);
let current = ip_counts.get(&user).copied().unwrap_or(0);
let limit = config
.access
@@ -3193,6 +3468,46 @@ async fn render_metrics(
}
}
let _ = writeln!(
out,
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
);
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
let _ = writeln!(
out,
"telemt_telemetry_user_series_suppressed {}",
if user_enabled && user_stats_suppressed == 0 && unique_ip_suppressed == 0 {
0
} else {
1
}
);
let _ = writeln!(
out,
"# HELP telemt_telemetry_user_series_users User-labeled metric users by export status"
);
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_users gauge");
let _ = writeln!(
out,
"telemt_telemetry_user_series_users{{family=\"stats\",status=\"emitted\"}} {}",
user_stats_emitted
);
let _ = writeln!(
out,
"telemt_telemetry_user_series_users{{family=\"stats\",status=\"suppressed\"}} {}",
user_stats_suppressed
);
let _ = writeln!(
out,
"telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"emitted\"}} {}",
unique_ip_emitted
);
let _ = writeln!(
out,
"telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"suppressed\"}} {}",
unique_ip_suppressed
);
out
}
@@ -3201,6 +3516,11 @@ mod tests {
use super::*;
use http_body_util::BodyExt;
use std::net::IpAddr;
use std::time::SystemTime;
use crate::tls_front::types::{
CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsCertPayload, TlsProfileSource,
};
#[tokio::test]
async fn test_render_metrics_format() {
@@ -3215,8 +3535,9 @@ mod tests {
stats.increment_connects_all();
stats.increment_connects_all();
stats.increment_connects_bad();
stats.increment_connects_bad_with_class("tls_handshake_bad_client");
stats.increment_handshake_timeouts();
stats.increment_handshake_failure_class("timeout");
shared_state
.handshake
.auth_expensive_checks_total
@@ -3268,7 +3589,7 @@ mod tests {
.await
.unwrap();
let output = render_metrics(&stats, shared_state.as_ref(), &config, &tracker).await;
let output = render_metrics(&stats, shared_state.as_ref(), &config, &tracker, None).await;
assert!(output.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
@@ -3276,7 +3597,10 @@ mod tests {
)));
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output
.contains("telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_handshake_failures_by_class_total{class=\"timeout\"} 1"));
assert!(output.contains("telemt_auth_expensive_checks_total 9"));
assert!(output.contains("telemt_auth_budget_exhausted_total 2"));
assert!(output.contains("telemt_upstream_connect_attempt_total 2"));
@@ -3330,13 +3654,86 @@ mod tests {
assert!(output.contains("telemt_ip_tracker_cleanup_queue_len 0"));
}
#[tokio::test]
async fn test_render_tls_front_profile_health() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let mut config = ProxyConfig::default();
config.censorship.tls_domain = "primary.example".to_string();
config.censorship.tls_domains = vec!["fallback.example".to_string()];
let cache = TlsFrontCache::new(
&[
"primary.example".to_string(),
"fallback.example".to_string(),
],
1024,
"tlsfront-profile-health-test",
);
cache
.set(
"primary.example",
CachedTlsData {
server_hello_template: ParsedServerHello {
version: [0x03, 0x03],
random: [0u8; 32],
session_id: Vec::new(),
cipher_suite: [0x13, 0x01],
compression: 0,
extensions: Vec::new(),
},
cert_info: None,
cert_payload: Some(TlsCertPayload {
cert_chain_der: vec![vec![0x30, 0x01]],
certificate_message: vec![0x0b, 0x00, 0x00, 0x00],
}),
app_data_records_sizes: vec![1024, 512],
total_app_data_len: 1536,
behavior_profile: TlsBehaviorProfile {
change_cipher_spec_count: 1,
app_data_record_sizes: vec![1024, 512],
ticket_record_sizes: vec![69],
source: TlsProfileSource::Merged,
},
fetched_at: SystemTime::now(),
domain: "primary.example".to_string(),
},
)
.await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, Some(&cache)).await;
assert!(output.contains("telemt_tls_front_profile_domains{status=\"configured\"} 2"));
assert!(output.contains("telemt_tls_front_profile_domains{status=\"emitted\"} 2"));
assert!(output.contains("telemt_tls_front_profile_domains{status=\"suppressed\"} 0"));
assert!(
output.contains("telemt_tls_front_profile_info{domain=\"primary.example\",source=\"merged\",is_default=\"false\",has_cert_info=\"false\",has_cert_payload=\"true\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_info{domain=\"fallback.example\",source=\"default\",is_default=\"true\",has_cert_info=\"false\",has_cert_payload=\"false\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2")
);
assert!(
output.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536")
);
}
#[tokio::test]
async fn test_render_empty_stats() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, None).await;
assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0"));
@@ -3360,7 +3757,7 @@ mod tests {
let mut config = ProxyConfig::default();
config.access.user_max_unique_ips_global_each = 2;
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, None).await;
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000"));
@@ -3372,11 +3769,13 @@ mod tests {
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, None).await;
assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_by_class_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_handshake_failures_by_class_total counter"));
assert!(output.contains("# TYPE telemt_auth_expensive_checks_total counter"));
assert!(output.contains("# TYPE telemt_auth_budget_exhausted_total counter"));
assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
@@ -3406,9 +3805,28 @@ mod tests {
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
assert!(output.contains("# TYPE telemt_stats_user_entries gauge"));
assert!(output.contains("# TYPE telemt_telemetry_user_series_users gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_total counter"));
assert!(output.contains("# TYPE telemt_ip_tracker_cap_rejects_total counter"));
assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_entries gauge"));
assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"));
assert!(output.contains("# TYPE telemt_tls_front_full_cert_budget_ips gauge"));
assert!(
output.contains("# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter")
);
assert!(output.contains("# TYPE telemt_tls_front_profile_domains gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_info gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_age_seconds gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_app_data_records gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_ticket_records gauge"));
assert!(
output.contains("# TYPE telemt_tls_front_profile_change_cipher_spec_records gauge")
);
assert!(output.contains("# TYPE telemt_tls_front_profile_app_data_bytes gauge"));
}
#[tokio::test]
@@ -3429,6 +3847,7 @@ mod tests {
&beobachten,
shared_state.as_ref(),
&tracker,
None,
&config,
)
.await
@@ -3463,6 +3882,7 @@ mod tests {
&beobachten,
shared_state.as_ref(),
&tracker,
None,
&config,
)
.await
@@ -3480,6 +3900,7 @@ mod tests {
&beobachten,
shared_state.as_ref(),
&tracker,
None,
&config,
)
.await

View File

@@ -1431,8 +1431,8 @@ impl RunningClientHandler {
/// Main dispatch after successful handshake.
/// Two modes:
/// - Direct: TCP relay to TG DC (existing behavior)
/// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs)
/// - Direct: TCP relay to TG DC (existing behavior)
/// - Middle Proxy: RPC multiplex through ME pool (supports CDN DCs)
#[cfg(test)]
async fn handle_authenticated_static<R, W>(
client_reader: CryptoReader<R>,

View File

@@ -669,6 +669,13 @@ fn adversarial_check_then_symlink_flip_is_blocked_by_nofollow_open() {
"telemt-unknown-dc-check-open-race-{}",
std::process::id()
));
if let Ok(meta) = fs::symlink_metadata(&parent) {
if meta.file_type().is_symlink() || meta.is_file() {
fs::remove_file(&parent).expect("stale check-open-race path must be removable");
} else {
fs::remove_dir_all(&parent).expect("stale check-open-race parent must be removable");
}
}
fs::create_dir_all(&parent).expect("check-open-race parent must be creatable");
let target = parent.join("unknown-dc.log");

View File

@@ -74,16 +74,21 @@ impl BeobachtenStore {
}
let now = Instant::now();
let mut guard = self.inner.lock();
Self::cleanup(&mut guard, now, ttl);
guard.last_cleanup = Some(now);
let entries = {
let mut guard = self.inner.lock();
Self::cleanup(&mut guard, now, ttl);
guard.last_cleanup = Some(now);
guard
.entries
.iter()
.map(|((class, ip), entry)| (class.clone(), *ip, entry.tries))
.collect::<Vec<_>>()
};
let mut grouped = BTreeMap::<String, Vec<(IpAddr, u64)>>::new();
for ((class, ip), entry) in &guard.entries {
grouped
.entry(class.clone())
.or_default()
.push((*ip, entry.tries));
for (class, ip, tries) in entries {
grouped.entry(class).or_default().push((ip, tries));
}
if grouped.is_empty() {

View File

@@ -2477,6 +2477,11 @@ impl Stats {
self.user_stats.iter()
}
/// Current number of retained per-user stats entries.
pub fn user_stats_len(&self) -> usize {
self.user_stats.len()
}
pub fn uptime_secs(&self) -> f64 {
self.start_time
.read()

View File

@@ -277,6 +277,7 @@ impl StreamState for TlsReaderState {
pub struct FakeTlsReader<R> {
upstream: R,
state: TlsReaderState,
body_scratch: Vec<u8>,
}
impl<R> FakeTlsReader<R> {
@@ -284,6 +285,7 @@ impl<R> FakeTlsReader<R> {
Self {
upstream,
state: TlsReaderState::Idle,
body_scratch: Vec::new(),
}
}
@@ -439,7 +441,13 @@ impl<R: AsyncRead + Unpin> AsyncRead for FakeTlsReader<R> {
length,
mut buffer,
} => {
let result = poll_read_body(&mut this.upstream, cx, &mut buffer, length);
let result = poll_read_body(
&mut this.upstream,
cx,
&mut buffer,
length,
&mut this.body_scratch,
);
match result {
BodyPollResult::Pending => {
@@ -558,34 +566,36 @@ fn poll_read_body<R: AsyncRead + Unpin>(
cx: &mut Context<'_>,
buffer: &mut BytesMut,
target_len: usize,
scratch: &mut Vec<u8>,
) -> BodyPollResult {
// NOTE: This implementation uses a temporary Vec to avoid tricky borrow/lifetime
// issues with BytesMut spare capacity and ReadBuf across polls.
// It's safe and correct; optimization is possible if needed.
while buffer.len() < target_len {
let remaining = target_len - buffer.len();
let chunk_len = remaining.min(8192);
let mut temp = vec![0u8; remaining.min(8192)];
let mut read_buf = ReadBuf::new(&mut temp);
match Pin::new(&mut *upstream).poll_read(cx, &mut read_buf) {
Poll::Pending => return BodyPollResult::Pending,
Poll::Ready(Err(e)) => return BodyPollResult::Error(e),
Poll::Ready(Ok(())) => {
let n = read_buf.filled().len();
if n == 0 {
return BodyPollResult::Error(Error::new(
ErrorKind::UnexpectedEof,
format!(
"unexpected EOF in TLS body (got {} of {} bytes)",
buffer.len(),
target_len
),
));
}
buffer.extend_from_slice(&temp[..n]);
}
if scratch.len() < chunk_len {
scratch.resize(chunk_len, 0);
}
let n = {
let mut read_buf = ReadBuf::new(&mut scratch[..chunk_len]);
match Pin::new(&mut *upstream).poll_read(cx, &mut read_buf) {
Poll::Pending => return BodyPollResult::Pending,
Poll::Ready(Err(e)) => return BodyPollResult::Error(e),
Poll::Ready(Ok(())) => read_buf.filled().len(),
}
};
if n == 0 {
return BodyPollResult::Error(Error::new(
ErrorKind::UnexpectedEof,
format!(
"unexpected EOF in TLS body (got {} of {} bytes)",
buffer.len(),
target_len
),
));
}
buffer.extend_from_slice(&scratch[..n]);
}
BodyPollResult::Complete(buffer.split().freeze())

View File

@@ -559,9 +559,7 @@ async fn mass_reconnect_sync_cleanup_prevents_temporary_reservation_bloat() {
}
#[tokio::test]
async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections() {
// Regression guard: concurrent cleanup draining must not produce false
// limit denials for a new IP when the previous IP is already queued.
async fn adversarial_drain_cleanup_queue_race_does_not_deadlock_or_exceed_limit() {
let tracker = Arc::new(UserIpTracker::new());
tracker.set_user_limit("racer", 1).await;
let ip1 = ip_from_idx(1);
@@ -573,7 +571,6 @@ async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections()
// User disconnects from ip1, queuing it
tracker.enqueue_cleanup("racer".to_string(), ip1);
let mut saw_false_rejection = false;
for _ in 0..100 {
// Queue cleanup then race explicit drain and check-and-add on the alternative IP.
tracker.enqueue_cleanup("racer".to_string(), ip1);
@@ -585,22 +582,21 @@ async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections()
});
let handle = tokio::spawn(async move { tracker_b.check_and_add("racer", ip2).await });
drain_handle.await.unwrap();
let res = handle.await.unwrap();
if res.is_err() {
saw_false_rejection = true;
break;
}
tokio::time::timeout(Duration::from_secs(1), drain_handle)
.await
.expect("cleanup drain must not deadlock")
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(1), handle)
.await
.expect("admission must not deadlock")
.unwrap();
// Restore baseline for next iteration.
assert!(tracker.get_active_ip_count("racer").await <= 1);
tracker.drain_cleanup_queue().await;
tracker.remove_ip("racer", ip2).await;
tracker.remove_ip("racer", ip1).await;
tracker.check_and_add("racer", ip1).await.unwrap();
}
assert!(
!saw_false_rejection,
"Concurrent cleanup draining must not cause false-positive IP denials"
);
}
#[tokio::test]

View File

@@ -1,26 +1,71 @@
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{debug, info, warn};
use crate::tls_front::types::{
CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsFetchResult,
CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsFetchResult, TlsProfileSource,
};
const FULL_CERT_SENT_SWEEP_INTERVAL_SECS: u64 = 30;
const FULL_CERT_SENT_MAX_IPS: usize = 65_536;
const FULL_CERT_SENT_SHARDS: usize = 64;
static FULL_CERT_SENT_IPS_GAUGE: AtomicU64 = AtomicU64::new(0);
static FULL_CERT_SENT_CAP_DROPS: AtomicU64 = AtomicU64::new(0);
/// Current number of IPs tracked by the TLS full-cert budget gate.
pub(crate) fn full_cert_sent_ips_for_metrics() -> u64 {
FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed)
}
/// Number of new IPs denied a full-cert budget slot because the cap was reached.
pub(crate) fn full_cert_sent_cap_drops_for_metrics() -> u64 {
FULL_CERT_SENT_CAP_DROPS.load(Ordering::Relaxed)
}
/// Lightweight in-memory + optional on-disk cache for TLS fronting data.
#[derive(Debug)]
pub struct TlsFrontCache {
memory: RwLock<HashMap<String, Arc<CachedTlsData>>>,
default: Arc<CachedTlsData>,
full_cert_sent: RwLock<HashMap<IpAddr, Instant>>,
full_cert_sent_shards: Vec<RwLock<HashMap<IpAddr, Instant>>>,
full_cert_sent_last_sweep_epoch_secs: AtomicU64,
disk_path: PathBuf,
}
/// Read-only health view for one configured TLS front domain.
#[derive(Debug, Clone)]
pub(crate) struct TlsFrontProfileHealth {
pub(crate) domain: String,
pub(crate) source: &'static str,
pub(crate) age_seconds: u64,
pub(crate) is_default: bool,
pub(crate) has_cert_info: bool,
pub(crate) has_cert_payload: bool,
pub(crate) app_data_records: usize,
pub(crate) ticket_records: usize,
pub(crate) change_cipher_spec_count: u8,
pub(crate) total_app_data_len: usize,
}
fn profile_source_label(source: TlsProfileSource) -> &'static str {
match source {
TlsProfileSource::Default => "default",
TlsProfileSource::Raw => "raw",
TlsProfileSource::Rustls => "rustls",
TlsProfileSource::Merged => "merged",
}
}
#[allow(dead_code)]
impl TlsFrontCache {
pub fn new(domains: &[String], default_len: usize, disk_path: impl AsRef<Path>) -> Self {
@@ -52,7 +97,10 @@ impl TlsFrontCache {
Self {
memory: RwLock::new(map),
default,
full_cert_sent: RwLock::new(HashMap::new()),
full_cert_sent_shards: (0..FULL_CERT_SENT_SHARDS)
.map(|_| RwLock::new(HashMap::new()))
.collect(),
full_cert_sent_last_sweep_epoch_secs: AtomicU64::new(0),
disk_path: disk_path.as_ref().to_path_buf(),
}
}
@@ -69,22 +117,128 @@ impl TlsFrontCache {
self.memory.read().await.contains_key(domain)
}
pub(crate) async fn profile_health_snapshot(
&self,
domains: &[String],
max_domains: usize,
) -> (Vec<TlsFrontProfileHealth>, usize) {
let guard = self.memory.read().await;
let now = SystemTime::now();
let mut snapshot = Vec::with_capacity(domains.len().min(max_domains));
let mut suppressed = 0usize;
for domain in domains {
if snapshot.len() >= max_domains {
suppressed = suppressed.saturating_add(1);
continue;
}
let cached = guard
.get(domain)
.cloned()
.unwrap_or_else(|| self.default.clone());
let behavior = &cached.behavior_profile;
let age_seconds = now
.duration_since(cached.fetched_at)
.map(|duration| duration.as_secs())
.unwrap_or(0);
snapshot.push(TlsFrontProfileHealth {
domain: domain.clone(),
source: profile_source_label(behavior.source),
age_seconds,
is_default: cached.domain == "default",
has_cert_info: cached.cert_info.is_some(),
has_cert_payload: cached.cert_payload.is_some(),
app_data_records: cached.app_data_records_sizes.len().max(
behavior.app_data_record_sizes.len(),
),
ticket_records: behavior.ticket_record_sizes.len(),
change_cipher_spec_count: behavior.change_cipher_spec_count,
total_app_data_len: cached.total_app_data_len,
});
}
(snapshot, suppressed)
}
fn full_cert_sent_shard_index(client_ip: IpAddr) -> usize {
let mut hasher = DefaultHasher::new();
client_ip.hash(&mut hasher);
(hasher.finish() as usize) % FULL_CERT_SENT_SHARDS
}
fn full_cert_sent_shard(&self, client_ip: IpAddr) -> &RwLock<HashMap<IpAddr, Instant>> {
&self.full_cert_sent_shards[Self::full_cert_sent_shard_index(client_ip)]
}
fn decrement_full_cert_sent_entries(amount: usize) {
if amount == 0 {
return;
}
let amount = amount as u64;
let _ =
FULL_CERT_SENT_IPS_GAUGE.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
Some(current.saturating_sub(amount))
});
}
fn try_reserve_full_cert_sent_entry() -> bool {
let mut current = FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed);
loop {
if current >= FULL_CERT_SENT_MAX_IPS as u64 {
return false;
}
match FULL_CERT_SENT_IPS_GAUGE.compare_exchange_weak(
current,
current.saturating_add(1),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(actual) => current = actual,
}
}
}
async fn sweep_full_cert_sent_shards(&self, now: Instant, ttl: Duration) {
for shard in &self.full_cert_sent_shards {
let mut guard = shard.write().await;
let before = guard.len();
guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
Self::decrement_full_cert_sent_entries(before.saturating_sub(guard.len()));
}
}
/// Returns true when full cert payload should be sent for client_ip
/// according to TTL policy.
pub async fn take_full_cert_budget_for_ip(&self, client_ip: IpAddr, ttl: Duration) -> bool {
if ttl.is_zero() {
self.full_cert_sent
.write()
.await
.insert(client_ip, Instant::now());
return true;
}
let now = Instant::now();
let mut guard = self.full_cert_sent.write().await;
guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
let now_epoch_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let should_sweep = self
.full_cert_sent_last_sweep_epoch_secs
.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |last_sweep| {
if now_epoch_secs.saturating_sub(last_sweep) >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS {
Some(now_epoch_secs)
} else {
None
}
})
.is_ok();
match guard.get_mut(&client_ip) {
if should_sweep {
self.sweep_full_cert_sent_shards(now, ttl).await;
}
let mut guard = self.full_cert_sent_shard(client_ip).write().await;
let allowed = match guard.get_mut(&client_ip) {
Some(seen_at) => {
if now.duration_since(*seen_at) >= ttl {
*seen_at = now;
@@ -94,12 +248,43 @@ impl TlsFrontCache {
}
}
None => {
if !Self::try_reserve_full_cert_sent_entry() {
FULL_CERT_SENT_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
return false;
}
guard.insert(client_ip, now);
true
}
};
allowed
}
#[cfg(test)]
async fn insert_full_cert_sent_for_tests(&self, client_ip: IpAddr, seen_at: Instant) {
let mut guard = self.full_cert_sent_shard(client_ip).write().await;
if guard.insert(client_ip, seen_at).is_none() {
FULL_CERT_SENT_IPS_GAUGE.fetch_add(1, Ordering::Relaxed);
}
}
#[cfg(test)]
async fn full_cert_sent_is_empty_for_tests(&self) -> bool {
for shard in &self.full_cert_sent_shards {
if !shard.read().await.is_empty() {
return false;
}
}
true
}
#[cfg(test)]
async fn full_cert_sent_contains_for_tests(&self, client_ip: IpAddr) -> bool {
self.full_cert_sent_shard(client_ip)
.read()
.await
.contains_key(&client_ip)
}
pub async fn set(&self, domain: &str, data: CachedTlsData) {
let mut guard = self.memory.write().await;
guard.insert(domain.to_string(), Arc::new(data));
@@ -328,10 +513,68 @@ mod tests {
#[tokio::test]
async fn test_take_full_cert_budget_for_ip_zero_ttl_always_allows_full_payload() {
let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
let ip: IpAddr = "127.0.0.1".parse().expect("ip");
let ttl = Duration::ZERO;
assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
for idx in 0..100_000u32 {
let ip = IpAddr::V4(std::net::Ipv4Addr::new(
10,
((idx >> 16) & 0xff) as u8,
((idx >> 8) & 0xff) as u8,
(idx & 0xff) as u8,
));
assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
}
assert!(cache.full_cert_sent_is_empty_for_tests().await);
}
#[tokio::test]
async fn test_take_full_cert_budget_for_ip_sweeps_expired_entries_when_due() {
let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
let stale_ip: IpAddr = "127.0.0.1".parse().expect("ip");
let new_ip: IpAddr = "127.0.0.2".parse().expect("ip");
let ttl = Duration::from_secs(1);
let stale_seen_at = Instant::now()
.checked_sub(Duration::from_secs(10))
.unwrap_or_else(Instant::now);
cache
.insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
.await;
cache
.full_cert_sent_last_sweep_epoch_secs
.store(0, Ordering::Relaxed);
assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
assert!(!cache.full_cert_sent_contains_for_tests(stale_ip).await);
assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
}
#[tokio::test]
async fn test_take_full_cert_budget_for_ip_does_not_sweep_every_call() {
let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
let stale_ip: IpAddr = "127.0.0.1".parse().expect("ip");
let new_ip: IpAddr = "127.0.0.2".parse().expect("ip");
let ttl = Duration::from_secs(1);
let stale_seen_at = Instant::now()
.checked_sub(Duration::from_secs(10))
.unwrap_or_else(Instant::now);
let now_epoch_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
cache
.insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
.await;
cache
.full_cert_sent_last_sweep_epoch_secs
.store(now_epoch_secs, Ordering::Relaxed);
assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
assert!(cache.full_cert_sent_contains_for_tests(stale_ip).await);
assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
}
}

View File

@@ -3,7 +3,9 @@
use dashmap::DashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow};
@@ -144,12 +146,37 @@ enum FetchErrorKind {
Other,
}
const PROFILE_CACHE_MAX_ENTRIES: usize = 4096;
static PROFILE_CACHE: OnceLock<DashMap<ProfileCacheKey, ProfileCacheValue>> = OnceLock::new();
static PROFILE_CACHE_INSERT_GUARD: OnceLock<Mutex<()>> = OnceLock::new();
static PROFILE_CACHE_CAP_DROPS: AtomicU64 = AtomicU64::new(0);
fn profile_cache() -> &'static DashMap<ProfileCacheKey, ProfileCacheValue> {
PROFILE_CACHE.get_or_init(DashMap::new)
}
fn profile_cache_insert_guard() -> &'static Mutex<()> {
PROFILE_CACHE_INSERT_GUARD.get_or_init(|| Mutex::new(()))
}
fn sweep_expired_profile_cache(ttl: Duration, now: Instant) {
if ttl.is_zero() {
return;
}
profile_cache().retain(|_, value| now.saturating_duration_since(value.updated_at) <= ttl);
}
/// Current number of adaptive TLS fetch profile-cache entries.
pub(crate) fn profile_cache_entries_for_metrics() -> usize {
profile_cache().len()
}
/// Number of fresh profile-cache winners skipped because the cache was full.
pub(crate) fn profile_cache_cap_drops_for_metrics() -> u64 {
PROFILE_CACHE_CAP_DROPS.load(Ordering::Relaxed)
}
fn route_hint(
upstream: Option<&std::sync::Arc<crate::transport::UpstreamManager>>,
unix_sock: Option<&str>,
@@ -267,6 +294,43 @@ fn remember_profile_success(
let Some(key) = cache_key else {
return;
};
remember_profile_success_with_cap(strategy, key, profile, now, PROFILE_CACHE_MAX_ENTRIES);
}
fn remember_profile_success_with_cap(
strategy: &TlsFetchStrategy,
key: ProfileCacheKey,
profile: TlsFetchProfile,
now: Instant,
max_entries: usize,
) {
let Ok(_guard) = profile_cache_insert_guard().lock() else {
PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
return;
};
if max_entries == 0 {
PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
return;
}
if profile_cache().contains_key(&key) {
profile_cache().insert(
key,
ProfileCacheValue {
profile,
updated_at: now,
},
);
return;
}
if profile_cache().len() >= max_entries {
// TLS fetch is control-plane work; sweeping under a tiny mutex keeps
// profile-cache cardinality hard-bounded without touching relay hot paths.
sweep_expired_profile_cache(strategy.profile_cache_ttl, now);
}
if profile_cache().len() >= max_entries {
PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
return;
}
profile_cache().insert(
key,
ProfileCacheValue {

View File

@@ -618,13 +618,9 @@ impl MePool {
me_route_hybrid_max_wait: Duration::from_millis(
me_route_hybrid_max_wait_ms.max(50),
),
me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 {
None
} else {
Some(Duration::from_millis(
me_route_blocking_send_timeout_ms.min(5_000),
))
},
me_route_blocking_send_timeout: Some(Duration::from_millis(
me_route_blocking_send_timeout_ms.clamp(1, 5_000),
)),
me_route_last_success_epoch_ms: AtomicU64::new(0),
me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0),
me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0),

File diff suppressed because it is too large Load Diff