From c6c3d71b08954cc0279fd23d40a7fec08a01c072 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Feb 2026 01:26:01 +0300 Subject: [PATCH 1/3] ME Pool Flap-Detect in statistics --- src/metrics.rs | 93 ++++++++++++++++++++++++++++++ src/stats/mod.rs | 49 ++++++++++++++++ src/transport/middle_proxy/pool.rs | 10 ++++ 3 files changed, 152 insertions(+) diff --git a/src/metrics.rs b/src/metrics.rs index 53ddd5d..0051858 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -199,6 +199,95 @@ fn render_metrics(stats: &Stats) -> String { stats.get_pool_stale_pick_total() ); + let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); + let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_removed_total {}", + stats.get_me_writer_removed_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_removed_unexpected_total Unexpected ME writer removals that triggered refill" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_removed_unexpected_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_removed_unexpected_total {}", + stats.get_me_writer_removed_unexpected_total() + ); + + let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started"); + let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter"); + let _ = writeln!( + out, + "telemt_me_refill_triggered_total {}", + stats.get_me_refill_triggered_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_refill_skipped_inflight_total Immediate ME refill skips due to inflight dedup" + ); + let _ = writeln!(out, "# TYPE telemt_me_refill_skipped_inflight_total counter"); + let _ = writeln!( + out, + "telemt_me_refill_skipped_inflight_total {}", + stats.get_me_refill_skipped_inflight_total() + ); + + let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures"); + let _ = writeln!(out, "# TYPE telemt_me_refill_failed_total counter"); + let _ = writeln!( + out, + "telemt_me_refill_failed_total {}", + stats.get_me_refill_failed_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_restored_same_endpoint_total Refilled ME writer restored on the same endpoint" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_restored_same_endpoint_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_restored_same_endpoint_total {}", + stats.get_me_writer_restored_same_endpoint_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_restored_fallback_total Refilled ME writer restored via fallback endpoint" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_restored_fallback_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_restored_fallback_total {}", + stats.get_me_writer_restored_fallback_total() + ); + + let unresolved_writer_losses = stats + .get_me_writer_removed_unexpected_total() + .saturating_sub( + stats + .get_me_writer_restored_same_endpoint_total() + .saturating_add(stats.get_me_writer_restored_fallback_total()), + ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" + ); + let _ = writeln!( + out, + "telemt_me_writer_removed_unexpected_minus_restored_total {}", + unresolved_writer_losses + ); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); @@ -277,6 +366,10 @@ mod tests { assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); + assert!(output.contains( + "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" + )); } #[tokio::test] diff --git a/src/stats/mod.rs b/src/stats/mod.rs index a58996d..5f4c98e 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -43,6 +43,13 @@ pub struct Stats { pool_drain_active: AtomicU64, pool_force_close_total: AtomicU64, pool_stale_pick_total: AtomicU64, + me_writer_removed_total: AtomicU64, + me_writer_removed_unexpected_total: AtomicU64, + me_refill_triggered_total: AtomicU64, + me_refill_skipped_inflight_total: AtomicU64, + me_refill_failed_total: AtomicU64, + me_writer_restored_same_endpoint_total: AtomicU64, + me_writer_restored_fallback_total: AtomicU64, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -142,6 +149,27 @@ impl Stats { pub fn increment_pool_stale_pick_total(&self) { self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_writer_removed_total(&self) { + self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_writer_removed_unexpected_total(&self) { + self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_refill_triggered_total(&self) { + self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_refill_skipped_inflight_total(&self) { + self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_refill_failed_total(&self) { + self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_writer_restored_same_endpoint_total(&self) { + self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_writer_restored_fallback_total(&self) { + self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed); + } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } @@ -195,6 +223,27 @@ impl Stats { pub fn get_pool_stale_pick_total(&self) -> u64 { self.pool_stale_pick_total.load(Ordering::Relaxed) } + pub fn get_me_writer_removed_total(&self) -> u64 { + self.me_writer_removed_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_removed_unexpected_total(&self) -> u64 { + self.me_writer_removed_unexpected_total.load(Ordering::Relaxed) + } + pub fn get_me_refill_triggered_total(&self) -> u64 { + self.me_refill_triggered_total.load(Ordering::Relaxed) + } + pub fn get_me_refill_skipped_inflight_total(&self) -> u64 { + self.me_refill_skipped_inflight_total.load(Ordering::Relaxed) + } + pub fn get_me_refill_failed_total(&self) -> u64 { + self.me_refill_failed_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_restored_same_endpoint_total(&self) -> u64 { + self.me_writer_restored_same_endpoint_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_restored_fallback_total(&self) -> u64 { + self.me_writer_restored_fallback_total.load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { self.user_stats.entry(user.to_string()).or_default() diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index aa14e5b..e5aebe4 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -708,6 +708,7 @@ impl MePool { match self.connect_one(addr, self.rng.as_ref()).await { Ok(()) => { self.stats.increment_me_reconnect_success(); + self.stats.increment_me_writer_restored_same_endpoint_total(); info!( %addr, attempt = attempt + 1, @@ -728,6 +729,7 @@ impl MePool { let dc_endpoints = self.endpoints_for_same_dc(addr).await; if dc_endpoints.is_empty() { + self.stats.increment_me_refill_failed_total(); return false; } @@ -738,6 +740,7 @@ impl MePool { .await { self.stats.increment_me_reconnect_success(); + self.stats.increment_me_writer_restored_fallback_total(); info!( %addr, attempt = attempt + 1, @@ -747,6 +750,7 @@ impl MePool { } } + self.stats.increment_me_refill_failed_total(); false } @@ -756,9 +760,11 @@ impl MePool { { let mut guard = pool.refill_inflight.lock().await; if !guard.insert(addr) { + pool.stats.increment_me_refill_skipped_inflight_total(); return; } } + pool.stats.increment_me_refill_triggered_total(); let restored = pool.refill_writer_after_loss(addr).await; if !restored { @@ -1189,9 +1195,13 @@ impl MePool { if was_draining { self.stats.decrement_pool_drain_active(); } + self.stats.increment_me_writer_removed_total(); w.cancel.cancel(); removed_addr = Some(w.addr); trigger_refill = !was_draining; + if trigger_refill { + self.stats.increment_me_writer_removed_unexpected_total(); + } close_tx = Some(w.tx.clone()); self.conn_count.fetch_sub(1, Ordering::Relaxed); } From 618b7a183797978c555482f82bcb2e77be5be08a Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Feb 2026 02:10:14 +0300 Subject: [PATCH 2/3] ME Pool Beobachter --- src/config/defaults.rs | 12 ++++ src/config/load.rs | 18 +++++ src/config/types.rs | 20 ++++++ src/main.rs | 42 ++++++++++- src/metrics.rs | 84 ++++++++++++++++++---- src/proxy/client.rs | 154 ++++++++++++++++++++++++++++++++++++++--- src/proxy/masking.rs | 12 +++- src/stats/mod.rs | 2 + 8 files changed, 318 insertions(+), 26 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index d43ace9..80fcc07 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -121,6 +121,18 @@ pub(crate) fn default_desync_all_full() -> bool { false } +pub(crate) fn default_beobachten_minutes() -> u64 { + 10 +} + +pub(crate) fn default_beobachten_flush_secs() -> u64 { + 15 +} + +pub(crate) fn default_beobachten_file() -> String { + "cache/beobachten.txt".to_string() +} + pub(crate) fn default_tls_new_session_tickets() -> u8 { 0 } diff --git a/src/config/load.rs b/src/config/load.rs index 5698a71..aab553f 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -153,6 +153,24 @@ impl ProxyConfig { )); } + if config.general.beobachten_minutes == 0 { + return Err(ProxyError::Config( + "general.beobachten_minutes must be > 0".to_string(), + )); + } + + if config.general.beobachten_flush_secs == 0 { + return Err(ProxyError::Config( + "general.beobachten_flush_secs must be > 0".to_string(), + )); + } + + if config.general.beobachten_file.trim().is_empty() { + return Err(ProxyError::Config( + "general.beobachten_file cannot be empty".to_string(), + )); + } + if config.general.me_hardswap_warmup_delay_max_ms == 0 { return Err(ProxyError::Config( "general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index 0cda9f4..cfa8d31 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -206,6 +206,22 @@ pub struct GeneralConfig { #[serde(default = "default_desync_all_full")] pub desync_all_full: bool, + /// Enable per-IP forensic observation buckets for scanners and handshake failures. + #[serde(default)] + pub beobachten: bool, + + /// Observation retention window in minutes for per-IP forensic buckets. + #[serde(default = "default_beobachten_minutes")] + pub beobachten_minutes: u64, + + /// Snapshot flush interval in seconds for beob output file. + #[serde(default = "default_beobachten_flush_secs")] + pub beobachten_flush_secs: u64, + + /// Snapshot file path for beob output. + #[serde(default = "default_beobachten_file")] + pub beobachten_file: String, + /// Enable C-like hard-swap for ME pool generations. /// When true, Telemt prewarms a new generation and switches once full coverage is reached. #[serde(default = "default_hardswap")] @@ -383,6 +399,10 @@ impl Default for GeneralConfig { crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), desync_all_full: default_desync_all_full(), + beobachten: false, + beobachten_minutes: default_beobachten_minutes(), + beobachten_flush_secs: default_beobachten_flush_secs(), + beobachten_file: default_beobachten_file(), hardswap: default_hardswap(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), update_every: Some(default_update_every_secs()), diff --git a/src/main.rs b/src/main.rs index 3bcbf3e..ab524a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,6 +35,7 @@ use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::proxy::ClientHandler; +use crate::stats::beobachten::BeobachtenStore; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ @@ -159,6 +160,15 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { info!(target: "telemt::links", "------------------------"); } +async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> { + if let Some(parent) = std::path::Path::new(path).parent() + && !parent.as_os_str().is_empty() + { + tokio::fs::create_dir_all(parent).await?; + } + tokio::fs::write(path, payload).await +} + #[tokio::main] async fn main() -> std::result::Result<(), Box> { let (config_path, cli_silent, cli_log_level) = parse_cli(); @@ -256,6 +266,7 @@ async fn main() -> std::result::Result<(), Box> { let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); let stats = Arc::new(Stats::new()); + let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); // IP Tracker initialization @@ -692,6 +703,26 @@ async fn main() -> std::result::Result<(), Box> { detected_ip_v6, ); + let beobachten_writer = beobachten.clone(); + let config_rx_beobachten = config_rx.clone(); + tokio::spawn(async move { + loop { + let cfg = config_rx_beobachten.borrow().clone(); + let sleep_secs = cfg.general.beobachten_flush_secs.max(1); + + if cfg.general.beobachten { + let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60)); + let path = cfg.general.beobachten_file.clone(); + let snapshot = beobachten_writer.snapshot_text(ttl); + if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await { + warn!(error = %e, path = %path, "Failed to flush beobachten snapshot"); + } + } + + tokio::time::sleep(Duration::from_secs(sleep_secs)).await; + } + }); + if let Some(ref pool) = me_pool { let pool_clone = pool.clone(); let rng_clone = rng.clone(); @@ -860,6 +891,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let max_connections_unix = max_connections.clone(); tokio::spawn(async move { @@ -887,6 +919,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let proxy_protocol_enabled = config.server.proxy_protocol; tokio::spawn(async move { @@ -894,7 +927,7 @@ async fn main() -> std::result::Result<(), Box> { if let Err(e) = crate::proxy::client::handle_client_stream( stream, fake_peer, config, stats, upstream_manager, replay_checker, buffer_pool, rng, - me_pool, tls_cache, ip_tracker, proxy_protocol_enabled, + me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled, ).await { debug!(error = %e, "Unix socket connection error"); } @@ -942,9 +975,11 @@ async fn main() -> std::result::Result<(), Box> { if let Some(port) = config.server.metrics_port { let stats = stats.clone(); + let beobachten = beobachten.clone(); + let config_rx_metrics = config_rx.clone(); let whitelist = config.server.metrics_whitelist.clone(); tokio::spawn(async move { - metrics::serve(port, stats, whitelist).await; + metrics::serve(port, stats, beobachten, config_rx_metrics, whitelist).await; }); } @@ -958,6 +993,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let max_connections_tcp = max_connections.clone(); tokio::spawn(async move { @@ -980,6 +1016,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let proxy_protocol_enabled = listener_proxy_protocol; tokio::spawn(async move { @@ -996,6 +1033,7 @@ async fn main() -> std::result::Result<(), Box> { me_pool, tls_cache, ip_tracker, + beobachten, proxy_protocol_enabled, ) .run() diff --git a/src/metrics.rs b/src/metrics.rs index 0051858..08abb2d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,6 +1,7 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use http_body_util::Full; use hyper::body::Bytes; @@ -11,9 +12,17 @@ use ipnetwork::IpNetwork; use tokio::net::TcpListener; use tracing::{info, warn, debug}; +use crate::config::ProxyConfig; +use crate::stats::beobachten::BeobachtenStore; use crate::stats::Stats; -pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { +pub async fn serve( + port: u16, + stats: Arc, + beobachten: Arc, + config_rx: tokio::sync::watch::Receiver>, + whitelist: Vec, +) { let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = match TcpListener::bind(addr).await { Ok(l) => l, @@ -22,7 +31,7 @@ pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { return; } }; - info!("Metrics endpoint: http://{}/metrics", addr); + info!("Metrics endpoint: http://{}/metrics and /beobachten", addr); loop { let (stream, peer) = match listener.accept().await { @@ -39,10 +48,14 @@ pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { } let stats = stats.clone(); + let beobachten = beobachten.clone(); + let config_rx_conn = config_rx.clone(); tokio::spawn(async move { let svc = service_fn(move |req| { let stats = stats.clone(); - async move { handle(req, &stats) } + let beobachten = beobachten.clone(); + let config = config_rx_conn.borrow().clone(); + async move { handle(req, &stats, &beobachten, &config) } }); if let Err(e) = http1::Builder::new() .serve_connection(hyper_util::rt::TokioIo::new(stream), svc) @@ -54,24 +67,48 @@ pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { } } -fn handle(req: Request, stats: &Stats) -> Result>, Infallible> { - if req.uri().path() != "/metrics" { +fn handle( + req: Request, + stats: &Stats, + beobachten: &BeobachtenStore, + config: &ProxyConfig, +) -> Result>, Infallible> { + if req.uri().path() == "/metrics" { + let body = render_metrics(stats); let resp = Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Full::new(Bytes::from("Not Found\n"))) + .status(StatusCode::OK) + .header("content-type", "text/plain; version=0.0.4; charset=utf-8") + .body(Full::new(Bytes::from(body))) + .unwrap(); + return Ok(resp); + } + + if req.uri().path() == "/beobachten" { + let body = render_beobachten(beobachten, config); + let resp = Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/plain; charset=utf-8") + .body(Full::new(Bytes::from(body))) .unwrap(); return Ok(resp); } - let body = render_metrics(stats); let resp = Response::builder() - .status(StatusCode::OK) - .header("content-type", "text/plain; version=0.0.4; charset=utf-8") - .body(Full::new(Bytes::from(body))) + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found\n"))) .unwrap(); Ok(resp) } +fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String { + if !config.general.beobachten { + return "beobachten disabled\n".to_string(); + } + + let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)); + beobachten.snapshot_text(ttl) +} + fn render_metrics(stats: &Stats) -> String { use std::fmt::Write; let mut out = String::with_capacity(4096); @@ -318,6 +355,7 @@ fn render_metrics(stats: &Stats) -> String { #[cfg(test)] mod tests { use super::*; + use std::net::IpAddr; use http_body_util::BodyExt; #[test] @@ -375,6 +413,8 @@ mod tests { #[tokio::test] async fn test_endpoint_integration() { let stats = Arc::new(Stats::new()); + let beobachten = Arc::new(BeobachtenStore::new()); + let mut config = ProxyConfig::default(); stats.increment_connects_all(); stats.increment_connects_all(); stats.increment_connects_all(); @@ -383,16 +423,34 @@ mod tests { .uri("/metrics") .body(()) .unwrap(); - let resp = handle(req, &stats).unwrap(); + let resp = handle(req, &stats, &beobachten, &config).unwrap(); assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_body().collect().await.unwrap().to_bytes(); assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3")); + config.general.beobachten = true; + config.general.beobachten_minutes = 10; + beobachten.record( + "TLS-scanner", + "203.0.113.10".parse::().unwrap(), + Duration::from_secs(600), + ); + let req_beob = Request::builder() + .uri("/beobachten") + .body(()) + .unwrap(); + let resp_beob = handle(req_beob, &stats, &beobachten, &config).unwrap(); + assert_eq!(resp_beob.status(), StatusCode::OK); + let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes(); + let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap(); + assert!(beob_text.contains("[TLS-scanner]")); + assert!(beob_text.contains("203.0.113.10-1")); + let req404 = Request::builder() .uri("/other") .body(()) .unwrap(); - let resp404 = handle(req404, &stats).unwrap(); + let resp404 = handle(req404, &stats, &beobachten, &config).unwrap(); assert_eq!(resp404.status(), StatusCode::NOT_FOUND); } } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 483f6e0..c598023 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -1,7 +1,7 @@ //! Client Handler use std::future::Future; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -27,6 +27,7 @@ use crate::error::{HandshakeResult, ProxyError, Result}; use crate::ip_tracker::UserIpTracker; use crate::protocol::constants::*; use crate::protocol::tls; +use crate::stats::beobachten::BeobachtenStore; use crate::stats::{ReplayChecker, Stats}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::middle_proxy::MePool; @@ -39,6 +40,36 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle use crate::proxy::masking::handle_bad_client; use crate::proxy::middle_relay::handle_via_middle_proxy; +fn beobachten_ttl(config: &ProxyConfig) -> Duration { + Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)) +} + +fn record_beobachten_class( + beobachten: &BeobachtenStore, + config: &ProxyConfig, + peer_ip: IpAddr, + class: &str, +) { + if !config.general.beobachten { + return; + } + beobachten.record(class, peer_ip, beobachten_ttl(config)); +} + +fn record_handshake_failure_class( + beobachten: &BeobachtenStore, + config: &ProxyConfig, + peer_ip: IpAddr, + error: &ProxyError, +) { + let class = if error.to_string().contains("expected 64 bytes, got 0") { + "expected_64_got_0" + } else { + "other" + }; + record_beobachten_class(beobachten, config, peer_ip, class); +} + pub async fn handle_client_stream( mut stream: S, peer: SocketAddr, @@ -51,6 +82,7 @@ pub async fn handle_client_stream( me_pool: Option>, tls_cache: Option>, ip_tracker: Arc, + beobachten: Arc, proxy_protocol_enabled: bool, ) -> Result<()> where @@ -73,6 +105,7 @@ where Err(e) => { stats.increment_connects_bad(); warn!(peer = %peer, error = %e, "Invalid PROXY protocol header"); + record_beobachten_class(&beobachten, &config, peer.ip(), "other"); return Err(e); } } @@ -82,6 +115,9 @@ where let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake); let stats_for_timeout = stats.clone(); + let config_for_timeout = config.clone(); + let beobachten_for_timeout = beobachten.clone(); + let peer_for_timeout = real_peer.ip(); // For non-TCP streams, use a synthetic local address let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) @@ -103,7 +139,15 @@ where debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short"); stats.increment_connects_bad(); let (reader, writer) = tokio::io::split(stream); - handle_bad_client(reader, writer, &first_bytes, &config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -120,7 +164,15 @@ where HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), @@ -156,7 +208,15 @@ where debug!(peer = %real_peer, "Non-TLS modes disabled"); stats.increment_connects_bad(); let (reader, writer) = tokio::io::split(stream); - handle_bad_client(reader, writer, &first_bytes, &config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -173,7 +233,15 @@ where HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), @@ -200,11 +268,23 @@ where Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer, error = %e, "Handshake failed"); + record_handshake_failure_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + &e, + ); return Err(e); } Err(_) => { stats_for_timeout.increment_handshake_timeouts(); debug!(peer = %peer, "Handshake timeout"); + record_beobachten_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + "other", + ); return Err(ProxyError::TgHandshakeTimeout); } }; @@ -230,6 +310,7 @@ pub struct RunningClientHandler { me_pool: Option>, tls_cache: Option>, ip_tracker: Arc, + beobachten: Arc, proxy_protocol_enabled: bool, } @@ -246,6 +327,7 @@ impl ClientHandler { me_pool: Option>, tls_cache: Option>, ip_tracker: Arc, + beobachten: Arc, proxy_protocol_enabled: bool, ) -> RunningClientHandler { RunningClientHandler { @@ -260,6 +342,7 @@ impl ClientHandler { me_pool, tls_cache, ip_tracker, + beobachten, proxy_protocol_enabled, } } @@ -284,17 +367,32 @@ impl RunningClientHandler { let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); let stats = self.stats.clone(); + let config_for_timeout = self.config.clone(); + let beobachten_for_timeout = self.beobachten.clone(); + let peer_for_timeout = peer.ip(); // Phase 1: handshake (with timeout) let outcome = match timeout(handshake_timeout, self.do_handshake()).await { Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer, error = %e, "Handshake failed"); + record_handshake_failure_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + &e, + ); return Err(e); } Err(_) => { stats.increment_handshake_timeouts(); debug!(peer = %peer, "Handshake timeout"); + record_beobachten_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + "other", + ); return Err(ProxyError::TgHandshakeTimeout); } }; @@ -321,6 +419,12 @@ impl RunningClientHandler { Err(e) => { self.stats.increment_connects_bad(); warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header"); + record_beobachten_class( + &self.beobachten, + &self.config, + self.peer.ip(), + "other", + ); return Err(e); } } @@ -354,7 +458,15 @@ impl RunningClientHandler { debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); self.stats.increment_connects_bad(); let (reader, writer) = self.stream.into_split(); - handle_bad_client(reader, writer, &first_bytes, &self.config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + peer.ip(), + &self.config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -385,7 +497,15 @@ impl RunningClientHandler { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + peer.ip(), + &config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), @@ -446,7 +566,15 @@ impl RunningClientHandler { debug!(peer = %peer, "Non-TLS modes disabled"); self.stats.increment_connects_bad(); let (reader, writer) = self.stream.into_split(); - handle_bad_client(reader, writer, &first_bytes, &self.config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + peer.ip(), + &self.config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -476,7 +604,15 @@ impl RunningClientHandler { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + peer.ip(), + &config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 72175fe..cdb6cf9 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -1,6 +1,7 @@ //! Masking - forward unrecognized traffic to mask host use std::str; +use std::net::IpAddr; use std::time::Duration; use tokio::net::TcpStream; #[cfg(unix)] @@ -9,6 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; use tokio::time::timeout; use tracing::debug; use crate::config::ProxyConfig; +use crate::stats::beobachten::BeobachtenStore; const MASK_TIMEOUT: Duration = Duration::from_secs(5); /// Maximum duration for the entire masking relay. @@ -50,20 +52,26 @@ pub async fn handle_bad_client( reader: R, writer: W, initial_data: &[u8], + peer_ip: IpAddr, config: &ProxyConfig, + beobachten: &BeobachtenStore, ) where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, { + let client_type = detect_client_type(initial_data); + if config.general.beobachten { + let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)); + beobachten.record(client_type, peer_ip, ttl); + } + if !config.censorship.mask { // Masking disabled, just consume data consume_client_data(reader).await; return; } - let client_type = detect_client_type(initial_data); - // Connect via Unix socket or TCP #[cfg(unix)] if let Some(ref sock_path) = config.censorship.mask_unix_sock { diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 5f4c98e..1e32bb7 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -2,6 +2,8 @@ #![allow(dead_code)] +pub mod beobachten; + use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Instant, Duration}; use dashmap::DashMap; From 6b8619d3c91e309897032243d55da9e5120b0fef Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Feb 2026 02:17:48 +0300 Subject: [PATCH 3/3] Create beobachten.rs --- src/stats/beobachten.rs | 117 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 src/stats/beobachten.rs diff --git a/src/stats/beobachten.rs b/src/stats/beobachten.rs new file mode 100644 index 0000000..2e87fcc --- /dev/null +++ b/src/stats/beobachten.rs @@ -0,0 +1,117 @@ +//! Per-IP forensic buckets for scanner and handshake failure observation. + +use std::collections::{BTreeMap, HashMap}; +use std::net::IpAddr; +use std::time::{Duration, Instant}; + +use parking_lot::Mutex; + +const CLEANUP_INTERVAL: Duration = Duration::from_secs(30); + +#[derive(Default)] +struct BeobachtenInner { + entries: HashMap<(String, IpAddr), BeobachtenEntry>, + last_cleanup: Option, +} + +#[derive(Clone, Copy)] +struct BeobachtenEntry { + tries: u64, + last_seen: Instant, +} + +/// In-memory, TTL-scoped per-IP counters keyed by source class. +pub struct BeobachtenStore { + inner: Mutex, +} + +impl Default for BeobachtenStore { + fn default() -> Self { + Self::new() + } +} + +impl BeobachtenStore { + pub fn new() -> Self { + Self { + inner: Mutex::new(BeobachtenInner::default()), + } + } + + pub fn record(&self, class: &str, ip: IpAddr, ttl: Duration) { + if class.is_empty() || ttl.is_zero() { + return; + } + + let now = Instant::now(); + let mut guard = self.inner.lock(); + Self::cleanup_if_needed(&mut guard, now, ttl); + + let key = (class.to_string(), ip); + let entry = guard.entries.entry(key).or_insert(BeobachtenEntry { + tries: 0, + last_seen: now, + }); + entry.tries = entry.tries.saturating_add(1); + entry.last_seen = now; + } + + pub fn snapshot_text(&self, ttl: Duration) -> String { + if ttl.is_zero() { + return "beobachten disabled\n".to_string(); + } + + let now = Instant::now(); + let mut guard = self.inner.lock(); + Self::cleanup(&mut guard, now, ttl); + guard.last_cleanup = Some(now); + + let mut grouped = BTreeMap::>::new(); + for ((class, ip), entry) in &guard.entries { + grouped + .entry(class.clone()) + .or_default() + .push((*ip, entry.tries)); + } + + if grouped.is_empty() { + return "empty\n".to_string(); + } + + let mut out = String::with_capacity(grouped.len() * 64); + for (class, entries) in &mut grouped { + out.push('['); + out.push_str(class); + out.push_str("]\n"); + + entries.sort_by(|(ip_a, tries_a), (ip_b, tries_b)| { + tries_b + .cmp(tries_a) + .then_with(|| ip_a.to_string().cmp(&ip_b.to_string())) + }); + + for (ip, tries) in entries { + out.push_str(&format!("{ip}-{tries}\n")); + } + } + + out + } + + fn cleanup_if_needed(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) { + let should_cleanup = match inner.last_cleanup { + Some(last) => now.saturating_duration_since(last) >= CLEANUP_INTERVAL, + None => true, + }; + if should_cleanup { + Self::cleanup(inner, now, ttl); + inner.last_cleanup = Some(now); + } + } + + fn cleanup(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) { + inner.entries.retain(|_, entry| { + now.saturating_duration_since(entry.last_seen) <= ttl + }); + } +}