diff --git a/src/api/mod.rs b/src/api/mod.rs index 7a03346..4239b59 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -55,7 +55,7 @@ use model::{ use patch::Patch; use runtime_edge::{ EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, - build_runtime_events_recent_data, + build_runtime_events_recent_data, build_runtime_tls_fingerprints_data, }; use runtime_init::build_runtime_initialization_data; use runtime_min::{ @@ -169,6 +169,7 @@ fn allowed_methods_for_path(path: &str) -> Option<&'static str> { | "/v1/runtime/me-selftest" | "/v1/runtime/connections/summary" | "/v1/runtime/events/recent" + | "/v1/runtime/tls-fingerprints" | "/v1/stats/users/active-ips" | "/v1/stats/users/quota" | "/v1/stats/users" => Some(ALLOW_GET), @@ -540,6 +541,15 @@ async fn handle( ); Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/runtime/tls-fingerprints") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_tls_fingerprints_data( + shared.as_ref(), + cfg.as_ref(), + query.as_deref(), + ); + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/users/active-ips") => { let revision = current_revision(&shared.config_path).await?; let usernames: Vec<_> = cfg.access.users.keys().cloned().collect(); diff --git a/src/api/runtime_edge.rs b/src/api/runtime_edge.rs index b61f504..639dbe0 100644 --- a/src/api/runtime_edge.rs +++ b/src/api/runtime_edge.rs @@ -12,6 +12,8 @@ const FEATURE_DISABLED_REASON: &str = "feature_disabled"; const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable"; const EVENTS_DEFAULT_LIMIT: usize = 50; const EVENTS_MAX_LIMIT: usize = 1000; +const TLS_FINGERPRINTS_MAX_LIMIT: usize = 1000; +const RUNTIME_EDGE_RETENTION_MAX_MINUTES: u64 = 24 * 60; #[derive(Clone, Serialize)] pub(super) struct RuntimeEdgeConnectionUserData { @@ -90,6 +92,44 @@ pub(super) struct RuntimeEdgeEventsData { pub(super) data: Option, } +#[derive(Serialize)] +pub(super) struct 
RuntimeEdgeTlsFingerprintRow { + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) scope: Option, + pub(super) ja3: String, + pub(super) ja3_raw: String, + pub(super) ja4: String, + pub(super) ja4_raw: String, + pub(super) total: u64, + pub(super) auth_success: u64, + pub(super) bad_or_probe: u64, + pub(super) first_seen_epoch_secs: u64, + pub(super) last_seen_epoch_secs: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeEdgeTlsFingerprintsPayload { + pub(super) limit: usize, + pub(super) retention_secs: u64, + pub(super) capacity: usize, + pub(super) dropped_total: u64, + pub(super) parse_error_total: u64, + pub(super) by_fingerprint: Vec, + pub(super) by_ip: Vec, + pub(super) by_cidr: Vec, + pub(super) by_user: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeEdgeTlsFingerprintsData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) data: Option, +} + pub(super) async fn build_runtime_connections_summary_data( shared: &ApiShared, cfg: &ProxyConfig, @@ -162,6 +202,65 @@ pub(super) fn build_runtime_events_recent_data( } } +pub(super) fn build_runtime_tls_fingerprints_data( + shared: &ApiShared, + cfg: &ProxyConfig, + query: Option<&str>, +) -> RuntimeEdgeTlsFingerprintsData { + let now_epoch_secs = now_epoch_secs(); + let api_cfg = &cfg.server.api; + if !api_cfg.runtime_edge_enabled { + return RuntimeEdgeTlsFingerprintsData { + enabled: false, + reason: Some(FEATURE_DISABLED_REASON), + generated_at_epoch_secs: now_epoch_secs, + data: None, + }; + } + + let limit = parse_recent_events_limit( + query, + api_cfg.runtime_edge_top_n.max(1), + TLS_FINGERPRINTS_MAX_LIMIT, + ); + let snapshot = shared + .stats + .tls_fingerprint_snapshot(runtime_edge_retention(cfg), limit); + + RuntimeEdgeTlsFingerprintsData { + enabled: true, + reason: None, + 
generated_at_epoch_secs: now_epoch_secs, + data: Some(RuntimeEdgeTlsFingerprintsPayload { + limit, + retention_secs: snapshot.retention_secs, + capacity: snapshot.capacity, + dropped_total: snapshot.dropped_total, + parse_error_total: snapshot.parse_error_total, + by_fingerprint: snapshot + .by_fingerprint + .into_iter() + .map(runtime_tls_fingerprint_row) + .collect(), + by_ip: snapshot + .by_ip + .into_iter() + .map(runtime_tls_fingerprint_row) + .collect(), + by_cidr: snapshot + .by_cidr + .into_iter() + .map(runtime_tls_fingerprint_row) + .collect(), + by_user: snapshot + .by_user + .into_iter() + .map(runtime_tls_fingerprint_row) + .collect(), + }), + } +} + async fn get_connections_payload_cached( shared: &ApiShared, cache_ttl_ms: u64, @@ -286,6 +385,35 @@ fn parse_recent_events_limit(query: Option<&str>, default_limit: usize, max_limi default_limit } +fn runtime_edge_retention(cfg: &ProxyConfig) -> Duration { + let minutes = cfg + .general + .beobachten_minutes + .clamp(1, RUNTIME_EDGE_RETENTION_MAX_MINUTES); + Duration::from_secs(minutes.saturating_mul(60)) +} + +fn runtime_tls_fingerprint_row( + row: crate::stats::TlsFingerprintSnapshotRow, +) -> RuntimeEdgeTlsFingerprintRow { + RuntimeEdgeTlsFingerprintRow { + scope: if row.scope_key.is_empty() { + None + } else { + Some(row.scope_key) + }, + ja3: row.ja3, + ja3_raw: row.ja3_raw, + ja4: row.ja4, + ja4_raw: row.ja4_raw, + total: row.total, + auth_success: row.auth_success, + bad_or_probe: row.bad_or_probe, + first_seen_epoch_secs: row.first_seen_epoch_secs, + last_seen_epoch_secs: row.last_seen_epoch_secs, + } +} + fn now_epoch_secs() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/src/metrics.rs b/src/metrics.rs index 36981d3..b4baad0 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -288,7 +288,7 @@ async fn handle( } if req.uri().path() == "/beobachten" { - let body = render_beobachten(beobachten, config); + let body = render_beobachten(stats, beobachten, config); let resp = 
Response::builder() .status(StatusCode::OK) .header("content-type", "text/plain; charset=utf-8") @@ -304,13 +304,22 @@ async fn handle( Ok(resp) } -fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String { +fn render_beobachten(stats: &Stats, beobachten: &BeobachtenStore, config: &ProxyConfig) -> String { if !config.general.beobachten { return "beobachten disabled\n".to_string(); } let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)); - beobachten.snapshot_text(ttl) + let mut body = beobachten.snapshot_text(ttl); + let tls_text = stats.tls_fingerprint_snapshot_text(ttl, 20); + if !tls_text.is_empty() { + if !body.ends_with('\n') { + body.push('\n'); + } + body.push('\n'); + body.push_str(&tls_text); + } + body } fn tls_front_domains(config: &ProxyConfig) -> Vec { diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index f0b3a1a..9ffff7c 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -4,6 +4,7 @@ pub mod constants; pub mod frame; pub mod obfuscation; pub mod tls; +pub mod tls_fingerprint; #[allow(unused_imports)] pub use constants::*; @@ -13,3 +14,5 @@ pub use frame::*; pub use obfuscation::*; #[allow(unused_imports)] pub use tls::*; +#[allow(unused_imports)] +pub use tls_fingerprint::*; diff --git a/src/protocol/tls_fingerprint.rs b/src/protocol/tls_fingerprint.rs new file mode 100644 index 0000000..4598d45 --- /dev/null +++ b/src/protocol/tls_fingerprint.rs @@ -0,0 +1,450 @@ +//! Passive JA3 / JA4 TLS ClientHello fingerprinting. 
use crate::crypto::hash::md5;
use crate::crypto::sha256;
use crate::protocol::constants::TLS_RECORD_HANDSHAKE;

/// Handshake message type byte that identifies a ClientHello.
const HANDSHAKE_TYPE_CLIENT_HELLO: u8 = 0x01;
/// Number of bytes skipped between `legacy_version` and the session id
/// length (the ClientHello `random` field).
const CLIENT_RANDOM_LEN: usize = 32;
/// Emitted instead of a truncated SHA-256 when a JA4 section has nothing to hash.
const JA4_EMPTY_HASH: &str = "000000000000";

const EXT_SNI: u16 = 0x0000;
const EXT_SUPPORTED_GROUPS: u16 = 0x000a;
const EXT_EC_POINT_FORMATS: u16 = 0x000b;
const EXT_SIGNATURE_ALGORITHMS: u16 = 0x000d;
const EXT_ALPN: u16 = 0x0010;
const EXT_SUPPORTED_VERSIONS: u16 = 0x002b;

/// JA3 and JA4 fingerprints derived from a single ClientHello.
///
/// `ja3` / `ja4` are the hashed forms; `ja3_raw` / `ja4_raw` are the strings
/// they were derived from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TlsClientFingerprint {
    pub ja3: String,
    pub ja3_raw: String,
    pub ja4: String,
    pub ja4_raw: String,
}

/// Fields extracted from a ClientHello. GREASE values are already removed
/// from every `u16` list.
#[derive(Default)]
struct ParsedClientHello {
    legacy_version: u16,
    ciphers: Vec<u16>,
    /// Extension types in wire order.
    extensions: Vec<u16>,
    supported_groups: Vec<u16>,
    ec_point_formats: Vec<u8>,
    signature_algorithms: Vec<u16>,
    supported_versions: Vec<u16>,
    /// First ALPN protocol name, if the extension is present and non-empty.
    alpn_first: Option<Vec<u8>>,
    sni_present: bool,
}

/// Computes JA3 and JA4 fingerprints for a TLS record carrying a ClientHello.
///
/// `handshake` must start at the TLS record header. Returns `None` when the
/// buffer is not a handshake record, is not a ClientHello, or is truncated or
/// malformed anywhere the parser looks.
pub fn fingerprint_client_hello(handshake: &[u8]) -> Option<TlsClientFingerprint> {
    let parsed = parse_client_hello(handshake)?;
    let ja3_raw = ja3_raw(&parsed);
    let ja3 = hex::encode(md5(ja3_raw.as_bytes()));
    let (ja4, ja4_raw) = ja4(&parsed);

    Some(TlsClientFingerprint {
        ja3,
        ja3_raw,
        ja4,
        ja4_raw,
    })
}

/// Forward-only reader over a byte slice; every read is bounds-checked and
/// yields `None` instead of running past the end of the slice it was given.
struct ByteReader<'a> {
    rest: &'a [u8],
}

impl<'a> ByteReader<'a> {
    fn new(bytes: &'a [u8]) -> Self {
        Self { rest: bytes }
    }

    fn is_empty(&self) -> bool {
        self.rest.is_empty()
    }

    /// Consumes and returns the next `len` bytes.
    fn take(&mut self, len: usize) -> Option<&'a [u8]> {
        let rest: &'a [u8] = self.rest;
        if len > rest.len() {
            return None;
        }
        let (head, tail) = rest.split_at(len);
        self.rest = tail;
        Some(head)
    }

    fn u8(&mut self) -> Option<u8> {
        self.take(1).map(|bytes| bytes[0])
    }

    /// Consumes a big-endian `u16`.
    fn u16(&mut self) -> Option<u16> {
        self.take(2)
            .map(|bytes| u16::from_be_bytes([bytes[0], bytes[1]]))
    }
}

