diff --git a/src/config/defaults.rs b/src/config/defaults.rs index d43ace9..80fcc07 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -121,6 +121,18 @@ pub(crate) fn default_desync_all_full() -> bool { false } +pub(crate) fn default_beobachten_minutes() -> u64 { + 10 +} + +pub(crate) fn default_beobachten_flush_secs() -> u64 { + 15 +} + +pub(crate) fn default_beobachten_file() -> String { + "cache/beobachten.txt".to_string() +} + pub(crate) fn default_tls_new_session_tickets() -> u8 { 0 } diff --git a/src/config/load.rs b/src/config/load.rs index 5698a71..aab553f 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -153,6 +153,24 @@ impl ProxyConfig { )); } + if config.general.beobachten_minutes == 0 { + return Err(ProxyError::Config( + "general.beobachten_minutes must be > 0".to_string(), + )); + } + + if config.general.beobachten_flush_secs == 0 { + return Err(ProxyError::Config( + "general.beobachten_flush_secs must be > 0".to_string(), + )); + } + + if config.general.beobachten_file.trim().is_empty() { + return Err(ProxyError::Config( + "general.beobachten_file cannot be empty".to_string(), + )); + } + if config.general.me_hardswap_warmup_delay_max_ms == 0 { return Err(ProxyError::Config( "general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index 0cda9f4..cfa8d31 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -206,6 +206,22 @@ pub struct GeneralConfig { #[serde(default = "default_desync_all_full")] pub desync_all_full: bool, + /// Enable per-IP forensic observation buckets for scanners and handshake failures. + #[serde(default)] + pub beobachten: bool, + + /// Observation retention window in minutes for per-IP forensic buckets. + #[serde(default = "default_beobachten_minutes")] + pub beobachten_minutes: u64, + + /// Snapshot flush interval in seconds for beob output file. + #[serde(default = "default_beobachten_flush_secs")] + pub beobachten_flush_secs: u64, + + /// Snapshot file path for beob output. + #[serde(default = "default_beobachten_file")] + pub beobachten_file: String, + /// Enable C-like hard-swap for ME pool generations. /// When true, Telemt prewarms a new generation and switches once full coverage is reached. #[serde(default = "default_hardswap")] @@ -383,6 +399,10 @@ impl Default for GeneralConfig { crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), desync_all_full: default_desync_all_full(), + beobachten: false, + beobachten_minutes: default_beobachten_minutes(), + beobachten_flush_secs: default_beobachten_flush_secs(), + beobachten_file: default_beobachten_file(), hardswap: default_hardswap(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), update_every: Some(default_update_every_secs()), diff --git a/src/main.rs b/src/main.rs index 3bcbf3e..ab524a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,6 +35,7 @@ use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::proxy::ClientHandler; +use crate::stats::beobachten::BeobachtenStore; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ @@ -159,6 +160,15 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { info!(target: "telemt::links", "------------------------"); } +async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> { + if let Some(parent) = std::path::Path::new(path).parent() + && !parent.as_os_str().is_empty() + { + tokio::fs::create_dir_all(parent).await?; + } + tokio::fs::write(path, payload).await +} + #[tokio::main] async fn main() -> std::result::Result<(), Box> { let (config_path, cli_silent, cli_log_level) = parse_cli(); @@ -256,6 +266,7 @@ async fn main() -> std::result::Result<(), Box> { let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); let stats = Arc::new(Stats::new()); + let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); // IP Tracker initialization @@ -692,6 +703,26 @@ async fn main() -> std::result::Result<(), Box> { detected_ip_v6, ); + let beobachten_writer = beobachten.clone(); + let config_rx_beobachten = config_rx.clone(); + tokio::spawn(async move { + loop { + let cfg = config_rx_beobachten.borrow().clone(); + let sleep_secs = cfg.general.beobachten_flush_secs.max(1); + + if cfg.general.beobachten { + let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60)); + let path = cfg.general.beobachten_file.clone(); + let snapshot = beobachten_writer.snapshot_text(ttl); + if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await { + warn!(error = %e, path = %path, "Failed to flush beobachten snapshot"); + } + } + + tokio::time::sleep(Duration::from_secs(sleep_secs)).await; + } + }); + if let Some(ref pool) = me_pool { let pool_clone = pool.clone(); let rng_clone = rng.clone(); @@ -860,6 +891,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let max_connections_unix = max_connections.clone(); tokio::spawn(async move { @@ -887,6 +919,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let proxy_protocol_enabled = config.server.proxy_protocol; tokio::spawn(async move { @@ -894,7 +927,7 @@ async fn main() -> std::result::Result<(), Box> { if let Err(e) = crate::proxy::client::handle_client_stream( stream, fake_peer, config, stats, upstream_manager, replay_checker, buffer_pool, rng, - me_pool, tls_cache, ip_tracker, proxy_protocol_enabled, + me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled, ).await { debug!(error = %e, "Unix socket connection error"); } @@ -942,9 +975,11 @@ async fn main() -> std::result::Result<(), Box> { if let Some(port) = config.server.metrics_port { let stats = stats.clone(); + let beobachten = beobachten.clone(); + let config_rx_metrics = config_rx.clone(); let whitelist = config.server.metrics_whitelist.clone(); tokio::spawn(async move { - metrics::serve(port, stats, whitelist).await; + metrics::serve(port, stats, beobachten, config_rx_metrics, whitelist).await; }); } @@ -958,6 +993,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let max_connections_tcp = max_connections.clone(); tokio::spawn(async move { @@ -980,6 +1016,7 @@ async fn main() -> std::result::Result<(), Box> { let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); let proxy_protocol_enabled = listener_proxy_protocol; tokio::spawn(async move { @@ -996,6 +1033,7 @@ async fn main() -> std::result::Result<(), Box> { me_pool, tls_cache, ip_tracker, + beobachten, proxy_protocol_enabled, ) .run() diff --git a/src/metrics.rs b/src/metrics.rs index 0051858..08abb2d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,6 +1,7 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use http_body_util::Full; use hyper::body::Bytes; @@ -11,9 +12,17 @@ use ipnetwork::IpNetwork; use tokio::net::TcpListener; use tracing::{info, warn, debug}; +use crate::config::ProxyConfig; +use crate::stats::beobachten::BeobachtenStore; use crate::stats::Stats; -pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { +pub async fn serve( + port: u16, + stats: Arc, + beobachten: Arc, + config_rx: tokio::sync::watch::Receiver>, + whitelist: Vec, +) { let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = match TcpListener::bind(addr).await { Ok(l) => l, @@ -22,7 +31,7 @@ pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { return; } }; - info!("Metrics endpoint: http://{}/metrics", addr); + info!("Metrics endpoint: http://{}/metrics and /beobachten", addr); loop { let (stream, peer) = match listener.accept().await { @@ -39,10 +48,14 @@ pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { } let stats = stats.clone(); + let beobachten = beobachten.clone(); + let config_rx_conn = config_rx.clone(); tokio::spawn(async move { let svc = service_fn(move |req| { let stats = stats.clone(); - async move { handle(req, &stats) } + let beobachten = beobachten.clone(); + let config = config_rx_conn.borrow().clone(); + async move { handle(req, &stats, &beobachten, &config) } }); if let Err(e) = http1::Builder::new() .serve_connection(hyper_util::rt::TokioIo::new(stream), svc) @@ -54,24 +67,48 @@ pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { } } -fn handle(req: Request, stats: &Stats) -> Result>, Infallible> { - if req.uri().path() != "/metrics" { +fn handle( + req: Request, + stats: &Stats, + beobachten: &BeobachtenStore, + config: &ProxyConfig, +) -> Result>, Infallible> { + if req.uri().path() == "/metrics" { + let body = render_metrics(stats); let resp = Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Full::new(Bytes::from("Not Found\n"))) + .status(StatusCode::OK) + .header("content-type", "text/plain; version=0.0.4; charset=utf-8") + .body(Full::new(Bytes::from(body))) + .unwrap(); + return Ok(resp); + } + + if req.uri().path() == "/beobachten" { + let body = render_beobachten(beobachten, config); + let resp = Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/plain; charset=utf-8") + .body(Full::new(Bytes::from(body))) .unwrap(); return Ok(resp); } - let body = render_metrics(stats); let resp = Response::builder() - .status(StatusCode::OK) - .header("content-type", "text/plain; version=0.0.4; charset=utf-8") - .body(Full::new(Bytes::from(body))) + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found\n"))) .unwrap(); Ok(resp) } +fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String { + if !config.general.beobachten { + return "beobachten disabled\n".to_string(); + } + + let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)); + beobachten.snapshot_text(ttl) +} + fn render_metrics(stats: &Stats) -> String { use std::fmt::Write; let mut out = String::with_capacity(4096); @@ -318,6 +355,7 @@ fn render_metrics(stats: &Stats) -> String { #[cfg(test)] mod tests { use super::*; + use std::net::IpAddr; use http_body_util::BodyExt; #[test] @@ -375,6 +413,8 @@ mod tests { #[tokio::test] async fn test_endpoint_integration() { let stats = Arc::new(Stats::new()); + let beobachten = Arc::new(BeobachtenStore::new()); + let mut config = ProxyConfig::default(); stats.increment_connects_all(); stats.increment_connects_all(); stats.increment_connects_all(); @@ -383,16 +423,34 @@ mod tests { .uri("/metrics") .body(()) .unwrap(); - let resp = handle(req, &stats).unwrap(); + let resp = handle(req, &stats, &beobachten, &config).unwrap(); assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_body().collect().await.unwrap().to_bytes(); assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3")); + config.general.beobachten = true; + config.general.beobachten_minutes = 10; + beobachten.record( + "TLS-scanner", + "203.0.113.10".parse::().unwrap(), + Duration::from_secs(600), + ); + let req_beob = Request::builder() + .uri("/beobachten") + .body(()) + .unwrap(); + let resp_beob = handle(req_beob, &stats, &beobachten, &config).unwrap(); + assert_eq!(resp_beob.status(), StatusCode::OK); + let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes(); + let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap(); + assert!(beob_text.contains("[TLS-scanner]")); + assert!(beob_text.contains("203.0.113.10-1")); + let req404 = Request::builder() .uri("/other") .body(()) .unwrap(); - let resp404 = handle(req404, &stats).unwrap(); + let resp404 = handle(req404, &stats, &beobachten, &config).unwrap(); assert_eq!(resp404.status(), StatusCode::NOT_FOUND); } } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 483f6e0..c598023 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -1,7 +1,7 @@ //! Client Handler use std::future::Future; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -27,6 +27,7 @@ use crate::error::{HandshakeResult, ProxyError, Result}; use crate::ip_tracker::UserIpTracker; use crate::protocol::constants::*; use crate::protocol::tls; +use crate::stats::beobachten::BeobachtenStore; use crate::stats::{ReplayChecker, Stats}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::middle_proxy::MePool; @@ -39,6 +40,36 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle use crate::proxy::masking::handle_bad_client; use crate::proxy::middle_relay::handle_via_middle_proxy; +fn beobachten_ttl(config: &ProxyConfig) -> Duration { + Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)) +} + +fn record_beobachten_class( + beobachten: &BeobachtenStore, + config: &ProxyConfig, + peer_ip: IpAddr, + class: &str, +) { + if !config.general.beobachten { + return; + } + beobachten.record(class, peer_ip, beobachten_ttl(config)); +} + +fn record_handshake_failure_class( + beobachten: &BeobachtenStore, + config: &ProxyConfig, + peer_ip: IpAddr, + error: &ProxyError, +) { + let class = if error.to_string().contains("expected 64 bytes, got 0") { + "expected_64_got_0" + } else { + "other" + }; + record_beobachten_class(beobachten, config, peer_ip, class); +} + pub async fn handle_client_stream( mut stream: S, peer: SocketAddr, @@ -51,6 +82,7 @@ pub async fn handle_client_stream( me_pool: Option>, tls_cache: Option>, ip_tracker: Arc, + beobachten: Arc, proxy_protocol_enabled: bool, ) -> Result<()> where @@ -73,6 +105,7 @@ where Err(e) => { stats.increment_connects_bad(); warn!(peer = %peer, error = %e, "Invalid PROXY protocol header"); + record_beobachten_class(&beobachten, &config, peer.ip(), "other"); return Err(e); } } @@ -82,6 +115,9 @@ where let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake); let stats_for_timeout = stats.clone(); + let config_for_timeout = config.clone(); + let beobachten_for_timeout = beobachten.clone(); + let peer_for_timeout = real_peer.ip(); // For non-TCP streams, use a synthetic local address let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) @@ -103,7 +139,15 @@ where debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short"); stats.increment_connects_bad(); let (reader, writer) = tokio::io::split(stream); - handle_bad_client(reader, writer, &first_bytes, &config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -120,7 +164,15 @@ where HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), @@ -156,7 +208,15 @@ where debug!(peer = %real_peer, "Non-TLS modes disabled"); stats.increment_connects_bad(); let (reader, writer) = tokio::io::split(stream); - handle_bad_client(reader, writer, &first_bytes, &config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -173,7 +233,15 @@ where HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + real_peer.ip(), + &config, + &beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), @@ -200,11 +268,23 @@ where Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer, error = %e, "Handshake failed"); + record_handshake_failure_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + &e, + ); return Err(e); } Err(_) => { stats_for_timeout.increment_handshake_timeouts(); debug!(peer = %peer, "Handshake timeout"); + record_beobachten_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + "other", + ); return Err(ProxyError::TgHandshakeTimeout); } }; @@ -230,6 +310,7 @@ pub struct RunningClientHandler { me_pool: Option>, tls_cache: Option>, ip_tracker: Arc, + beobachten: Arc, proxy_protocol_enabled: bool, } @@ -246,6 +327,7 @@ impl ClientHandler { me_pool: Option>, tls_cache: Option>, ip_tracker: Arc, + beobachten: Arc, proxy_protocol_enabled: bool, ) -> RunningClientHandler { RunningClientHandler { @@ -260,6 +342,7 @@ impl ClientHandler { me_pool, tls_cache, ip_tracker, + beobachten, proxy_protocol_enabled, } } @@ -284,17 +367,32 @@ impl RunningClientHandler { let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); let stats = self.stats.clone(); + let config_for_timeout = self.config.clone(); + let beobachten_for_timeout = self.beobachten.clone(); + let peer_for_timeout = peer.ip(); // Phase 1: handshake (with timeout) let outcome = match timeout(handshake_timeout, self.do_handshake()).await { Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer, error = %e, "Handshake failed"); + record_handshake_failure_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + &e, + ); return Err(e); } Err(_) => { stats.increment_handshake_timeouts(); debug!(peer = %peer, "Handshake timeout"); + record_beobachten_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + "other", + ); return Err(ProxyError::TgHandshakeTimeout); } }; @@ -321,6 +419,12 @@ impl RunningClientHandler { Err(e) => { self.stats.increment_connects_bad(); warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header"); + record_beobachten_class( + &self.beobachten, + &self.config, + self.peer.ip(), + "other", + ); return Err(e); } } @@ -354,7 +458,15 @@ impl RunningClientHandler { debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); self.stats.increment_connects_bad(); let (reader, writer) = self.stream.into_split(); - handle_bad_client(reader, writer, &first_bytes, &self.config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + peer.ip(), + &self.config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -385,7 +497,15 @@ impl RunningClientHandler { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + peer.ip(), + &config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), @@ -446,7 +566,15 @@ impl RunningClientHandler { debug!(peer = %peer, "Non-TLS modes disabled"); self.stats.increment_connects_bad(); let (reader, writer) = self.stream.into_split(); - handle_bad_client(reader, writer, &first_bytes, &self.config).await; + handle_bad_client( + reader, + writer, + &first_bytes, + peer.ip(), + &self.config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } @@ -476,7 +604,15 @@ impl RunningClientHandler { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; + handle_bad_client( + reader, + writer, + &handshake, + peer.ip(), + &config, + &self.beobachten, + ) + .await; return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 72175fe..cdb6cf9 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -1,6 +1,7 @@ //! Masking - forward unrecognized traffic to mask host use std::str; +use std::net::IpAddr; use std::time::Duration; use tokio::net::TcpStream; #[cfg(unix)] @@ -9,6 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; use tokio::time::timeout; use tracing::debug; use crate::config::ProxyConfig; +use crate::stats::beobachten::BeobachtenStore; const MASK_TIMEOUT: Duration = Duration::from_secs(5); /// Maximum duration for the entire masking relay. @@ -50,20 +52,26 @@ pub async fn handle_bad_client( reader: R, writer: W, initial_data: &[u8], + peer_ip: IpAddr, config: &ProxyConfig, + beobachten: &BeobachtenStore, ) where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, { + let client_type = detect_client_type(initial_data); + if config.general.beobachten { + let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)); + beobachten.record(client_type, peer_ip, ttl); + } + if !config.censorship.mask { // Masking disabled, just consume data consume_client_data(reader).await; return; } - let client_type = detect_client_type(initial_data); - // Connect via Unix socket or TCP #[cfg(unix)] if let Some(ref sock_path) = config.censorship.mask_unix_sock { diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 5f4c98e..1e32bb7 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -2,6 +2,8 @@ #![allow(dead_code)] +pub mod beobachten; + use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Instant, Duration}; use dashmap::DashMap;