JA3+JA4 Pitfall in API + Beobachten

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-06-02 08:17:56 +03:00
parent 5c573a926b
commit 34b48325fd
8 changed files with 1266 additions and 4 deletions

View File

@@ -55,7 +55,7 @@ use model::{
use patch::Patch; use patch::Patch;
use runtime_edge::{ use runtime_edge::{
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
build_runtime_events_recent_data, build_runtime_events_recent_data, build_runtime_tls_fingerprints_data,
}; };
use runtime_init::build_runtime_initialization_data; use runtime_init::build_runtime_initialization_data;
use runtime_min::{ use runtime_min::{
@@ -169,6 +169,7 @@ fn allowed_methods_for_path(path: &str) -> Option<&'static str> {
| "/v1/runtime/me-selftest" | "/v1/runtime/me-selftest"
| "/v1/runtime/connections/summary" | "/v1/runtime/connections/summary"
| "/v1/runtime/events/recent" | "/v1/runtime/events/recent"
| "/v1/runtime/tls-fingerprints"
| "/v1/stats/users/active-ips" | "/v1/stats/users/active-ips"
| "/v1/stats/users/quota" | "/v1/stats/users/quota"
| "/v1/stats/users" => Some(ALLOW_GET), | "/v1/stats/users" => Some(ALLOW_GET),
@@ -540,6 +541,15 @@ async fn handle(
); );
Ok(success_response(StatusCode::OK, data, revision)) Ok(success_response(StatusCode::OK, data, revision))
} }
("GET", "/v1/runtime/tls-fingerprints") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_tls_fingerprints_data(
shared.as_ref(),
cfg.as_ref(),
query.as_deref(),
);
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/users/active-ips") => { ("GET", "/v1/stats/users/active-ips") => {
let revision = current_revision(&shared.config_path).await?; let revision = current_revision(&shared.config_path).await?;
let usernames: Vec<_> = cfg.access.users.keys().cloned().collect(); let usernames: Vec<_> = cfg.access.users.keys().cloned().collect();

View File