/// Parses the record header, handshake header and ClientHello body.
///
/// Each nested length (record, handshake, cipher list, extension block,
/// individual extension) gets its own sub-reader, so an inner length can never
/// reach outside the region declared by the enclosing one.
fn parse_client_hello(handshake: &[u8]) -> Option<ParsedClientHello> {
    // Record header: type(1) | version(2) | length(2).
    let mut outer = ByteReader::new(handshake);
    if outer.u8()? != TLS_RECORD_HANDSHAKE {
        return None;
    }
    outer.take(2)?;
    let record_len = usize::from(outer.u16()?);
    let mut record = ByteReader::new(outer.take(record_len)?);

    // Handshake header: msg_type(1) | length(3).
    if record.u8()? != HANDSHAKE_TYPE_CLIENT_HELLO {
        return None;
    }
    let len_bytes = record.take(3)?;
    let handshake_len = (usize::from(len_bytes[0]) << 16)
        | (usize::from(len_bytes[1]) << 8)
        | usize::from(len_bytes[2]);
    let mut body = ByteReader::new(record.take(handshake_len)?);

    let legacy_version = body.u16()?;
    body.take(CLIENT_RANDOM_LEN)?;

    let session_id_len = usize::from(body.u8()?);
    body.take(session_id_len)?;

    let cipher_len = usize::from(body.u16()?);
    let ciphers = decode_u16_list(body.take(cipher_len)?)?;

    let compression_len = usize::from(body.u8()?);
    body.take(compression_len)?;

    let mut parsed = ParsedClientHello {
        legacy_version,
        ciphers,
        ..ParsedClientHello::default()
    };

    // A ClientHello without an extensions block is accepted as-is.
    if body.is_empty() {
        return Some(parsed);
    }

    let ext_len = usize::from(body.u16()?);
    let mut extensions = ByteReader::new(body.take(ext_len)?);

    // A partial extension header or body at the tail makes a read fail, which
    // rejects the whole hello.
    while !extensions.is_empty() {
        let etype = extensions.u16()?;
        let elen = usize::from(extensions.u16()?);
        let data = extensions.take(elen)?;

        if is_grease(etype) {
            continue;
        }
        parsed.extensions.push(etype);
        match etype {
            EXT_SNI => parsed.sni_present = true,
            EXT_SUPPORTED_GROUPS => {
                parsed.supported_groups = parse_u16_vector(data, 2)?;
            }
            EXT_EC_POINT_FORMATS => {
                parsed.ec_point_formats = parse_u8_vector(data)?;
            }
            EXT_SIGNATURE_ALGORITHMS => {
                parsed.signature_algorithms = parse_u16_vector(data, 2)?;
            }
            EXT_ALPN => {
                parsed.alpn_first = parse_alpn_first(data)?;
            }
            EXT_SUPPORTED_VERSIONS => {
                parsed.supported_versions = parse_u16_vector(data, 1)?;
            }
            _ => {}
        }
    }

    Some(parsed)
}

