From 065cf21c66dd02bdece163f7b1f77f13a6065944 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 05:55:17 +0300 Subject: [PATCH] Update tlsearch.py --- tools/tlsearch.py | 230 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 193 insertions(+), 37 deletions(-) diff --git a/tools/tlsearch.py b/tools/tlsearch.py index dd4a9bd..32b42c7 100644 --- a/tools/tlsearch.py +++ b/tools/tlsearch.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 """ -TLS Profile JSON Decoder +TLS Profile Inspector Usage: - python3 tlsfront/decode_profiles.py - python3 tlsfront/decode_profiles.py tlsfront - python3 tlsfront/decode_profiles.py tlsfront/petrovich.ru.json + python3 tools/tlsearch.py + python3 tools/tlsearch.py tlsfront + python3 tools/tlsearch.py tlsfront/petrovich.ru.json + python3 tools/tlsearch.py tlsfront --only-current """ from __future__ import annotations @@ -13,8 +14,9 @@ from __future__ import annotations import argparse import datetime as dt import json +from dataclasses import dataclass from pathlib import Path -from typing import Any +from typing import Any, Iterable TLS_VERSIONS = { @@ -65,32 +67,64 @@ NAMED_GROUPS = { } -def to_hex(data: list[int]) -> str: +@dataclass +class ProfileRecognition: + schema: str + mode: str + has_cert_info: bool + has_full_cert_payload: bool + cert_message_len: int + cert_chain_count: int + cert_chain_total_len: int + issues: list[str] + + +def to_hex(data: Iterable[int]) -> str: return "".join(f"{b:02x}" for b in data) -def u16be(data: list[int], off: int = 0) -> int: +def read_u16be(data: list[int], off: int = 0) -> int: return (data[off] << 8) | data[off + 1] +def normalize_u8_list(value: Any) -> list[int]: + if not isinstance(value, list): + return [] + out: list[int] = [] + for item in value: + if isinstance(item, int) and 0 <= item <= 0xFF: + out.append(item) + else: + return [] + return out + + +def as_dict(value: Any) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def as_int(value: Any, default: int = 0) -> int: + return value if isinstance(value, int) else default + + def decode_version_pair(v: list[int]) -> str: if len(v) != 2: return f"invalid({v})" - ver = u16be(v) + ver = read_u16be(v) return f"0x{ver:04x} ({TLS_VERSIONS.get(ver, 'unknown')})" def decode_cipher_suite(v: list[int]) -> str: if len(v) != 2: return f"invalid({v})" - cs = u16be(v) + cs = read_u16be(v) name = CIPHER_NAMES.get(cs, "unknown") return f"0x{cs:04x} ({name})" def decode_supported_versions(data: list[int]) -> str: if len(data) == 2: - ver = u16be(data) + ver = read_u16be(data) return f"selected=0x{ver:04x} ({TLS_VERSIONS.get(ver, 'unknown')})" if not data: return "empty" @@ -101,7 +135,7 @@ def decode_supported_versions(data: list[int]) -> str: for i in range(1, min(1 + vec_len, len(data)), 2): if i + 1 >= len(data): break - ver = u16be(data, i) + ver = read_u16be(data, i) versions.append(f"0x{ver:04x}({TLS_VERSIONS.get(ver, 'unknown')})") return "offered=[" + ", ".join(versions) + "]" @@ -109,8 +143,8 @@ def decode_supported_versions(data: list[int]) -> str: def decode_key_share(data: list[int]) -> str: if len(data) < 4: return f"raw={to_hex(data)}" - group = u16be(data, 0) - key_len = u16be(data, 2) + group = read_u16be(data, 0) + key_len = read_u16be(data, 2) key_hex = to_hex(data[4 : 4 + min(key_len, len(data) - 4)]) gname = NAMED_GROUPS.get(group, "unknown_group") return f"group=0x{group:04x}({gname}), key_len={key_len}, key={key_hex}" @@ -119,7 +153,7 @@ def decode_key_share(data: list[int]) -> str: def decode_alpn(data: list[int]) -> str: if len(data) < 3: return f"raw={to_hex(data)}" - total = u16be(data, 0) + total = read_u16be(data, 0) pos = 2 vals: list[str] = [] limit = min(len(data), 2 + total) @@ -147,57 +181,166 @@ def decode_extension(ext_type: int, data: list[int]) -> str: return f"raw={to_hex(data)}" -def ts_to_iso(ts: int | None) -> str: - if ts is None: +def ts_to_iso(ts: Any) -> str: + if not isinstance(ts, int): return "-" return dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).isoformat() -def decode_profile(path: Path) -> str: +def recognize_profile(obj: dict[str, Any]) -> ProfileRecognition: + issues: list[str] = [] + + sh = as_dict(obj.get("server_hello_template")) + if not sh: + issues.append("missing server_hello_template") + + version = normalize_u8_list(sh.get("version")) + if version and len(version) != 2: + issues.append("server_hello_template.version must have 2 bytes") + + app_sizes = obj.get("app_data_records_sizes") + if not isinstance(app_sizes, list) or not app_sizes: + issues.append("missing app_data_records_sizes") + elif any((not isinstance(v, int) or v <= 0) for v in app_sizes): + issues.append("app_data_records_sizes contains invalid values") + + if not isinstance(obj.get("total_app_data_len"), int): + issues.append("missing total_app_data_len") + + cert_info = as_dict(obj.get("cert_info")) + has_cert_info = bool( + cert_info.get("subject_cn") + or cert_info.get("issuer_cn") + or cert_info.get("san_names") + or isinstance(cert_info.get("not_before_unix"), int) + or isinstance(cert_info.get("not_after_unix"), int) + ) + + cert_payload = as_dict(obj.get("cert_payload")) + cert_message_len = 0 + cert_chain_count = 0 + cert_chain_total_len = 0 + has_full_cert_payload = False + + if cert_payload: + cert_msg = normalize_u8_list(cert_payload.get("certificate_message")) + if not cert_msg: + issues.append("cert_payload.certificate_message is missing or invalid") + else: + cert_message_len = len(cert_msg) + + chain_raw = cert_payload.get("cert_chain_der") + if not isinstance(chain_raw, list): + issues.append("cert_payload.cert_chain_der is missing or invalid") + else: + for entry in chain_raw: + cert = normalize_u8_list(entry) + if cert: + cert_chain_count += 1 + cert_chain_total_len += len(cert) + else: + issues.append("cert_payload.cert_chain_der has invalid certificate entry") + break + + has_full_cert_payload = cert_message_len > 0 and cert_chain_count > 0 + elif obj.get("cert_payload") is not None: + issues.append("cert_payload is not an object") + + if has_full_cert_payload: + schema = "current" + mode = "full-cert-payload" + elif has_cert_info: + schema = "current-compact" + mode = "compact-cert-info" + else: + schema = "legacy" + mode = "random-fallback" + + if issues: + schema = f"{schema}+issues" + + return ProfileRecognition( + schema=schema, + mode=mode, + has_cert_info=has_cert_info, + has_full_cert_payload=has_full_cert_payload, + cert_message_len=cert_message_len, + cert_chain_count=cert_chain_count, + cert_chain_total_len=cert_chain_total_len, + issues=issues, + ) + + +def decode_profile(path: Path) -> tuple[str, ProfileRecognition]: obj: dict[str, Any] = json.loads(path.read_text(encoding="utf-8")) - sh = obj.get("server_hello_template", {}) + recognition = recognize_profile(obj) + + sh = as_dict(obj.get("server_hello_template")) + version = normalize_u8_list(sh.get("version")) + cipher = normalize_u8_list(sh.get("cipher_suite")) + random_bytes = normalize_u8_list(sh.get("random")) + session_id = normalize_u8_list(sh.get("session_id")) lines: list[str] = [] lines.append(f"[{path.name}]") lines.append(f" domain: {obj.get('domain', '-')}") - lines.append(f" tls.version: {decode_version_pair(sh.get('version', []))}") - lines.append(f" tls.cipher: {decode_cipher_suite(sh.get('cipher_suite', []))}") + lines.append(f" profile.schema: {recognition.schema}") + lines.append(f" profile.mode: {recognition.mode}") + lines.append(f" profile.has_full_cert_payload: {recognition.has_full_cert_payload}") + lines.append(f" profile.has_cert_info: {recognition.has_cert_info}") + if recognition.has_full_cert_payload: + lines.append(f" profile.cert_message_len: {recognition.cert_message_len}") + lines.append(f" profile.cert_chain_count: {recognition.cert_chain_count}") + lines.append(f" profile.cert_chain_total_len: {recognition.cert_chain_total_len}") + if recognition.issues: + lines.append(" profile.issues:") + for issue in recognition.issues: + lines.append(f" - {issue}") + + lines.append(f" tls.version: {decode_version_pair(version)}") + lines.append(f" tls.cipher: {decode_cipher_suite(cipher)}") lines.append(f" tls.compression: {sh.get('compression', '-')}") - lines.append(f" tls.random: {to_hex(sh.get('random', []))}") - session_id = sh.get("session_id", []) + lines.append(f" tls.random: {to_hex(random_bytes)}") lines.append(f" tls.session_id_len: {len(session_id)}") if session_id: lines.append(f" tls.session_id: {to_hex(session_id)}") - lines.append( - " app_data_records_sizes: " - + ", ".join(str(v) for v in obj.get("app_data_records_sizes", [])) - ) + + app_sizes = obj.get("app_data_records_sizes", []) + if isinstance(app_sizes, list): + lines.append(" app_data_records_sizes: " + ", ".join(str(v) for v in app_sizes)) + else: + lines.append(" app_data_records_sizes: -") lines.append(f" total_app_data_len: {obj.get('total_app_data_len', '-')}") - cert = obj.get("cert_info") + cert = as_dict(obj.get("cert_info")) if cert: lines.append(" cert_info:") lines.append(f" subject_cn: {cert.get('subject_cn') or '-'}") lines.append(f" issuer_cn: {cert.get('issuer_cn') or '-'}") lines.append(f" not_before: {ts_to_iso(cert.get('not_before_unix'))}") lines.append(f" not_after: {ts_to_iso(cert.get('not_after_unix'))}") - sans = cert.get("san_names") or [] - lines.append(" san_names: " + (", ".join(sans) if sans else "-")) + sans = cert.get("san_names") + if isinstance(sans, list) and sans: + lines.append(" san_names: " + ", ".join(str(v) for v in sans)) + else: + lines.append(" san_names: -") else: lines.append(" cert_info: -") exts = sh.get("extensions", []) + if not isinstance(exts, list): + exts = [] lines.append(f" extensions[{len(exts)}]:") for ext in exts: - ext_type = int(ext.get("ext_type", -1)) - data = ext.get("data", []) + ext_obj = as_dict(ext) + ext_type = as_int(ext_obj.get("ext_type"), -1) + data = normalize_u8_list(ext_obj.get("data")) name = EXT_NAMES.get(ext_type, "unknown") decoded = decode_extension(ext_type, data) - lines.append( - f" - type={ext_type} ({name}), len={len(data)}: {decoded}" - ) + lines.append(f" - type={ext_type} ({name}), len={len(data)}: {decoded}") + lines.append("") - return "\n".join(lines) + return ("\n".join(lines), recognition) def collect_files(input_path: Path) -> list[Path]: @@ -208,7 +351,7 @@ def collect_files(input_path: Path) -> list[Path]: def main() -> int: parser = argparse.ArgumentParser( - description="Decode tlsfront profile JSON files into readable form." + description="Decode TLS profile JSON files and recognize current schema." ) parser.add_argument( "path", @@ -216,6 +359,11 @@ def main() -> int: default="tlsfront", help="Path to tlsfront directory or a single JSON file.", ) + parser.add_argument( + "--only-current", + action="store_true", + help="Show only profiles recognized as current/full-cert-payload.", + ) args = parser.parse_args() base = Path(args.path) @@ -228,11 +376,19 @@ def main() -> int: print(f"No JSON files found in: {base}") return 1 + printed = 0 for path in files: try: - print(decode_profile(path), end="") + rendered, recognition = decode_profile(path) + if args.only_current and recognition.schema != "current": + continue + print(rendered, end="") + printed += 1 except Exception as e: # noqa: BLE001 print(f"[{path.name}] decode error: {e}\n") + + if args.only_current and printed == 0: + print("No current profiles found.") return 0