@@ -12,6 +12,8 @@ const FEATURE_DISABLED_REASON: &str = "feature_disabled";
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable"; const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
const EVENTS_DEFAULT_LIMIT: usize = 50; const EVENTS_DEFAULT_LIMIT: usize = 50;
const EVENTS_MAX_LIMIT: usize = 1000; const EVENTS_MAX_LIMIT: usize = 1000;
const TLS_FINGERPRINTS_MAX_LIMIT: usize = 1000;
const RUNTIME_EDGE_RETENTION_MAX_MINUTES: u64 = 24 * 60;
#[derive(Clone, Serialize)] #[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionUserData { pub(super) struct RuntimeEdgeConnectionUserData {
@@ -90,6 +92,44 @@ pub(super) struct RuntimeEdgeEventsData {
pub(super) data: Option<RuntimeEdgeEventsPayload>, pub(super) data: Option<RuntimeEdgeEventsPayload>,
} }
#[derive(Serialize)]
pub(super) struct RuntimeEdgeTlsFingerprintRow {
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) scope: Option<String>,
pub(super) ja3: String,
pub(super) ja3_raw: String,
pub(super) ja4: String,
pub(super) ja4_raw: String,
pub(super) total: u64,
pub(super) auth_success: u64,
pub(super) bad_or_probe: u64,
pub(super) first_seen_epoch_secs: u64,
pub(super) last_seen_epoch_secs: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeEdgeTlsFingerprintsPayload {
pub(super) limit: usize,
pub(super) retention_secs: u64,
pub(super) capacity: usize,
pub(super) dropped_total: u64,
pub(super) parse_error_total: u64,
pub(super) by_fingerprint: Vec<RuntimeEdgeTlsFingerprintRow>,
pub(super) by_ip: Vec<RuntimeEdgeTlsFingerprintRow>,
pub(super) by_cidr: Vec<RuntimeEdgeTlsFingerprintRow>,
pub(super) by_user: Vec<RuntimeEdgeTlsFingerprintRow>,
}
#[derive(Serialize)]
pub(super) struct RuntimeEdgeTlsFingerprintsData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) data: Option<RuntimeEdgeTlsFingerprintsPayload>,
}
pub(super) async fn build_runtime_connections_summary_data( pub(super) async fn build_runtime_connections_summary_data(
shared: &ApiShared, shared: &ApiShared,
cfg: &ProxyConfig, cfg: &ProxyConfig,
@@ -162,6 +202,65 @@ pub(super) fn build_runtime_events_recent_data(
} }
} }
pub(super) fn build_runtime_tls_fingerprints_data(
shared: &ApiShared,
cfg: &ProxyConfig,
query: Option<&str>,
) -> RuntimeEdgeTlsFingerprintsData {
let now_epoch_secs = now_epoch_secs();
let api_cfg = &cfg.server.api;
if !api_cfg.runtime_edge_enabled {
return RuntimeEdgeTlsFingerprintsData {
enabled: false,
reason: Some(FEATURE_DISABLED_REASON),
generated_at_epoch_secs: now_epoch_secs,
data: None,
};
}
let limit = parse_recent_events_limit(
query,
api_cfg.runtime_edge_top_n.max(1),
TLS_FINGERPRINTS_MAX_LIMIT,
);
let snapshot = shared
.stats
.tls_fingerprint_snapshot(runtime_edge_retention(cfg), limit);
RuntimeEdgeTlsFingerprintsData {
enabled: true,
reason: None,
generated_at_epoch_secs: now_epoch_secs,
data: Some(RuntimeEdgeTlsFingerprintsPayload {
limit,
retention_secs: snapshot.retention_secs,
capacity: snapshot.capacity,
dropped_total: snapshot.dropped_total,
parse_error_total: snapshot.parse_error_total,
by_fingerprint: snapshot
.by_fingerprint
.into_iter()
.map(runtime_tls_fingerprint_row)
.collect(),
by_ip: snapshot
.by_ip
.into_iter()
.map(runtime_tls_fingerprint_row)
.collect(),
by_cidr: snapshot
.by_cidr
.into_iter()
.map(runtime_tls_fingerprint_row)
.collect(),
by_user: snapshot
.by_user
.into_iter()
.map(runtime_tls_fingerprint_row)
.collect(),
}),
}
}
async fn get_connections_payload_cached( async fn get_connections_payload_cached(
shared: &ApiShared, shared: &ApiShared,
cache_ttl_ms: u64, cache_ttl_ms: u64,
@@ -286,6 +385,35 @@ fn parse_recent_events_limit(query: Option<&str>, default_limit: usize, max_limi
default_limit default_limit
} }
fn runtime_edge_retention(cfg: &ProxyConfig) -> Duration {
let minutes = cfg
.general
.beobachten_minutes
.clamp(1, RUNTIME_EDGE_RETENTION_MAX_MINUTES);
Duration::from_secs(minutes.saturating_mul(60))
}
fn runtime_tls_fingerprint_row(
row: crate::stats::TlsFingerprintSnapshotRow,
) -> RuntimeEdgeTlsFingerprintRow {
RuntimeEdgeTlsFingerprintRow {
scope: if row.scope_key.is_empty() {
None
} else {
Some(row.scope_key)
},
ja3: row.ja3,
ja3_raw: row.ja3_raw,
ja4: row.ja4,
ja4_raw: row.ja4_raw,
total: row.total,
auth_success: row.auth_success,
bad_or_probe: row.bad_or_probe,
first_seen_epoch_secs: row.first_seen_epoch_secs,
last_seen_epoch_secs: row.last_seen_epoch_secs,
}
}
fn now_epoch_secs() -> u64 { fn now_epoch_secs() -> u64 {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)

View File

@@ -288,7 +288,7 @@ async fn handle<B>(
} }
if req.uri().path() == "/beobachten" { if req.uri().path() == "/beobachten" {
let body = render_beobachten(beobachten, config); let body = render_beobachten(stats, beobachten, config);
let resp = Response::builder() let resp = Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8") .header("content-type", "text/plain; charset=utf-8")
@@ -304,13 +304,22 @@ async fn handle<B>(
Ok(resp) Ok(resp)
} }
fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String { fn render_beobachten(stats: &Stats, beobachten: &BeobachtenStore, config: &ProxyConfig) -> String {
if !config.general.beobachten { if !config.general.beobachten {
return "beobachten disabled\n".to_string(); return "beobachten disabled\n".to_string();
} }
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)); let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
beobachten.snapshot_text(ttl) let mut body = beobachten.snapshot_text(ttl);
let tls_text = stats.tls_fingerprint_snapshot_text(ttl, 20);
if !tls_text.is_empty() {
if !body.ends_with('\n') {
body.push('\n');
}
body.push('\n');
body.push_str(&tls_text);
}
body
} }
fn tls_front_domains(config: &ProxyConfig) -> Vec<String> { fn tls_front_domains(config: &ProxyConfig) -> Vec<String> {

View File

@@ -4,6 +4,7 @@ pub mod constants;
pub mod frame; pub mod frame;
pub mod obfuscation; pub mod obfuscation;
pub mod tls; pub mod tls;
pub mod tls_fingerprint;
#[allow(unused_imports)] #[allow(unused_imports)]
pub use constants::*; pub use constants::*;
@@ -13,3 +14,5 @@ pub use frame::*;
pub use obfuscation::*; pub use obfuscation::*;
#[allow(unused_imports)] #[allow(unused_imports)]
pub use tls::*; pub use tls::*;
#[allow(unused_imports)]
pub use tls_fingerprint::*;

View File

@@ -0,0 +1,450 @@
//! Passive JA3 / JA4 TLS ClientHello fingerprinting.
use crate::crypto::hash::md5;
use crate::crypto::sha256;
use crate::protocol::constants::TLS_RECORD_HANDSHAKE;
const EXT_SNI: u16 = 0x0000;
const EXT_SUPPORTED_GROUPS: u16 = 0x000a;
const EXT_EC_POINT_FORMATS: u16 = 0x000b;
const EXT_SIGNATURE_ALGORITHMS: u16 = 0x000d;
const EXT_ALPN: u16 = 0x0010;
const EXT_SUPPORTED_VERSIONS: u16 = 0x002b;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TlsClientFingerprint {
pub ja3: String,
pub ja3_raw: String,
pub ja4: String,
pub ja4_raw: String,
}
#[derive(Default)]
struct ParsedClientHello {
legacy_version: u16,
ciphers: Vec<u16>,
extensions: Vec<u16>,
supported_groups: Vec<u16>,
ec_point_formats: Vec<u8>,
signature_algorithms: Vec<u16>,
supported_versions: Vec<u16>,
alpn_first: Option<Vec<u8>>,
sni_present: bool,
}
pub fn fingerprint_client_hello(handshake: &[u8]) -> Option<TlsClientFingerprint> {
let parsed = parse_client_hello(handshake)?;
let ja3_raw = ja3_raw(&parsed);
let ja3 = hex::encode(md5(ja3_raw.as_bytes()));
let (ja4, ja4_raw) = ja4(&parsed);
Some(TlsClientFingerprint {
ja3,
ja3_raw,
ja4,
ja4_raw,
})
}
fn parse_client_hello(handshake: &[u8]) -> Option<ParsedClientHello> {
if handshake.len() < 5 || handshake[0] != TLS_RECORD_HANDSHAKE {
return None;
}
let record_len = read_u16_at(handshake, 3)? as usize;
let record_end = 5usize.checked_add(record_len)?;
if record_end > handshake.len() {
return None;
}
let mut pos = 5usize;
if *handshake.get(pos)? != 0x01 {
return None;
}
pos = pos.checked_add(1)?;
if pos + 3 > record_end {
return None;
}
let handshake_len = ((usize::from(handshake[pos])) << 16)
| ((usize::from(handshake[pos + 1])) << 8)
| usize::from(handshake[pos + 2]);
pos = pos.checked_add(3)?;
let handshake_end = pos.checked_add(handshake_len)?;
if handshake_end > record_end {
return None;
}
if pos + 2 + 32 > handshake_end {
return None;
}
let legacy_version = read_u16_at(handshake, pos)?;
pos = pos.checked_add(2 + 32)?;
let session_id_len = usize::from(*handshake.get(pos)?);
pos = pos.checked_add(1)?.checked_add(session_id_len)?;
if pos + 2 > handshake_end {
return None;
}
let cipher_len = read_u16_at(handshake, pos)? as usize;
pos = pos.checked_add(2)?;
let cipher_end = pos.checked_add(cipher_len)?;
if cipher_end > handshake_end || cipher_len % 2 != 0 {
return None;
}
let mut ciphers = Vec::with_capacity(cipher_len / 2);
while pos + 1 < cipher_end {
let value = read_u16_at(handshake, pos)?;
if !is_grease(value) {
ciphers.push(value);
}
pos = pos.checked_add(2)?;
}
let comp_len = usize::from(*handshake.get(pos)?);
pos = pos.checked_add(1)?.checked_add(comp_len)?;
if pos > handshake_end {
return None;
}
let mut parsed = ParsedClientHello {
legacy_version,
ciphers,
..ParsedClientHello::default()
};
if pos == handshake_end {
return Some(parsed);
}
if pos + 2 > handshake_end {
return None;
}
let ext_len = read_u16_at(handshake, pos)? as usize;
pos = pos.checked_add(2)?;
let ext_end = pos.checked_add(ext_len)?;
if ext_end > handshake_end {
return None;
}
while pos + 4 <= ext_end {
let etype = read_u16_at(handshake, pos)?;
let elen = read_u16_at(handshake, pos + 2)? as usize;
pos = pos.checked_add(4)?;
let data_end = pos.checked_add(elen)?;
if data_end > ext_end {
return None;
}
let data = handshake.get(pos..data_end)?;
if !is_grease(etype) {
parsed.extensions.push(etype);
match etype {
EXT_SNI => parsed.sni_present = true,
EXT_SUPPORTED_GROUPS => {
parsed.supported_groups = parse_u16_vector(data, 2)?;
}
EXT_EC_POINT_FORMATS => {
parsed.ec_point_formats = parse_u8_vector(data)?;
}
EXT_SIGNATURE_ALGORITHMS => {
parsed.signature_algorithms = parse_u16_vector(data, 2)?;
}
EXT_ALPN => {
parsed.alpn_first = parse_alpn_first(data)?;
}
EXT_SUPPORTED_VERSIONS => {
parsed.supported_versions = parse_u16_vector(data, 1)?;
}
_ => {}
}
}
pos = data_end;
}
if pos != ext_end {
return None;
}
Some(parsed)
}
fn parse_u16_vector(data: &[u8], len_prefix_len: usize) -> Option<Vec<u16>> {
let (list_len, mut pos) = match len_prefix_len {
1 => (usize::from(*data.first()?), 1usize),
2 => (read_u16_at(data, 0)? as usize, 2usize),
_ => return None,
};
let list_end = pos.checked_add(list_len)?;
if list_end > data.len() || list_len % 2 != 0 {
return None;
}
let mut out = Vec::with_capacity(list_len / 2);
while pos + 1 < list_end {
let value = read_u16_at(data, pos)?;
if !is_grease(value) {
out.push(value);
}
pos = pos.checked_add(2)?;
}
Some(out)
}
fn parse_u8_vector(data: &[u8]) -> Option<Vec<u8>> {
let list_len = usize::from(*data.first()?);
let list_start = 1usize;
let list_end = list_start.checked_add(list_len)?;
if list_end > data.len() {
return None;
}
Some(data.get(list_start..list_end)?.to_vec())
}
fn parse_alpn_first(data: &[u8]) -> Option<Option<Vec<u8>>> {
if data.len() < 2 {
return None;
}
let list_len = read_u16_at(data, 0)? as usize;
let mut pos = 2usize;
let list_end = pos.checked_add(list_len)?;
if list_end > data.len() {
return None;
}
if pos == list_end {
return Some(None);
}
let protocol_len = usize::from(*data.get(pos)?);
pos = pos.checked_add(1)?;
let protocol_end = pos.checked_add(protocol_len)?;
if protocol_end > list_end {
return None;
}
if protocol_len == 0 {
return Some(None);
}
Some(Some(data.get(pos..protocol_end)?.to_vec()))
}
fn ja3_raw(parsed: &ParsedClientHello) -> String {
format!(
"{},{},{},{},{}",
parsed.legacy_version,
join_decimal_u16(&parsed.ciphers),
join_decimal_u16(&parsed.extensions),
join_decimal_u16(&parsed.supported_groups),
join_decimal_u8(&parsed.ec_point_formats)
)
}
fn ja4(parsed: &ParsedClientHello) -> (String, String) {
let a = format!(
"t{}{}{:02}{:02}{}",
ja4_version_code(parsed),
if parsed.sni_present { "d" } else { "i" },
count_ja4(parsed.ciphers.len()),
count_ja4(parsed.extensions.len()),
ja4_alpn_marker(parsed.alpn_first.as_deref())
);
let mut ciphers = parsed.ciphers.clone();
ciphers.sort_unstable();
let cipher_raw = join_hex_u16(&ciphers);
let cipher_hash = if ciphers.is_empty() {
"000000000000".to_string()
} else {
sha256_truncated_12(&cipher_raw)
};
let mut extensions_for_hash = parsed
.extensions
.iter()
.copied()
.filter(|value| *value != EXT_SNI && *value != EXT_ALPN)
.collect::<Vec<_>>();
extensions_for_hash.sort_unstable();
let extension_raw = join_hex_u16(&extensions_for_hash);
let signature_raw = join_hex_u16(&parsed.signature_algorithms);
let extension_hash_input = if signature_raw.is_empty() {
extension_raw.clone()
} else {
format!("{extension_raw}_{signature_raw}")
};
let extension_hash = if extensions_for_hash.is_empty() {
"000000000000".to_string()
} else {
sha256_truncated_12(&extension_hash_input)
};
(
format!("{a}_{cipher_hash}_{extension_hash}"),
format!("{a}_{cipher_raw}_{extension_hash_input}"),
)
}
fn ja4_version_code(parsed: &ParsedClientHello) -> &'static str {
let version = parsed
.supported_versions
.iter()
.copied()
.max()
.unwrap_or(parsed.legacy_version);
match version {
0x0304 => "13",
0x0303 => "12",
0x0302 => "11",
0x0301 => "10",
0x0300 => "s3",
0x0002 => "s2",
0xfeff => "d1",
0xfefd => "d2",
0xfefc => "d3",
_ => "00",
}
}
fn ja4_alpn_marker(alpn_first: Option<&[u8]>) -> String {
let Some(value) = alpn_first else {
return "00".to_string();
};
let Some(first) = value.first().copied() else {
return "00".to_string();
};
let last = value.last().copied().unwrap_or(first);
if first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric() {
return format!("{}{}", first as char, last as char);
}
let encoded = hex::encode(value);
if encoded.is_empty() {
return "00".to_string();
}
let first_hex = encoded.as_bytes()[0] as char;
let last_hex = encoded.as_bytes()[encoded.len().saturating_sub(1)] as char;
format!("{first_hex}{last_hex}")
}
fn count_ja4(count: usize) -> usize {
count.min(99)
}
fn sha256_truncated_12(input: &str) -> String {
let mut encoded = hex::encode(sha256(input.as_bytes()));
encoded.truncate(12);
encoded
}
fn join_decimal_u16(values: &[u16]) -> String {
values
.iter()
.map(u16::to_string)
.collect::<Vec<_>>()
.join("-")
}
fn join_decimal_u8(values: &[u8]) -> String {
values
.iter()
.map(u8::to_string)
.collect::<Vec<_>>()
.join("-")
}
fn join_hex_u16(values: &[u16]) -> String {
values
.iter()
.map(|value| format!("{value:04x}"))
.collect::<Vec<_>>()
.join(",")
}
fn read_u16_at(buf: &[u8], pos: usize) -> Option<u16> {
Some(u16::from_be_bytes([
*buf.get(pos)?,
*buf.get(pos.checked_add(1)?)?,
]))
}
fn is_grease(value: u16) -> bool {
let high = (value >> 8) as u8;
let low = value as u8;
high == low && (high & 0x0f) == 0x0a
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_client_hello() -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&[0x03, 0x03]);
body.extend_from_slice(&[0x11; 32]);
body.push(0);
body.extend_from_slice(&10u16.to_be_bytes());
body.extend_from_slice(&[0x0a, 0x0a, 0x13, 0x01, 0x13, 0x02, 0xc0, 0x2f, 0x00, 0xff]);
body.push(1);
body.push(0);
let mut extensions = Vec::new();
append_ext(&mut extensions, EXT_SNI, &[0, 0]);
append_ext(&mut extensions, EXT_ALPN, &[0, 3, 2, b'h', b'2']);
append_ext(
&mut extensions,
EXT_SUPPORTED_GROUPS,
&[0, 6, 0x0a, 0x0a, 0x00, 0x17, 0x00, 0x1d],
);
append_ext(&mut extensions, EXT_EC_POINT_FORMATS, &[1, 0]);
append_ext(
&mut extensions,
EXT_SIGNATURE_ALGORITHMS,
&[0, 4, 0x04, 0x03, 0x08, 0x04],
);
append_ext(
&mut extensions,
EXT_SUPPORTED_VERSIONS,
&[4, 0x03, 0x04, 0x03, 0x03],
);
body.extend_from_slice(&(extensions.len() as u16).to_be_bytes());
body.extend_from_slice(&extensions);
let mut record = Vec::new();
record.push(TLS_RECORD_HANDSHAKE);
record.extend_from_slice(&[0x03, 0x01]);
record.extend_from_slice(&((body.len() + 4) as u16).to_be_bytes());
record.push(0x01);
record.extend_from_slice(&[
((body.len() >> 16) & 0xff) as u8,
((body.len() >> 8) & 0xff) as u8,
(body.len() & 0xff) as u8,
]);
record.extend_from_slice(&body);
record
}
fn append_ext(out: &mut Vec<u8>, etype: u16, data: &[u8]) {
out.extend_from_slice(&etype.to_be_bytes());
out.extend_from_slice(&(data.len() as u16).to_be_bytes());
out.extend_from_slice(data);
}
#[test]
fn ja3_and_ja4_ignore_grease_and_remain_stable() {
let fp = fingerprint_client_hello(&sample_client_hello())
.expect("sample ClientHello must fingerprint");
assert_eq!(
fp.ja3_raw,
"771,4865-4866-49199-255,0-16-10-11-13-43,23-29,0"
);
assert!(fp.ja4.starts_with("t13d0406h2_"));
}
#[test]
fn malformed_client_hello_returns_none() {
let mut hello = sample_client_hello();
hello.truncate(12);
assert!(fingerprint_client_hello(&hello).is_none());
}
}