/// Decodes a packed list of big-endian `u16` values, dropping GREASE values.
///
/// Returns `None` when the byte length is odd.
fn decode_u16_list(bytes: &[u8]) -> Option<Vec<u16>> {
    if bytes.len() % 2 != 0 {
        return None;
    }
    let mut out = Vec::with_capacity(bytes.len() / 2);
    out.extend(
        bytes
            .chunks_exact(2)
            .map(|pair| u16::from_be_bytes([pair[0], pair[1]]))
            .filter(|value| !is_grease(*value)),
    );
    Some(out)
}

/// Parses a length-prefixed list of `u16` values.
///
/// `len_prefix_len` is the width of the length prefix in bytes (1 or 2); any
/// other width yields `None`. Bytes after the declared list are ignored.
fn parse_u16_vector(data: &[u8], len_prefix_len: usize) -> Option<Vec<u16>> {
    let mut reader = ByteReader::new(data);
    let list_len = match len_prefix_len {
        1 => usize::from(reader.u8()?),
        2 => usize::from(reader.u16()?),
        _ => return None,
    };
    decode_u16_list(reader.take(list_len)?)
}

/// Parses a list of bytes prefixed by a one-byte length.
fn parse_u8_vector(data: &[u8]) -> Option<Vec<u8>> {
    let mut reader = ByteReader::new(data);
    let list_len = usize::from(reader.u8()?);
    reader.take(list_len).map(<[u8]>::to_vec)
}

/// Extracts the first protocol name from an ALPN extension body.
///
/// The outer `Option` is `None` for malformed data; the inner one is `None`
/// when the list is empty or its first name has zero length.
fn parse_alpn_first(data: &[u8]) -> Option<Option<Vec<u8>>> {
    let mut reader = ByteReader::new(data);
    let list_len = usize::from(reader.u16()?);
    let mut list = ByteReader::new(reader.take(list_len)?);
    if list.is_empty() {
        return Some(None);
    }

    let protocol_len = usize::from(list.u8()?);
    let protocol = list.take(protocol_len)?;
    Some((!protocol.is_empty()).then(|| protocol.to_vec()))
}

/// Builds the JA3 input string: five comma-separated fields (version,
/// ciphers, extensions, groups, point formats), each a dash-joined decimal list.
fn ja3_raw(parsed: &ParsedClientHello) -> String {
    format!(
        "{},{},{},{},{}",
        parsed.legacy_version,
        join_decimal_u16(&parsed.ciphers),
        join_decimal_u16(&parsed.extensions),
        join_decimal_u16(&parsed.supported_groups),
        join_decimal_u8(&parsed.ec_point_formats)
    )
}

/// Builds the JA4 fingerprint; returns `(hashed, raw)`.
///
/// Both forms share the same leading section. The hashed form appends
/// truncated SHA-256 digests of the sorted cipher list and of the sorted
/// extension list (SNI and ALPN excluded) plus signature algorithms; the raw
/// form appends those lists unhashed.
fn ja4(parsed: &ParsedClientHello) -> (String, String) {
    let a = format!(
        "t{}{}{:02}{:02}{}",
        ja4_version_code(parsed),
        if parsed.sni_present { "d" } else { "i" },
        count_ja4(parsed.ciphers.len()),
        count_ja4(parsed.extensions.len()),
        ja4_alpn_marker(parsed.alpn_first.as_deref())
    );

    let mut ciphers = parsed.ciphers.clone();
    ciphers.sort_unstable();
    let cipher_raw = join_hex_u16(&ciphers);
    let cipher_hash = if ciphers.is_empty() {
        JA4_EMPTY_HASH.to_string()
    } else {
        sha256_truncated_12(&cipher_raw)
    };

    let mut extensions_for_hash = parsed
        .extensions
        .iter()
        .copied()
        .filter(|value| *value != EXT_SNI && *value != EXT_ALPN)
        .collect::<Vec<_>>();
    extensions_for_hash.sort_unstable();
    let extension_raw = join_hex_u16(&extensions_for_hash);
    // Signature algorithms keep their wire order and are only appended when present.
    let signature_raw = join_hex_u16(&parsed.signature_algorithms);
    let extension_hash_input = if signature_raw.is_empty() {
        extension_raw
    } else {
        format!("{extension_raw}_{signature_raw}")
    };
    let extension_hash = if extensions_for_hash.is_empty() {
        JA4_EMPTY_HASH.to_string()
    } else {
        sha256_truncated_12(&extension_hash_input)
    };

    (
        format!("{a}_{cipher_hash}_{extension_hash}"),
        format!("{a}_{cipher_raw}_{extension_hash_input}"),
    )
}

