diff --git a/Cargo.lock b/Cargo.lock index 2e18868..8ede4c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "3.4.5" +version = "3.4.6" dependencies = [ "aes", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 6146466..8983d48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.4.5" +version = "3.4.6" edition = "2024" [features] diff --git a/src/api/mod.rs b/src/api/mod.rs index 46bdc10..f33a89b 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -41,7 +41,7 @@ use config_store::{current_revision, load_config_from_disk, parse_if_match}; use events::ApiEventStore; use http_utils::{error_response, read_json, read_optional_json, success_response}; use model::{ - ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, + ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData, PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps, }; use runtime_edge::{ @@ -334,10 +334,24 @@ async fn handle( } ("GET", "/v1/stats/summary") => { let revision = current_revision(&shared.config_path).await?; + let connections_bad_by_class = shared + .stats + .get_connects_bad_class_counts() + .into_iter() + .map(|(class, total)| ClassCount { class, total }) + .collect(); + let handshake_failures_by_class = shared + .stats + .get_handshake_failure_class_counts() + .into_iter() + .map(|(class, total)| ClassCount { class, total }) + .collect(); let data = SummaryData { uptime_seconds: shared.stats.uptime_secs(), connections_total: shared.stats.get_connects_all(), connections_bad_total: shared.stats.get_connects_bad(), + connections_bad_by_class, + handshake_failures_by_class, handshake_timeouts_total: shared.stats.get_handshake_timeouts(), configured_users: cfg.access.users.len(), }; diff --git a/src/api/model.rs b/src/api/model.rs index c6e24ea..fa1f063 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -71,11 +71,19 @@ pub(super) struct HealthReadyData { pub(super) total_upstreams: usize, } +#[derive(Serialize, Clone)] +pub(super) struct ClassCount { + pub(super) class: String, + pub(super) total: u64, +} + #[derive(Serialize)] pub(super) struct SummaryData { pub(super) uptime_seconds: f64, pub(super) connections_total: u64, pub(super) connections_bad_total: u64, + pub(super) connections_bad_by_class: Vec, + pub(super) handshake_failures_by_class: Vec, pub(super) handshake_timeouts_total: u64, pub(super) configured_users: usize, } @@ -91,6 +99,8 @@ pub(super) struct ZeroCoreData { pub(super) uptime_seconds: f64, pub(super) connections_total: u64, pub(super) connections_bad_total: u64, + pub(super) connections_bad_by_class: Vec, + pub(super) handshake_failures_by_class: Vec, pub(super) handshake_timeouts_total: u64, pub(super) accept_permit_timeout_total: u64, pub(super) configured_users: usize, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 131acef..6fd9bb6 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -7,8 +7,8 @@ use crate::transport::upstream::IpPreference; use super::ApiShared; use super::model::{ - DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, - MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, + ClassCount, DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, + MeWritersSummary, MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, ZeroUpstreamData, @@ -26,6 +26,16 @@ pub(crate) struct MinimalCacheEntry { pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> ZeroAllData { let telemetry = stats.telemetry_policy(); + let bad_connection_classes = stats + .get_connects_bad_class_counts() + .into_iter() + .map(|(class, total)| ClassCount { class, total }) + .collect(); + let handshake_failure_classes = stats + .get_handshake_failure_class_counts() + .into_iter() + .map(|(class, total)| ClassCount { class, total }) + .collect(); let handshake_error_codes = stats .get_me_handshake_error_code_counts() .into_iter() @@ -38,6 +48,8 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer uptime_seconds: stats.uptime_secs(), connections_total: stats.get_connects_all(), connections_bad_total: stats.get_connects_bad(), + connections_bad_by_class: bad_connection_classes, + handshake_failures_by_class: handshake_failure_classes, handshake_timeouts_total: stats.get_handshake_timeouts(), accept_permit_timeout_total: stats.get_accept_permit_timeout_total(), configured_users, diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 67fea54..2d4dd42 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -324,38 +324,38 @@ fn record_beobachten_class( beobachten.record(class, peer_ip, beobachten_ttl(config)); } +fn classify_expected_64_got_0(kind: std::io::ErrorKind) -> Option<&'static str> { + match kind { + std::io::ErrorKind::UnexpectedEof => Some("expected_64_got_0_unexpected_eof"), + std::io::ErrorKind::ConnectionReset => Some("expected_64_got_0_connection_reset"), + std::io::ErrorKind::ConnectionAborted => Some("expected_64_got_0_connection_aborted"), + std::io::ErrorKind::BrokenPipe => Some("expected_64_got_0_broken_pipe"), + std::io::ErrorKind::NotConnected => Some("expected_64_got_0_not_connected"), + _ => None, + } +} + +fn classify_handshake_failure_class(error: &ProxyError) -> &'static str { + match error { + ProxyError::Io(err) => classify_expected_64_got_0(err.kind()).unwrap_or("other"), + ProxyError::Stream(StreamError::UnexpectedEof) => "expected_64_got_0_unexpected_eof", + ProxyError::Stream(StreamError::Io(err)) => { + classify_expected_64_got_0(err.kind()).unwrap_or("other") + } + _ => "other", + } +} + fn record_handshake_failure_class( beobachten: &BeobachtenStore, config: &ProxyConfig, peer_ip: IpAddr, error: &ProxyError, ) { - let class = match error { - ProxyError::Io(err) - if matches!( - err.kind(), - std::io::ErrorKind::UnexpectedEof - | std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::BrokenPipe - | std::io::ErrorKind::NotConnected - ) => - { - "expected_64_got_0" - } - ProxyError::Stream(StreamError::UnexpectedEof) => "expected_64_got_0", - ProxyError::Stream(StreamError::Io(err)) - if matches!( - err.kind(), - std::io::ErrorKind::UnexpectedEof - | std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::BrokenPipe - | std::io::ErrorKind::NotConnected - ) => - { - "expected_64_got_0" - } + // Keep beobachten buckets stable while detailed per-kind classification + // is tracked in API counters. + let class = match classify_handshake_failure_class(error) { + value if value.starts_with("expected_64_got_0_") => "expected_64_got_0", _ => "other", }; record_beobachten_class(beobachten, config, peer_ip, class); @@ -364,7 +364,7 @@ fn record_handshake_failure_class( #[inline] fn increment_bad_on_unknown_tls_sni(stats: &Stats, error: &ProxyError) { if matches!(error, ProxyError::UnknownTlsSni) { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("unknown_tls_sni"); } } @@ -465,7 +465,7 @@ where Ok(Ok(info)) => { if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs) { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("proxy_protocol_untrusted"); warn!( peer = %peer, trusted = ?config.server.proxy_protocol_trusted_cidrs, @@ -486,13 +486,13 @@ where } } Ok(Err(e)) => { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("proxy_protocol_invalid_header"); warn!(peer = %peer, error = %e, "Invalid PROXY protocol header"); record_beobachten_class(&beobachten, &config, peer.ip(), "other"); return Err(e); } Err(_) => { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("proxy_protocol_header_timeout"); warn!(peer = %peer, timeout_ms = proxy_header_timeout.as_millis(), "PROXY protocol header timeout"); record_beobachten_class(&beobachten, &config, peer.ip(), "other"); return Err(ProxyError::InvalidProxyProtocol); @@ -582,7 +582,7 @@ where // third-party clients or future Telegram versions. if !tls_clienthello_len_in_bounds(tls_len) { debug!(peer = %real_peer, tls_len = tls_len, max_tls_len = MAX_TLS_PLAINTEXT_SIZE, "TLS handshake length out of bounds"); - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_clienthello_len_out_of_bounds"); maybe_apply_mask_reject_delay(&config).await; let (reader, writer) = tokio::io::split(stream); return Ok(masking_outcome( @@ -602,7 +602,7 @@ where Ok(n) => n, Err(e) => { debug!(peer = %real_peer, error = %e, tls_len = tls_len, "TLS ClientHello body read failed; engaging masking fallback"); - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_clienthello_read_error"); maybe_apply_mask_reject_delay(&config).await; let initial_len = 5; let (reader, writer) = tokio::io::split(stream); @@ -620,7 +620,7 @@ where if body_read < tls_len { debug!(peer = %real_peer, got = body_read, expected = tls_len, "Truncated in-range TLS ClientHello; engaging masking fallback"); - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_clienthello_truncated"); maybe_apply_mask_reject_delay(&config).await; let initial_len = 5 + body_read; let (reader, writer) = tokio::io::split(stream); @@ -644,7 +644,7 @@ where ).await { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_handshake_bad_client"); return Ok(masking_outcome( reader, writer, @@ -684,7 +684,7 @@ where wrap_tls_application_record(&pending_plaintext) }; let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader); - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_mtproto_bad_client"); debug!( peer = %peer, "Authenticated TLS session failed MTProto validation; engaging masking fallback" @@ -714,7 +714,7 @@ where } else { if !config.general.modes.classic && !config.general.modes.secure { debug!(peer = %real_peer, "Non-TLS modes disabled"); - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("direct_modes_disabled"); maybe_apply_mask_reject_delay(&config).await; let (reader, writer) = tokio::io::split(stream); return Ok(masking_outcome( @@ -741,7 +741,7 @@ where ).await { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("direct_mtproto_bad_client"); return Ok(masking_outcome( reader, writer, @@ -778,6 +778,7 @@ where Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer, error = %e, "Handshake failed"); + stats_for_timeout.increment_handshake_failure_class(classify_handshake_failure_class(&e)); record_handshake_failure_class( &beobachten_for_timeout, &config_for_timeout, @@ -788,6 +789,7 @@ where } Err(_) => { stats_for_timeout.increment_handshake_timeouts(); + stats_for_timeout.increment_handshake_failure_class("timeout"); debug!(peer = %peer, "Handshake timeout"); record_beobachten_class( &beobachten_for_timeout, @@ -977,7 +979,8 @@ impl RunningClientHandler { self.peer.ip(), &self.config.server.proxy_protocol_trusted_cidrs, ) { - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("proxy_protocol_untrusted"); warn!( peer = %self.peer, trusted = ?self.config.server.proxy_protocol_trusted_cidrs, @@ -1007,7 +1010,8 @@ impl RunningClientHandler { } } Ok(Err(e)) => { - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("proxy_protocol_invalid_header"); warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header"); record_beobachten_class( &self.beobachten, @@ -1018,7 +1022,8 @@ impl RunningClientHandler { return Err(e); } Err(_) => { - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("proxy_protocol_header_timeout"); warn!( peer = %self.peer, timeout_ms = proxy_header_timeout.as_millis(), @@ -1116,6 +1121,7 @@ impl RunningClientHandler { Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer_for_log, error = %e, "Handshake failed"); + stats.increment_handshake_failure_class(classify_handshake_failure_class(&e)); record_handshake_failure_class( &beobachten_for_timeout, &config_for_timeout, @@ -1126,6 +1132,7 @@ impl RunningClientHandler { } Err(_) => { stats.increment_handshake_timeouts(); + stats.increment_handshake_failure_class("timeout"); debug!(peer = %peer_for_log, "Handshake timeout"); record_beobachten_class( &beobachten_for_timeout, @@ -1161,7 +1168,8 @@ impl RunningClientHandler { // third-party clients or future Telegram versions. if !tls_clienthello_len_in_bounds(tls_len) { debug!(peer = %peer, tls_len = tls_len, max_tls_len = MAX_TLS_PLAINTEXT_SIZE, "TLS handshake length out of bounds"); - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("tls_clienthello_len_out_of_bounds"); maybe_apply_mask_reject_delay(&self.config).await; let (reader, writer) = self.stream.into_split(); return Ok(masking_outcome( @@ -1181,7 +1189,8 @@ impl RunningClientHandler { Ok(n) => n, Err(e) => { debug!(peer = %peer, error = %e, tls_len = tls_len, "TLS ClientHello body read failed; engaging masking fallback"); - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("tls_clienthello_read_error"); maybe_apply_mask_reject_delay(&self.config).await; let (reader, writer) = self.stream.into_split(); return Ok(masking_outcome( @@ -1198,7 +1207,8 @@ impl RunningClientHandler { if body_read < tls_len { debug!(peer = %peer, got = body_read, expected = tls_len, "Truncated in-range TLS ClientHello; engaging masking fallback"); - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("tls_clienthello_truncated"); maybe_apply_mask_reject_delay(&self.config).await; let initial_len = 5 + body_read; let (reader, writer) = self.stream.into_split(); @@ -1235,7 +1245,7 @@ impl RunningClientHandler { { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_handshake_bad_client"); return Ok(masking_outcome( reader, writer, @@ -1285,7 +1295,7 @@ impl RunningClientHandler { }; let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader); - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("tls_mtproto_bad_client"); debug!( peer = %peer, "Authenticated TLS session failed MTProto validation; engaging masking fallback" @@ -1332,7 +1342,8 @@ impl RunningClientHandler { if !self.config.general.modes.classic && !self.config.general.modes.secure { debug!(peer = %peer, "Non-TLS modes disabled"); - self.stats.increment_connects_bad(); + self.stats + .increment_connects_bad_with_class("direct_modes_disabled"); maybe_apply_mask_reject_delay(&self.config).await; let (reader, writer) = self.stream.into_split(); return Ok(masking_outcome( @@ -1372,7 +1383,7 @@ impl RunningClientHandler { { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { - stats.increment_connects_bad(); + stats.increment_connects_bad_with_class("direct_mtproto_bad_client"); return Ok(masking_outcome( reader, writer, diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 9609e19..9678f2a 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -88,6 +88,8 @@ impl Drop for RouteConnectionLease { pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, + connects_bad_classes: DashMap<&'static str, AtomicU64>, + handshake_failure_classes: DashMap<&'static str, AtomicU64>, current_connections_direct: AtomicU64, current_connections_me: AtomicU64, handshake_timeouts: AtomicU64, @@ -518,10 +520,32 @@ impl Stats { self.connects_all.fetch_add(1, Ordering::Relaxed); } } - pub fn increment_connects_bad(&self) { - if self.telemetry_core_enabled() { - self.connects_bad.fetch_add(1, Ordering::Relaxed); + + pub fn increment_connects_bad_with_class(&self, class: &'static str) { + if !self.telemetry_core_enabled() { + return; } + self.connects_bad.fetch_add(1, Ordering::Relaxed); + let entry = self + .connects_bad_classes + .entry(class) + .or_insert_with(|| AtomicU64::new(0)); + entry.fetch_add(1, Ordering::Relaxed); + } + + pub fn increment_connects_bad(&self) { + self.increment_connects_bad_with_class("other"); + } + + pub fn increment_handshake_failure_class(&self, class: &'static str) { + if !self.telemetry_core_enabled() { + return; + } + let entry = self + .handshake_failure_classes + .entry(class) + .or_insert_with(|| AtomicU64::new(0)); + entry.fetch_add(1, Ordering::Relaxed); } pub fn increment_current_connections_direct(&self) { self.current_connections_direct @@ -1640,6 +1664,37 @@ impl Stats { pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } + + pub fn get_connects_bad_class_counts(&self) -> Vec<(String, u64)> { + let mut out: Vec<(String, u64)> = self + .connects_bad_classes + .iter() + .map(|entry| { + ( + entry.key().to_string(), + entry.value().load(Ordering::Relaxed), + ) + }) + .collect(); + out.sort_by(|a, b| a.0.cmp(&b.0)); + out + } + + pub fn get_handshake_failure_class_counts(&self) -> Vec<(String, u64)> { + let mut out: Vec<(String, u64)> = self + .handshake_failure_classes + .iter() + .map(|entry| { + ( + entry.key().to_string(), + entry.value().load(Ordering::Relaxed), + ) + }) + .collect(); + out.sort_by(|a, b| a.0.cmp(&b.0)); + out + } + pub fn get_accept_permit_timeout_total(&self) -> u64 { self.accept_permit_timeout_total.load(Ordering::Relaxed) } diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs index 5a1ca91..53ec803 100644 --- a/src/tls_front/fetcher.rs +++ b/src/tls_front/fetcher.rs @@ -436,7 +436,10 @@ fn build_client_hello( let session_id = if session_id_len == 0 { Vec::new() } else if deterministic { - deterministic_bytes(&format!("tls-fetch-session:{sni}:{}", profile.as_str()), session_id_len) + deterministic_bytes( + &format!("tls-fetch-session:{sni}:{}", profile.as_str()), + session_id_len, + ) } else { rng.bytes(session_id_len) }; @@ -1480,17 +1483,13 @@ mod tests { out } - async fn capture_rustls_client_hello_record(alpn_protocols: &'static [&'static [u8]]) -> Vec { + async fn capture_rustls_client_hello_record( + alpn_protocols: &'static [&'static [u8]], + ) -> Vec { let (client, mut server) = tokio::io::duplex(32 * 1024); let fetch_task = tokio::spawn(async move { - fetch_via_rustls_stream( - client, - "example.com", - "example.com", - None, - alpn_protocols, - ) - .await + fetch_via_rustls_stream(client, "example.com", "example.com", None, alpn_protocols) + .await }); let mut header = [0u8; 5]; @@ -1507,7 +1506,10 @@ mod tests { drop(server); let result = fetch_task.await.expect("fetch task must join"); - assert!(result.is_err(), "capture task should end with handshake error"); + assert!( + result.is_err(), + "capture task should end with handshake error" + ); let mut record = Vec::with_capacity(5 + body_len); record.extend_from_slice(&header); @@ -1685,14 +1687,20 @@ mod tests { true, ); let parsed = parse_client_hello_for_test(&hello); - assert_eq!(parsed.session_id.len(), 32, "modern chrome must use non-empty session id"); + assert_eq!( + parsed.session_id.len(), + 32, + "modern chrome must use non-empty session id" + ); let extension_ids = parsed .extensions .iter() .map(|(ext_type, _)| *ext_type) .collect::>(); - let expected_prefix = [0x0000, 0x000b, 0x000a, 0x0023, 0x000d, 0x002b, 0x002d, 0x0033, 0x0010]; + let expected_prefix = [ + 0x0000, 0x000b, 0x000a, 0x0023, 0x000d, 0x002b, 0x002d, 0x0033, 0x0010, + ]; assert!( extension_ids.as_slice().starts_with(&expected_prefix), "unexpected extension order: {extension_ids:?}" @@ -1713,13 +1721,20 @@ mod tests { "key_share payload is too short" ); let entry_len = u16::from_be_bytes([key_share_data[0], key_share_data[1]]) as usize; - assert_eq!(entry_len, key_share_data.len() - 2, "key_share list length mismatch"); + assert_eq!( + entry_len, + key_share_data.len() - 2, + "key_share list length mismatch" + ); let group = u16::from_be_bytes([key_share_data[2], key_share_data[3]]); let key_len = u16::from_be_bytes([key_share_data[4], key_share_data[5]]) as usize; let key = &key_share_data[6..6 + key_len]; assert_eq!(group, 0x001d, "key_share group must be x25519"); assert_eq!(key_len, 32, "x25519 key length must be 32"); - assert!(key.iter().any(|b| *b != 0), "x25519 key must not be all zero"); + assert!( + key.iter().any(|b| *b != 0), + "x25519 key must not be all zero" + ); } #[test]