View File

@@ -98,6 +98,7 @@ use crate::error::{HandshakeResult, ProxyError, Result, StreamError};
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::protocol::constants::*; use crate::protocol::constants::*;
use crate::protocol::tls; use crate::protocol::tls;
use crate::protocol::tls_fingerprint::{self, TlsClientFingerprint};
use crate::stats::beobachten::BeobachtenStore; use crate::stats::beobachten::BeobachtenStore;
use crate::stats::{ReplayChecker, Stats}; use crate::stats::{ReplayChecker, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
@@ -350,6 +351,60 @@ fn record_beobachten_class(
beobachten.record(class, peer_ip, beobachten_ttl(config)); beobachten.record(class, peer_ip, beobachten_ttl(config));
} }
fn tls_fingerprint_collection_enabled(config: &ProxyConfig) -> bool {
config.general.beobachten || config.server.api.runtime_edge_enabled
}
fn observe_tls_client_fingerprint(
stats: &Stats,
config: &ProxyConfig,
peer_ip: IpAddr,
handshake: &[u8],
) -> Option<TlsClientFingerprint> {
if !tls_fingerprint_collection_enabled(config) {
return None;
}
match tls_fingerprint::fingerprint_client_hello(handshake) {
Some(fingerprint) => {
stats.record_tls_fingerprint_observed(&fingerprint, peer_ip, beobachten_ttl(config));
Some(fingerprint)
}
None => {
stats.increment_tls_fingerprint_parse_error();
None
}
}
}
fn record_tls_fingerprint_auth_success(
stats: &Stats,
config: &ProxyConfig,
peer_ip: IpAddr,
fingerprint: Option<&TlsClientFingerprint>,
user: &str,
) {
if let Some(fingerprint) = fingerprint {
stats.record_tls_fingerprint_auth_success(
fingerprint,
peer_ip,
user,
beobachten_ttl(config),
);
}
}
fn record_tls_fingerprint_bad_or_probe(
stats: &Stats,
config: &ProxyConfig,
peer_ip: IpAddr,
fingerprint: Option<&TlsClientFingerprint>,
) {
if let Some(fingerprint) = fingerprint {
stats.record_tls_fingerprint_bad_or_probe(fingerprint, peer_ip, beobachten_ttl(config));
}
}
fn classify_expected_64_got_0(kind: std::io::ErrorKind) -> Option<&'static str> { fn classify_expected_64_got_0(kind: std::io::ErrorKind) -> Option<&'static str> {
match kind { match kind {
std::io::ErrorKind::UnexpectedEof => Some("expected_64_got_0_unexpected_eof"), std::io::ErrorKind::UnexpectedEof => Some("expected_64_got_0_unexpected_eof"),
@@ -705,6 +760,9 @@ where
)); ));
} }
let tls_fingerprint =
observe_tls_client_fingerprint(stats.as_ref(), &config, real_peer.ip(), &handshake);
let (read_half, write_half) = tokio::io::split(stream); let (read_half, write_half) = tokio::io::split(stream);
let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake_with_shared( let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake_with_shared(
@@ -715,6 +773,12 @@ where
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => { HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad_with_class("tls_handshake_bad_client"); stats.increment_connects_bad_with_class("tls_handshake_bad_client");
record_tls_fingerprint_bad_or_probe(
stats.as_ref(),
&config,
real_peer.ip(),
tls_fingerprint.as_ref(),
);
return Ok(masking_outcome( return Ok(masking_outcome(
reader, reader,
writer, writer,
@@ -726,10 +790,23 @@ where
)); ));
} }
HandshakeResult::Error(e) => { HandshakeResult::Error(e) => {
record_tls_fingerprint_bad_or_probe(
stats.as_ref(),
&config,
real_peer.ip(),
tls_fingerprint.as_ref(),
);
increment_bad_on_unknown_tls_sni(stats.as_ref(), &e); increment_bad_on_unknown_tls_sni(stats.as_ref(), &e);
return Err(e); return Err(e);
} }
}; };
record_tls_fingerprint_auth_success(
stats.as_ref(),
&config,
real_peer.ip(),
tls_fingerprint.as_ref(),
tls_user.as_str(),
);
debug!(peer = %peer, "Reading MTProto handshake through TLS"); debug!(peer = %peer, "Reading MTProto handshake through TLS");
let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?;
@@ -1295,6 +1372,13 @@ impl RunningClientHandler {
)); ));
} }
let tls_fingerprint = observe_tls_client_fingerprint(
self.stats.as_ref(),
&self.config,
peer.ip(),
&handshake,
);
let config = self.config.clone(); let config = self.config.clone();
let replay_checker = self.replay_checker.clone(); let replay_checker = self.replay_checker.clone();
let stats = self.stats.clone(); let stats = self.stats.clone();
@@ -1318,6 +1402,12 @@ impl RunningClientHandler {
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => { HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad_with_class("tls_handshake_bad_client"); stats.increment_connects_bad_with_class("tls_handshake_bad_client");
record_tls_fingerprint_bad_or_probe(
stats.as_ref(),
&config,
peer.ip(),
tls_fingerprint.as_ref(),
);
return Ok(masking_outcome( return Ok(masking_outcome(
reader, reader,
writer, writer,
@@ -1329,10 +1419,23 @@ impl RunningClientHandler {
)); ));
} }
HandshakeResult::Error(e) => { HandshakeResult::Error(e) => {
record_tls_fingerprint_bad_or_probe(
stats.as_ref(),
&config,
peer.ip(),
tls_fingerprint.as_ref(),
);
increment_bad_on_unknown_tls_sni(stats.as_ref(), &e); increment_bad_on_unknown_tls_sni(stats.as_ref(), &e);
return Err(e); return Err(e);
} }
}; };
record_tls_fingerprint_auth_success(
stats.as_ref(),
&config,
peer.ip(),
tls_fingerprint.as_ref(),
tls_user.as_str(),
);
debug!(peer = %peer, "Reading MTProto handshake through TLS"); debug!(peer = %peer, "Reading MTProto handshake through TLS");
let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?;