/// Maps the highest advertised version (from `supported_versions`, falling
/// back to `legacy_version`) to its two-character JA4 code; `"00"` if unknown.
fn ja4_version_code(parsed: &ParsedClientHello) -> &'static str {
    let version = parsed
        .supported_versions
        .iter()
        .copied()
        .max()
        .unwrap_or(parsed.legacy_version);
    match version {
        0x0304 => "13",
        0x0303 => "12",
        0x0302 => "11",
        0x0301 => "10",
        0x0300 => "s3",
        0x0002 => "s2",
        0xfeff => "d1",
        0xfefd => "d2",
        0xfefc => "d3",
        _ => "00",
    }
}

/// Returns the two-character ALPN marker used in the JA4 prefix.
///
/// `"00"` when there is no ALPN value. Otherwise the first and last bytes as
/// characters when both are ASCII alphanumeric, else the first and last hex
/// digits of the value's lowercase hex encoding.
fn ja4_alpn_marker(alpn_first: Option<&[u8]>) -> String {
    let Some((&first, &last)) = alpn_first.and_then(|value| value.first().zip(value.last()))
    else {
        return "00".to_string();
    };
    if first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric() {
        return format!("{}{}", first as char, last as char);
    }

    // First hex digit of the encoding is the high nibble of the first byte;
    // the last hex digit is the low nibble of the last byte.
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let first_hex = HEX_DIGITS[usize::from(first >> 4)] as char;
    let last_hex = HEX_DIGITS[usize::from(last & 0x0f)] as char;
    format!("{first_hex}{last_hex}")
}

/// Caps a count at 99 so it always fits the two-digit JA4 field.
fn count_ja4(count: usize) -> usize {
    count.min(99)
}

/// Hex-encoded SHA-256 of `input`, truncated to the first 12 hex characters.
fn sha256_truncated_12(input: &str) -> String {
    let mut encoded = hex::encode(sha256(input.as_bytes()));
    encoded.truncate(12);
    encoded
}

fn join_decimal_u16(values: &[u16]) -> String {
    values
        .iter()
        .map(u16::to_string)
        .collect::<Vec<_>>()
        .join("-")
}

fn join_decimal_u8(values: &[u8]) -> String {
    values
        .iter()
        .map(u8::to_string)
        .collect::<Vec<_>>()
        .join("-")
}

/// Joins values as comma-separated, zero-padded four-digit lowercase hex.
fn join_hex_u16(values: &[u16]) -> String {
    values
        .iter()
        .map(|value| format!("{value:04x}"))
        .collect::<Vec<_>>()
        .join(",")
}

/// True for GREASE placeholder values: both bytes equal and of the form `0x?a`.
fn is_grease(value: u16) -> bool {
    let [high, low] = value.to_be_bytes();
    high == low && (high & 0x0f) == 0x0a
}

#[cfg(test)]
mod tests {
use super::*; + + fn sample_client_hello() -> Vec { + let mut body = Vec::new(); + body.extend_from_slice(&[0x03, 0x03]); + body.extend_from_slice(&[0x11; 32]); + body.push(0); + body.extend_from_slice(&10u16.to_be_bytes()); + body.extend_from_slice(&[0x0a, 0x0a, 0x13, 0x01, 0x13, 0x02, 0xc0, 0x2f, 0x00, 0xff]); + body.push(1); + body.push(0); + + let mut extensions = Vec::new(); + append_ext(&mut extensions, EXT_SNI, &[0, 0]); + append_ext(&mut extensions, EXT_ALPN, &[0, 3, 2, b'h', b'2']); + append_ext( + &mut extensions, + EXT_SUPPORTED_GROUPS, + &[0, 6, 0x0a, 0x0a, 0x00, 0x17, 0x00, 0x1d], + ); + append_ext(&mut extensions, EXT_EC_POINT_FORMATS, &[1, 0]); + append_ext( + &mut extensions, + EXT_SIGNATURE_ALGORITHMS, + &[0, 4, 0x04, 0x03, 0x08, 0x04], + ); + append_ext( + &mut extensions, + EXT_SUPPORTED_VERSIONS, + &[4, 0x03, 0x04, 0x03, 0x03], + ); + body.extend_from_slice(&(extensions.len() as u16).to_be_bytes()); + body.extend_from_slice(&extensions); + + let mut record = Vec::new(); + record.push(TLS_RECORD_HANDSHAKE); + record.extend_from_slice(&[0x03, 0x01]); + record.extend_from_slice(&((body.len() + 4) as u16).to_be_bytes()); + record.push(0x01); + record.extend_from_slice(&[ + ((body.len() >> 16) & 0xff) as u8, + ((body.len() >> 8) & 0xff) as u8, + (body.len() & 0xff) as u8, + ]); + record.extend_from_slice(&body); + record + } + + fn append_ext(out: &mut Vec, etype: u16, data: &[u8]) { + out.extend_from_slice(&etype.to_be_bytes()); + out.extend_from_slice(&(data.len() as u16).to_be_bytes()); + out.extend_from_slice(data); + } + + #[test] + fn ja3_and_ja4_ignore_grease_and_remain_stable() { + let fp = fingerprint_client_hello(&sample_client_hello()) + .expect("sample ClientHello must fingerprint"); + assert_eq!( + fp.ja3_raw, + "771,4865-4866-49199-255,0-16-10-11-13-43,23-29,0" + ); + assert!(fp.ja4.starts_with("t13d0406h2_")); + } + + #[test] + fn malformed_client_hello_returns_none() { + let mut hello = sample_client_hello(); + hello.truncate(12); + 
assert!(fingerprint_client_hello(&hello).is_none()); + } +} diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 5700d40..34b540b 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -98,6 +98,7 @@ use crate::error::{HandshakeResult, ProxyError, Result, StreamError}; use crate::ip_tracker::UserIpTracker; use crate::protocol::constants::*; use crate::protocol::tls; +use crate::protocol::tls_fingerprint::{self, TlsClientFingerprint}; use crate::stats::beobachten::BeobachtenStore; use crate::stats::{ReplayChecker, Stats}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; @@ -350,6 +351,60 @@ fn record_beobachten_class( beobachten.record(class, peer_ip, beobachten_ttl(config)); } +fn tls_fingerprint_collection_enabled(config: &ProxyConfig) -> bool { + config.general.beobachten || config.server.api.runtime_edge_enabled +} + +fn observe_tls_client_fingerprint( + stats: &Stats, + config: &ProxyConfig, + peer_ip: IpAddr, + handshake: &[u8], +) -> Option { + if !tls_fingerprint_collection_enabled(config) { + return None; + } + + match tls_fingerprint::fingerprint_client_hello(handshake) { + Some(fingerprint) => { + stats.record_tls_fingerprint_observed(&fingerprint, peer_ip, beobachten_ttl(config)); + Some(fingerprint) + } + None => { + stats.increment_tls_fingerprint_parse_error(); + None + } + } +} + +fn record_tls_fingerprint_auth_success( + stats: &Stats, + config: &ProxyConfig, + peer_ip: IpAddr, + fingerprint: Option<&TlsClientFingerprint>, + user: &str, +) { + if let Some(fingerprint) = fingerprint { + stats.record_tls_fingerprint_auth_success( + fingerprint, + peer_ip, + user, + beobachten_ttl(config), + ); + } +} + +fn record_tls_fingerprint_bad_or_probe( + stats: &Stats, + config: &ProxyConfig, + peer_ip: IpAddr, + fingerprint: Option<&TlsClientFingerprint>, +) { + if let Some(fingerprint) = fingerprint { + stats.record_tls_fingerprint_bad_or_probe(fingerprint, peer_ip, beobachten_ttl(config)); + } +} + fn 
classify_expected_64_got_0(kind: std::io::ErrorKind) -> Option<&'static str> { match kind { std::io::ErrorKind::UnexpectedEof => Some("expected_64_got_0_unexpected_eof"), @@ -705,6 +760,9 @@ where )); } + let tls_fingerprint = + observe_tls_client_fingerprint(stats.as_ref(), &config, real_peer.ip(), &handshake); + let (read_half, write_half) = tokio::io::split(stream); let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake_with_shared( @@ -715,6 +773,12 @@ where HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad_with_class("tls_handshake_bad_client"); + record_tls_fingerprint_bad_or_probe( + stats.as_ref(), + &config, + real_peer.ip(), + tls_fingerprint.as_ref(), + ); return Ok(masking_outcome( reader, writer, @@ -726,10 +790,23 @@ where )); } HandshakeResult::Error(e) => { + record_tls_fingerprint_bad_or_probe( + stats.as_ref(), + &config, + real_peer.ip(), + tls_fingerprint.as_ref(), + ); increment_bad_on_unknown_tls_sni(stats.as_ref(), &e); return Err(e); } }; + record_tls_fingerprint_auth_success( + stats.as_ref(), + &config, + real_peer.ip(), + tls_fingerprint.as_ref(), + tls_user.as_str(), + ); debug!(peer = %peer, "Reading MTProto handshake through TLS"); let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; @@ -1295,6 +1372,13 @@ impl RunningClientHandler { )); } + let tls_fingerprint = observe_tls_client_fingerprint( + self.stats.as_ref(), + &self.config, + peer.ip(), + &handshake, + ); + let config = self.config.clone(); let replay_checker = self.replay_checker.clone(); let stats = self.stats.clone(); @@ -1318,6 +1402,12 @@ impl RunningClientHandler { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad_with_class("tls_handshake_bad_client"); + record_tls_fingerprint_bad_or_probe( + stats.as_ref(), + &config, + peer.ip(), + tls_fingerprint.as_ref(), + ); return Ok(masking_outcome( 
reader, writer, @@ -1329,10 +1419,23 @@ impl RunningClientHandler { )); } HandshakeResult::Error(e) => { + record_tls_fingerprint_bad_or_probe( + stats.as_ref(), + &config, + peer.ip(), + tls_fingerprint.as_ref(), + ); increment_bad_on_unknown_tls_sni(stats.as_ref(), &e); return Err(e); } }; + record_tls_fingerprint_auth_success( + stats.as_ref(), + &config, + peer.ip(), + tls_fingerprint.as_ref(), + tls_user.as_str(), + ); debug!(peer = %peer, "Reading MTProto handshake through TLS"); let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 7360fc1..7f05bbf 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -10,6 +10,7 @@ mod me_counters; mod me_getters; mod replay; pub mod telemetry; +pub mod tls_fingerprints; mod users; mod writer_counters; @@ -22,6 +23,7 @@ use std::time::Instant; #[allow(unused_imports)] pub use self::replay::{ReplayChecker, ReplayStats}; use self::telemetry::TelemetryPolicy; +pub use self::tls_fingerprints::TlsFingerprintSnapshotRow; use crate::config::MeWriterPickMode; #[derive(Clone, Copy)] @@ -333,6 +335,7 @@ pub struct Stats { telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, cached_epoch_secs: AtomicU64, + tls_fingerprints: tls_fingerprints::TlsFingerprintCollector, user_stats: DashMap>, user_stats_last_cleanup_epoch_secs: AtomicU64, start_time: parking_lot::RwLock>, diff --git a/src/stats/tls_fingerprints.rs b/src/stats/tls_fingerprints.rs new file mode 100644 index 0000000..424aa4e --- /dev/null +++ b/src/stats/tls_fingerprints.rs @@ -0,0 +1,556 @@ +//! Bounded TLS JA3/JA4 fingerprint aggregation. 
use std::cmp::Reverse;
use std::fmt::Write as _;
use std::hash::Hash;
use std::net::{IpAddr, Ipv6Addr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use dashmap::DashMap;
use dashmap::mapref::entry::Entry;

use crate::protocol::tls_fingerprint::TlsClientFingerprint;

use super::Stats;

/// Minimum spacing between opportunistic cleanups triggered by the record path.
const CLEANUP_INTERVAL_SECS: u64 = 30;
/// Maximum number of buckets (all scopes combined) kept at once.
const MAX_TLS_FINGERPRINT_BUCKETS: usize = 65_536;
/// Upper bound on rows returned per scope by a snapshot.
const MAX_SNAPSHOT_ROWS: usize = 1000;

/// Dimension a fingerprint counter is bucketed by.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum TlsFingerprintScopeKind {
    Fingerprint,
    Ip,
    Cidr,
    User,
}