View File

@@ -10,6 +10,7 @@ mod me_counters;
mod me_getters; mod me_getters;
mod replay; mod replay;
pub mod telemetry; pub mod telemetry;
pub mod tls_fingerprints;
mod users; mod users;
mod writer_counters; mod writer_counters;
@@ -22,6 +23,7 @@ use std::time::Instant;
#[allow(unused_imports)] #[allow(unused_imports)]
pub use self::replay::{ReplayChecker, ReplayStats}; pub use self::replay::{ReplayChecker, ReplayStats};
use self::telemetry::TelemetryPolicy; use self::telemetry::TelemetryPolicy;
pub use self::tls_fingerprints::TlsFingerprintSnapshotRow;
use crate::config::MeWriterPickMode; use crate::config::MeWriterPickMode;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@@ -333,6 +335,7 @@ pub struct Stats {
telemetry_user_enabled: AtomicBool, telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8, telemetry_me_level: AtomicU8,
cached_epoch_secs: AtomicU64, cached_epoch_secs: AtomicU64,
tls_fingerprints: tls_fingerprints::TlsFingerprintCollector,
user_stats: DashMap<String, Arc<UserStats>>, user_stats: DashMap<String, Arc<UserStats>>,
user_stats_last_cleanup_epoch_secs: AtomicU64, user_stats_last_cleanup_epoch_secs: AtomicU64,
start_time: parking_lot::RwLock<Option<Instant>>, start_time: parking_lot::RwLock<Option<Instant>>,

View File

@@ -0,0 +1,556 @@
//! Bounded TLS JA3/JA4 fingerprint aggregation.
use std::cmp::Reverse;
use std::hash::Hash;
use std::net::{IpAddr, Ipv6Addr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use crate::protocol::tls_fingerprint::TlsClientFingerprint;
use super::Stats;
const CLEANUP_INTERVAL_SECS: u64 = 30;
const MAX_TLS_FINGERPRINT_BUCKETS: usize = 65_536;
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum TlsFingerprintScopeKind {
Fingerprint,
Ip,
Cidr,
User,
}
#[derive(Clone, Debug)]
pub struct TlsFingerprintSnapshotRow {
pub scope_key: String,
pub ja3: String,
pub ja3_raw: String,
pub ja4: String,
pub ja4_raw: String,
pub total: u64,
pub auth_success: u64,
pub bad_or_probe: u64,
pub first_seen_epoch_secs: u64,
pub last_seen_epoch_secs: u64,
}
#[derive(Clone, Debug)]
pub struct TlsFingerprintSnapshot {
pub retention_secs: u64,
pub capacity: usize,
pub dropped_total: u64,
pub parse_error_total: u64,
pub by_fingerprint: Vec<TlsFingerprintSnapshotRow>,
pub by_ip: Vec<TlsFingerprintSnapshotRow>,
pub by_cidr: Vec<TlsFingerprintSnapshotRow>,
pub by_user: Vec<TlsFingerprintSnapshotRow>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TlsFingerprintKey {
scope_kind: TlsFingerprintScopeKind,
scope_key: String,
ja3: String,
ja3_raw: String,
ja4: String,
ja4_raw: String,
}
struct TlsFingerprintEntry {
first_seen_epoch_secs: AtomicU64,
last_seen_epoch_secs: AtomicU64,
total: AtomicU64,
auth_success: AtomicU64,
bad_or_probe: AtomicU64,
}
#[derive(Default)]
pub struct TlsFingerprintCollector {
entries: DashMap<TlsFingerprintKey, TlsFingerprintEntry>,
dropped_total: AtomicU64,
parse_error_total: AtomicU64,
last_cleanup_epoch_secs: AtomicU64,
}
impl TlsFingerprintCollector {
pub fn record_observed(
&self,
fingerprint: &TlsClientFingerprint,
peer_ip: IpAddr,
ttl: Duration,
) {
if ttl.is_zero() {
return;
}
let now = now_epoch_secs();
self.cleanup_if_needed(now, ttl.as_secs());
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Fingerprint, ""),
fingerprint,
now,
true,
false,
false,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Ip, &peer_ip.to_string()),
fingerprint,
now,
true,
false,
false,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Cidr, &cidr_bucket(peer_ip)),
fingerprint,
now,
true,
false,
false,
);
}
pub fn record_auth_success(
&self,
fingerprint: &TlsClientFingerprint,
peer_ip: IpAddr,
user: &str,
ttl: Duration,
) {
if ttl.is_zero() || user.is_empty() {
return;
}
let now = now_epoch_secs();
self.cleanup_if_needed(now, ttl.as_secs());
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Fingerprint, ""),
fingerprint,
now,
false,
true,
false,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Ip, &peer_ip.to_string()),
fingerprint,
now,
false,
true,
false,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Cidr, &cidr_bucket(peer_ip)),
fingerprint,
now,
false,
true,
false,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::User, user),
fingerprint,
now,
true,
true,
false,
);
}
pub fn record_bad_or_probe(
&self,
fingerprint: &TlsClientFingerprint,
peer_ip: IpAddr,
ttl: Duration,
) {
if ttl.is_zero() {
return;
}
let now = now_epoch_secs();
self.cleanup_if_needed(now, ttl.as_secs());
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Fingerprint, ""),
fingerprint,
now,
false,
false,
true,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Ip, &peer_ip.to_string()),
fingerprint,
now,
false,
false,
true,
);
self.record_scoped(
scope_key(TlsFingerprintScopeKind::Cidr, &cidr_bucket(peer_ip)),
fingerprint,
now,
false,
false,
true,
);
}
pub fn increment_parse_error(&self) {
self.parse_error_total.fetch_add(1, Ordering::Relaxed);
}
pub fn snapshot(&self, ttl: Duration, limit: usize) -> TlsFingerprintSnapshot {
let now = now_epoch_secs();
self.cleanup(now, ttl.as_secs());
let limit = limit.clamp(1, 1000);
let mut by_fingerprint = Vec::new();
let mut by_ip = Vec::new();
let mut by_cidr = Vec::new();
let mut by_user = Vec::new();
for entry in self.entries.iter() {
let row = snapshot_row(entry.key(), entry.value());
match entry.key().scope_kind {
TlsFingerprintScopeKind::Fingerprint => by_fingerprint.push(row),
TlsFingerprintScopeKind::Ip => by_ip.push(row),
TlsFingerprintScopeKind::Cidr => by_cidr.push(row),
TlsFingerprintScopeKind::User => by_user.push(row),
}
}
sort_and_truncate(&mut by_fingerprint, limit);
sort_and_truncate(&mut by_ip, limit);
sort_and_truncate(&mut by_cidr, limit);
sort_and_truncate(&mut by_user, limit);
TlsFingerprintSnapshot {
retention_secs: ttl.as_secs(),
capacity: MAX_TLS_FINGERPRINT_BUCKETS,
dropped_total: self.dropped_total.load(Ordering::Relaxed),
parse_error_total: self.parse_error_total.load(Ordering::Relaxed),
by_fingerprint,
by_ip,
by_cidr,
by_user,
}
}
pub fn snapshot_text(&self, ttl: Duration, limit: usize) -> String {
let snapshot = self.snapshot(ttl, limit);
if snapshot.by_fingerprint.is_empty()
&& snapshot.by_ip.is_empty()
&& snapshot.by_cidr.is_empty()
&& snapshot.by_user.is_empty()
{
return String::new();
}
let mut out = String::new();
out.push_str("[tls_fingerprints]\n");
out.push_str(&format!(
"retention_secs={} capacity={} dropped_total={} parse_error_total={}\n",
snapshot.retention_secs,
snapshot.capacity,
snapshot.dropped_total,
snapshot.parse_error_total
));
append_rows(
&mut out,
"tls_fingerprints.by_fingerprint",
&snapshot.by_fingerprint,
);
append_rows(&mut out, "tls_fingerprints.by_ip", &snapshot.by_ip);
append_rows(&mut out, "tls_fingerprints.by_cidr", &snapshot.by_cidr);
append_rows(&mut out, "tls_fingerprints.by_user", &snapshot.by_user);
out
}
fn record_scoped(
&self,
scope: (TlsFingerprintScopeKind, String),
fingerprint: &TlsClientFingerprint,
now_epoch_secs: u64,
count_total: bool,
count_auth_success: bool,
count_bad_or_probe: bool,
) {
let key = TlsFingerprintKey {
scope_kind: scope.0,
scope_key: scope.1,
ja3: fingerprint.ja3.clone(),
ja3_raw: fingerprint.ja3_raw.clone(),
ja4: fingerprint.ja4.clone(),
ja4_raw: fingerprint.ja4_raw.clone(),
};
if let Some(entry) = self.entries.get(&key) {
update_entry(
entry.value(),
now_epoch_secs,
count_total,
count_auth_success,
count_bad_or_probe,
);
return;
}
if self.entries.len() >= MAX_TLS_FINGERPRINT_BUCKETS {
self.dropped_total.fetch_add(1, Ordering::Relaxed);
return;
}
match self.entries.entry(key) {
Entry::Occupied(entry) => {
update_entry(
entry.get(),
now_epoch_secs,
count_total,
count_auth_success,
count_bad_or_probe,
);
}
Entry::Vacant(entry) => {
entry.insert(TlsFingerprintEntry::new(
now_epoch_secs,
if count_total { 1 } else { 0 },
if count_auth_success { 1 } else { 0 },
if count_bad_or_probe { 1 } else { 0 },
));
}
}
}
fn cleanup_if_needed(&self, now_epoch_secs: u64, ttl_secs: u64) {
let last = self.last_cleanup_epoch_secs.load(Ordering::Relaxed);
if now_epoch_secs.saturating_sub(last) < CLEANUP_INTERVAL_SECS {
return;
}
if self
.last_cleanup_epoch_secs
.compare_exchange(last, now_epoch_secs, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
}
self.cleanup(now_epoch_secs, ttl_secs);
}
fn cleanup(&self, now_epoch_secs: u64, ttl_secs: u64) {
if ttl_secs == 0 {
self.entries.clear();
return;
}
self.entries.retain(|_, entry| {
let last_seen = entry.last_seen_epoch_secs.load(Ordering::Relaxed);
now_epoch_secs.saturating_sub(last_seen) <= ttl_secs
});
}
}
impl TlsFingerprintEntry {
fn new(now_epoch_secs: u64, total: u64, auth_success: u64, bad_or_probe: u64) -> Self {
Self {
first_seen_epoch_secs: AtomicU64::new(now_epoch_secs),
last_seen_epoch_secs: AtomicU64::new(now_epoch_secs),
total: AtomicU64::new(total),
auth_success: AtomicU64::new(auth_success),
bad_or_probe: AtomicU64::new(bad_or_probe),
}
}
}
fn update_entry(
entry: &TlsFingerprintEntry,
now_epoch_secs: u64,
count_total: bool,
count_auth_success: bool,
count_bad_or_probe: bool,
) {
entry
.last_seen_epoch_secs
.store(now_epoch_secs, Ordering::Relaxed);
if count_total {
entry.total.fetch_add(1, Ordering::Relaxed);
}
if count_auth_success {
entry.auth_success.fetch_add(1, Ordering::Relaxed);
}
if count_bad_or_probe {
entry.bad_or_probe.fetch_add(1, Ordering::Relaxed);
}
}
fn snapshot_row(key: &TlsFingerprintKey, entry: &TlsFingerprintEntry) -> TlsFingerprintSnapshotRow {
TlsFingerprintSnapshotRow {
scope_key: key.scope_key.clone(),
ja3: key.ja3.clone(),
ja3_raw: key.ja3_raw.clone(),
ja4: key.ja4.clone(),
ja4_raw: key.ja4_raw.clone(),
total: entry.total.load(Ordering::Relaxed),
auth_success: entry.auth_success.load(Ordering::Relaxed),
bad_or_probe: entry.bad_or_probe.load(Ordering::Relaxed),
first_seen_epoch_secs: entry.first_seen_epoch_secs.load(Ordering::Relaxed),
last_seen_epoch_secs: entry.last_seen_epoch_secs.load(Ordering::Relaxed),
}
}
fn sort_and_truncate(rows: &mut Vec<TlsFingerprintSnapshotRow>, limit: usize) {
rows.sort_by_key(|row| {
(
Reverse(row.total),
row.scope_key.clone(),
row.ja4.clone(),
row.ja3.clone(),
)
});
rows.truncate(limit);
}
fn append_rows(out: &mut String, section: &str, rows: &[TlsFingerprintSnapshotRow]) {
if rows.is_empty() {
return;
}
out.push('[');
out.push_str(section);
out.push_str("]\n");
for row in rows {
if row.scope_key.is_empty() {
out.push_str(&format!(
"ja4={} ja3={} total={} auth_success={} bad_or_probe={} first_seen={} last_seen={}\n",
row.ja4,
row.ja3,
row.total,
row.auth_success,
row.bad_or_probe,
row.first_seen_epoch_secs,
row.last_seen_epoch_secs
));
} else {
out.push_str(&format!(
"scope={} ja4={} ja3={} total={} auth_success={} bad_or_probe={} first_seen={} last_seen={}\n",
row.scope_key,
row.ja4,
row.ja3,
row.total,
row.auth_success,
row.bad_or_probe,
row.first_seen_epoch_secs,
row.last_seen_epoch_secs
));
}
}
}
fn scope_key(kind: TlsFingerprintScopeKind, key: &str) -> (TlsFingerprintScopeKind, String) {
(kind, key.to_string())
}
fn cidr_bucket(ip: IpAddr) -> String {
match ip {
IpAddr::V4(ip) => {
let [a, b, c, _] = ip.octets();
format!("{a}.{b}.{c}.0/24")
}
IpAddr::V6(ip) => {
let mut octets = ip.octets();
for byte in &mut octets[7..] {
*byte = 0;
}
format!("{}/56", Ipv6Addr::from(octets))
}
}
}
fn now_epoch_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
impl Stats {
pub fn record_tls_fingerprint_observed(
&self,
fingerprint: &TlsClientFingerprint,
peer_ip: IpAddr,
ttl: Duration,
) {
if self.telemetry_core_enabled() {
self.tls_fingerprints
.record_observed(fingerprint, peer_ip, ttl);
}
}
pub fn record_tls_fingerprint_auth_success(
&self,
fingerprint: &TlsClientFingerprint,
peer_ip: IpAddr,
user: &str,
ttl: Duration,
) {
if self.telemetry_core_enabled() {
self.tls_fingerprints
.record_auth_success(fingerprint, peer_ip, user, ttl);
}
}
pub fn record_tls_fingerprint_bad_or_probe(
&self,
fingerprint: &TlsClientFingerprint,
peer_ip: IpAddr,
ttl: Duration,
) {
if self.telemetry_core_enabled() {
self.tls_fingerprints
.record_bad_or_probe(fingerprint, peer_ip, ttl);
}
}
pub fn increment_tls_fingerprint_parse_error(&self) {
if self.telemetry_core_enabled() {
self.tls_fingerprints.increment_parse_error();
}
}
pub fn tls_fingerprint_snapshot(&self, ttl: Duration, limit: usize) -> TlsFingerprintSnapshot {
self.tls_fingerprints.snapshot(ttl, limit)
}
pub fn tls_fingerprint_snapshot_text(&self, ttl: Duration, limit: usize) -> String {
self.tls_fingerprints.snapshot_text(ttl, limit)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn fp() -> TlsClientFingerprint {
TlsClientFingerprint {
ja3: "ja3".to_string(),
ja3_raw: "771,4865,,,0".to_string(),
ja4: "t13d010100_hash_hash".to_string(),
ja4_raw: "raw".to_string(),
}
}
#[test]
fn aggregates_ip_cidr_and_user_scopes() {
let collector = TlsFingerprintCollector::default();
let ip: IpAddr = "192.0.2.15".parse().expect("test IP parses");
collector.record_observed(&fp(), ip, Duration::from_secs(60));
collector.record_auth_success(&fp(), ip, "alice", Duration::from_secs(60));
let snapshot = collector.snapshot(Duration::from_secs(60), 10);
assert_eq!(snapshot.by_fingerprint[0].total, 1);
assert_eq!(snapshot.by_fingerprint[0].auth_success, 1);
assert_eq!(snapshot.by_ip[0].scope_key, "192.0.2.15");
assert_eq!(snapshot.by_cidr[0].scope_key, "192.0.2.0/24");
assert_eq!(snapshot.by_user[0].scope_key, "alice");
assert_eq!(snapshot.by_user[0].total, 1);
}
}