/// One aggregated bucket as exposed by [`TlsFingerprintCollector::snapshot`].
#[derive(Clone, Debug)]
pub struct TlsFingerprintSnapshotRow {
    /// Scope value (IP, CIDR or user); empty for the fingerprint-only scope.
    pub scope_key: String,
    pub ja3: String,
    pub ja3_raw: String,
    pub ja4: String,
    pub ja4_raw: String,
    pub total: u64,
    pub auth_success: u64,
    pub bad_or_probe: u64,
    pub first_seen_epoch_secs: u64,
    pub last_seen_epoch_secs: u64,
}

/// Point-in-time view of the collector, with rows grouped by scope.
#[derive(Clone, Debug)]
pub struct TlsFingerprintSnapshot {
    pub retention_secs: u64,
    pub capacity: usize,
    pub dropped_total: u64,
    pub parse_error_total: u64,
    pub by_fingerprint: Vec<TlsFingerprintSnapshotRow>,
    pub by_ip: Vec<TlsFingerprintSnapshotRow>,
    pub by_cidr: Vec<TlsFingerprintSnapshotRow>,
    pub by_user: Vec<TlsFingerprintSnapshotRow>,
}

// NOTE(review): MAX_TLS_FINGERPRINT_BUCKETS bounds the number of keys, not
// their size. Every key owns clones of the raw JA3/JA4 strings, which come
// from client-supplied data - confirm their length is bounded upstream.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TlsFingerprintKey {
    scope_kind: TlsFingerprintScopeKind,
    scope_key: String,
    ja3: String,
    ja3_raw: String,
    ja4: String,
    ja4_raw: String,
}

struct TlsFingerprintEntry {
    first_seen_epoch_secs: AtomicU64,
    last_seen_epoch_secs: AtomicU64,
    total: AtomicU64,
    auth_success: AtomicU64,
    bad_or_probe: AtomicU64,
}

/// Concurrent, capacity-bounded aggregation of TLS fingerprints per scope.
#[derive(Default)]
pub struct TlsFingerprintCollector {
    entries: DashMap<TlsFingerprintKey, TlsFingerprintEntry>,
    dropped_total: AtomicU64,
    parse_error_total: AtomicU64,
    last_cleanup_epoch_secs: AtomicU64,
}

impl TlsFingerprintCollector {
    /// Counts one observed ClientHello in the fingerprint, IP and CIDR scopes.
    ///
    /// Does nothing when `ttl` is zero.
    pub fn record_observed(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        ttl: Duration,
    ) {
        if ttl.is_zero() {
            return;
        }
        let now = now_epoch_secs();
        self.cleanup_if_needed(now, ttl.as_secs());
        self.record_peer_scopes(fingerprint, peer_ip, now, true, false, false);
    }

    /// Counts a successful authentication.
    ///
    /// Increments `auth_success` in the fingerprint, IP and CIDR scopes, and
    /// both `total` and `auth_success` in the user scope. Does nothing when
    /// `ttl` is zero or `user` is empty.
    pub fn record_auth_success(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        user: &str,
        ttl: Duration,
    ) {
        if ttl.is_zero() || user.is_empty() {
            return;
        }
        let now = now_epoch_secs();
        self.cleanup_if_needed(now, ttl.as_secs());
        self.record_peer_scopes(fingerprint, peer_ip, now, false, true, false);
        self.record_scoped(
            scope_key(TlsFingerprintScopeKind::User, user),
            fingerprint,
            now,
            true,
            true,
            false,
        );
    }

    /// Counts a rejected or probing connection in the fingerprint, IP and
    /// CIDR scopes. Does nothing when `ttl` is zero.
    pub fn record_bad_or_probe(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        ttl: Duration,
    ) {
        if ttl.is_zero() {
            return;
        }
        let now = now_epoch_secs();
        self.cleanup_if_needed(now, ttl.as_secs());
        self.record_peer_scopes(fingerprint, peer_ip, now, false, false, true);
    }

    /// Counts one ClientHello that could not be fingerprinted.
    pub fn increment_parse_error(&self) {
        self.parse_error_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns up to `limit` rows per scope, ordered by descending `total`.
    ///
    /// Entries older than `ttl` are removed from the collector first (a zero
    /// `ttl` clears it). `limit` is clamped to `1..=MAX_SNAPSHOT_ROWS`.
    pub fn snapshot(&self, ttl: Duration, limit: usize) -> TlsFingerprintSnapshot {
        let now = now_epoch_secs();
        self.cleanup(now, ttl.as_secs());

        let limit = limit.clamp(1, MAX_SNAPSHOT_ROWS);
        let mut by_fingerprint = Vec::new();
        let mut by_ip = Vec::new();
        let mut by_cidr = Vec::new();
        let mut by_user = Vec::new();

        for entry in self.entries.iter() {
            let row = snapshot_row(entry.key(), entry.value());
            match entry.key().scope_kind {
                TlsFingerprintScopeKind::Fingerprint => by_fingerprint.push(row),
                TlsFingerprintScopeKind::Ip => by_ip.push(row),
                TlsFingerprintScopeKind::Cidr => by_cidr.push(row),
                TlsFingerprintScopeKind::User => by_user.push(row),
            }
        }

        sort_and_truncate(&mut by_fingerprint, limit);
        sort_and_truncate(&mut by_ip, limit);
        sort_and_truncate(&mut by_cidr, limit);
        sort_and_truncate(&mut by_user, limit);

        TlsFingerprintSnapshot {
            retention_secs: ttl.as_secs(),
            capacity: MAX_TLS_FINGERPRINT_BUCKETS,
            dropped_total: self.dropped_total.load(Ordering::Relaxed),
            parse_error_total: self.parse_error_total.load(Ordering::Relaxed),
            by_fingerprint,
            by_ip,
            by_cidr,
            by_user,
        }
    }

    /// Renders [`Self::snapshot`] as plain text; empty when there are no rows.
    pub fn snapshot_text(&self, ttl: Duration, limit: usize) -> String {
        let snapshot = self.snapshot(ttl, limit);
        if snapshot.by_fingerprint.is_empty()
            && snapshot.by_ip.is_empty()
            && snapshot.by_cidr.is_empty()
            && snapshot.by_user.is_empty()
        {
            return String::new();
        }

        let mut out = String::new();
        out.push_str("[tls_fingerprints]\n");
        // Writing into a `String` cannot fail, so the results are discarded.
        let _ = writeln!(
            out,
            "retention_secs={} capacity={} dropped_total={} parse_error_total={}",
            snapshot.retention_secs,
            snapshot.capacity,
            snapshot.dropped_total,
            snapshot.parse_error_total
        );
        append_rows(
            &mut out,
            "tls_fingerprints.by_fingerprint",
            &snapshot.by_fingerprint,
        );
        append_rows(&mut out, "tls_fingerprints.by_ip", &snapshot.by_ip);
        append_rows(&mut out, "tls_fingerprints.by_cidr", &snapshot.by_cidr);
        append_rows(&mut out, "tls_fingerprints.by_user", &snapshot.by_user);
        out
    }

    /// Applies the same counter flags to the fingerprint, IP and CIDR buckets
    /// of `peer_ip`.
    fn record_peer_scopes(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        now_epoch_secs: u64,
        count_total: bool,
        count_auth_success: bool,
        count_bad_or_probe: bool,
    ) {
        let scopes = [
            scope_key(TlsFingerprintScopeKind::Fingerprint, ""),
            scope_key(TlsFingerprintScopeKind::Ip, &peer_ip.to_string()),
            scope_key(TlsFingerprintScopeKind::Cidr, &cidr_bucket(peer_ip)),
        ];
        for scope in scopes {
            self.record_scoped(
                scope,
                fingerprint,
                now_epoch_secs,
                count_total,
                count_auth_success,
                count_bad_or_probe,
            );
        }
    }

    /// Updates the bucket for `scope` + `fingerprint`, creating it if there is
    /// room; otherwise counts a drop.
    fn record_scoped(
        &self,
        scope: (TlsFingerprintScopeKind, String),
        fingerprint: &TlsClientFingerprint,
        now_epoch_secs: u64,
        count_total: bool,
        count_auth_success: bool,
        count_bad_or_probe: bool,
    ) {
        let key = TlsFingerprintKey {
            scope_kind: scope.0,
            scope_key: scope.1,
            ja3: fingerprint.ja3.clone(),
            ja3_raw: fingerprint.ja3_raw.clone(),
            ja4: fingerprint.ja4.clone(),
            ja4_raw: fingerprint.ja4_raw.clone(),
        };

        // Existing buckets are updated even when the map is full.
        if let Some(entry) = self.entries.get(&key) {
            update_entry(
                entry.value(),
                now_epoch_secs,
                count_total,
                count_auth_success,
                count_bad_or_probe,
            );
            return;
        }

        if self.entries.len() >= MAX_TLS_FINGERPRINT_BUCKETS {
            self.dropped_total.fetch_add(1, Ordering::Relaxed);
            return;
        }

        // Another thread may have inserted the key since the lookup above.
        match self.entries.entry(key) {
            Entry::Occupied(entry) => {
                update_entry(
                    entry.get(),
                    now_epoch_secs,
                    count_total,
                    count_auth_success,
                    count_bad_or_probe,
                );
            }
            Entry::Vacant(entry) => {
                entry.insert(TlsFingerprintEntry::new(
                    now_epoch_secs,
                    u64::from(count_total),
                    u64::from(count_auth_success),
                    u64::from(count_bad_or_probe),
                ));
            }
        }
    }

    /// Runs [`Self::cleanup`] at most once per `CLEANUP_INTERVAL_SECS`; the
    /// compare-exchange lets only one concurrent caller do the work.
    fn cleanup_if_needed(&self, now_epoch_secs: u64, ttl_secs: u64) {
        let last = self.last_cleanup_epoch_secs.load(Ordering::Relaxed);
        if now_epoch_secs.saturating_sub(last) < CLEANUP_INTERVAL_SECS {
            return;
        }
        if self
            .last_cleanup_epoch_secs
            .compare_exchange(last, now_epoch_secs, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            return;
        }
        self.cleanup(now_epoch_secs, ttl_secs);
    }

    /// Drops buckets last seen more than `ttl_secs` ago; a zero TTL drops all.
    fn cleanup(&self, now_epoch_secs: u64, ttl_secs: u64) {
        if ttl_secs == 0 {
            self.entries.clear();
            return;
        }
        self.entries.retain(|_, entry| {
            let last_seen = entry.last_seen_epoch_secs.load(Ordering::Relaxed);
            now_epoch_secs.saturating_sub(last_seen) <= ttl_secs
        });
    }
}

impl TlsFingerprintEntry {
    fn new(now_epoch_secs: u64, total: u64, auth_success: u64, bad_or_probe: u64) -> Self {
        Self {
            first_seen_epoch_secs: AtomicU64::new(now_epoch_secs),
            last_seen_epoch_secs: AtomicU64::new(now_epoch_secs),
            total: AtomicU64::new(total),
            auth_success: AtomicU64::new(auth_success),
            bad_or_probe: AtomicU64::new(bad_or_probe),
        }
    }
}

/// Bumps the selected counters and refreshes the last-seen timestamp.
fn update_entry(
    entry: &TlsFingerprintEntry,
    now_epoch_secs: u64,
    count_total: bool,
    count_auth_success: bool,
    count_bad_or_probe: bool,
) {
    // `fetch_max` keeps the timestamp monotonic when a writer holding an
    // older `now` races with a newer one.
    entry
        .last_seen_epoch_secs
        .fetch_max(now_epoch_secs, Ordering::Relaxed);
    if count_total {
        entry.total.fetch_add(1, Ordering::Relaxed);
    }
    if count_auth_success {
        entry.auth_success.fetch_add(1, Ordering::Relaxed);
    }
    if count_bad_or_probe {
        entry.bad_or_probe.fetch_add(1, Ordering::Relaxed);
    }
}

fn snapshot_row(key: &TlsFingerprintKey, entry: &TlsFingerprintEntry) -> TlsFingerprintSnapshotRow {
    TlsFingerprintSnapshotRow {
        scope_key: key.scope_key.clone(),
        ja3: key.ja3.clone(),
        ja3_raw: key.ja3_raw.clone(),
        ja4: key.ja4.clone(),
        ja4_raw: key.ja4_raw.clone(),
        total: entry.total.load(Ordering::Relaxed),
        auth_success: entry.auth_success.load(Ordering::Relaxed),
        bad_or_probe: entry.bad_or_probe.load(Ordering::Relaxed),
        first_seen_epoch_secs: entry.first_seen_epoch_secs.load(Ordering::Relaxed),
        last_seen_epoch_secs: entry.last_seen_epoch_secs.load(Ordering::Relaxed),
    }
}

/// Orders rows by descending `total`, then `scope_key`, `ja4`, `ja3`, and
/// keeps the first `limit`.
fn sort_and_truncate(rows: &mut Vec<TlsFingerprintSnapshotRow>, limit: usize) {
    // Compare by reference; a key-extracting sort would clone three strings
    // for every key evaluation.
    rows.sort_by(|a, b| {
        Reverse(a.total)
            .cmp(&Reverse(b.total))
            .then_with(|| a.scope_key.cmp(&b.scope_key))
            .then_with(|| a.ja4.cmp(&b.ja4))
            .then_with(|| a.ja3.cmp(&b.ja3))
    });
    rows.truncate(limit);
}

/// Appends a `[section]` header and one line per row; nothing when `rows` is empty.
fn append_rows(out: &mut String, section: &str, rows: &[TlsFingerprintSnapshotRow]) {
    if rows.is_empty() {
        return;
    }
    out.push('[');
    out.push_str(section);
    out.push_str("]\n");
    for row in rows {
        // Writing into a `String` cannot fail, so the results are discarded.
        if !row.scope_key.is_empty() {
            let _ = write!(out, "scope={} ", row.scope_key);
        }
        let _ = writeln!(
            out,
            "ja4={} ja3={} total={} auth_success={} bad_or_probe={} first_seen={} last_seen={}",
            row.ja4,
            row.ja3,
            row.total,
            row.auth_success,
            row.bad_or_probe,
            row.first_seen_epoch_secs,
            row.last_seen_epoch_secs
        );
    }
}

fn scope_key(kind: TlsFingerprintScopeKind, key: &str) -> (TlsFingerprintScopeKind, String) {
    (kind, key.to_string())
}

/// Network bucket for an address: `/24` for IPv4, `/56` for IPv6.
fn cidr_bucket(ip: IpAddr) -> String {
    match ip {
        IpAddr::V4(ip) => {
            let [a, b, c, _] = ip.octets();
            format!("{a}.{b}.{c}.0/24")
        }
        IpAddr::V6(ip) => {
            let mut octets = ip.octets();
            // Keep the first 7 bytes (56 bits) and zero the rest.
            octets[7..].fill(0);
            format!("{}/56", Ipv6Addr::from(octets))
        }
    }
}

/// Seconds since the Unix epoch; 0 if the system clock is before the epoch.
fn now_epoch_secs() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

impl Stats {
    pub fn record_tls_fingerprint_observed(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        ttl: Duration,
    ) {
        if self.telemetry_core_enabled() {
            self.tls_fingerprints
                .record_observed(fingerprint, peer_ip, ttl);
        }
    }

    pub fn record_tls_fingerprint_auth_success(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        user: &str,
        ttl: Duration,
    ) {
        if self.telemetry_core_enabled() {
            self.tls_fingerprints
                .record_auth_success(fingerprint, peer_ip, user, ttl);
        }
    }

    pub fn record_tls_fingerprint_bad_or_probe(
        &self,
        fingerprint: &TlsClientFingerprint,
        peer_ip: IpAddr,
        ttl: Duration,
    ) {
        if self.telemetry_core_enabled() {
            self.tls_fingerprints
                .record_bad_or_probe(fingerprint, peer_ip, ttl);
        }
    }

    pub fn increment_tls_fingerprint_parse_error(&self) {
        if self.telemetry_core_enabled() {
            self.tls_fingerprints.increment_parse_error();
        }
    }

    pub fn tls_fingerprint_snapshot(&self, ttl: Duration, limit: usize) -> TlsFingerprintSnapshot {
        self.tls_fingerprints.snapshot(ttl, limit)
    }
+ + pub fn tls_fingerprint_snapshot_text(&self, ttl: Duration, limit: usize) -> String { + self.tls_fingerprints.snapshot_text(ttl, limit) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fp() -> TlsClientFingerprint { + TlsClientFingerprint { + ja3: "ja3".to_string(), + ja3_raw: "771,4865,,,0".to_string(), + ja4: "t13d010100_hash_hash".to_string(), + ja4_raw: "raw".to_string(), + } + } + + #[test] + fn aggregates_ip_cidr_and_user_scopes() { + let collector = TlsFingerprintCollector::default(); + let ip: IpAddr = "192.0.2.15".parse().expect("test IP parses"); + collector.record_observed(&fp(), ip, Duration::from_secs(60)); + collector.record_auth_success(&fp(), ip, "alice", Duration::from_secs(60)); + let snapshot = collector.snapshot(Duration::from_secs(60), 10); + + assert_eq!(snapshot.by_fingerprint[0].total, 1); + assert_eq!(snapshot.by_fingerprint[0].auth_success, 1); + assert_eq!(snapshot.by_ip[0].scope_key, "192.0.2.15"); + assert_eq!(snapshot.by_cidr[0].scope_key, "192.0.2.0/24"); + assert_eq!(snapshot.by_user[0].scope_key, "alice"); + assert_eq!(snapshot.by_user[0].total, 1); + } +}