Compare commits

..

31 Commits
3.0.5 ... 3.0.6

Author SHA1 Message Date
Alexey
7da062e448 Merge pull request #188 from telemt/main-stage
From staging #185 + #186 -> main
2026-02-20 18:04:58 +03:00
Alexey
1fd78e012d Metrics + Fixes in tests 2026-02-20 18:02:02 +03:00
Alexey
7304dacd60 Update main.rs 2026-02-20 17:42:20 +03:00
Alexey
3bff0629ca Merge pull request #187 from artemws/patch-1
Update metrics whitelist in README
2026-02-20 17:26:50 +03:00
Alexey
a79f0bbaf5 Merge pull request #186 from telemt/flow
TLS-F + PROXY Protocol Fixes
2026-02-20 17:25:06 +03:00
artemws
aa535bba0a Update metrics whitelist in README
Expanded metrics whitelist to include additional IP ranges.
2026-02-20 16:24:02 +02:00
Alexey
eb3245b78f Merge branch 'main-stage' into flow 2026-02-20 17:19:23 +03:00
Alexey
da84151e9f Merge pull request #184 from artemws/main
CIDR вместо обычного IP адреса metrics_whitelist
2026-02-20 17:15:54 +03:00
Alexey
a303fee65f ALPN Extract tests
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 17:12:16 +03:00
Alexey
bae811f8f1 Update Cargo.toml 2026-02-20 17:05:35 +03:00
artemws
8892860490 Change whitelist to use IpNetwork for IP filtering 2026-02-20 16:04:21 +02:00
artemws
0d2958fea7 Change metrics whitelist to use IpNetwork 2026-02-20 16:03:57 +02:00
artemws
dbd9b53940 Change metrics_whitelist type from Vec<IpAddr> to Vec<IpNetwork> 2026-02-20 16:03:38 +02:00
artemws
8f1f051a54 Add ipnetwork dependency to Cargo.toml 2026-02-20 16:03:03 +02:00
Alexey
471c680def TLS Improvements
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 17:02:17 +03:00
Alexey
be8742a229 Merge pull request #183 from artemws/main
Config Reload-on-fly
2026-02-20 16:57:38 +03:00
Alexey
781947a08a TlsFrontCache + X509 Parser + GREASE Tolerance
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 16:56:33 +03:00
Alexey
b295712dbb Update Cargo.toml 2026-02-20 16:47:13 +03:00
Alexey
e8454ea370 HAProxy PROXY Protocol Fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 16:42:40 +03:00
artemws
ea88a40c8f Add config path canonicalization
Canonicalize the config path to match notify events.
2026-02-20 15:37:44 +02:00
Alexey
2ea4c83d9d Normalize IP + Masking + TLS 2026-02-20 16:32:14 +03:00
artemws
953fab68c4 Refactor hot-reload mechanism to use notify crate
Updated hot-reload functionality to use notify crate for file watching and improved documentation.
2026-02-20 15:29:37 +02:00
artemws
0f6621d359 Refactor hot-reload watcher implementation 2026-02-20 15:29:20 +02:00
artemws
82bb93e8da Add notify dependency for macOS file events 2026-02-20 15:28:58 +02:00
artemws
25b18ab064 Enhance logging for hot reload configuration changes
Added detailed logging for various configuration changes during hot reload, including log level, ad tag, middle proxy pool size, and user access changes.
2026-02-20 14:50:37 +02:00
artemws
3e0dc91db6 Add PartialEq to AccessConfig struct 2026-02-20 14:37:00 +02:00
artemws
26270bc651 Specify types for config_rx in main.rs 2026-02-20 14:27:31 +02:00
Alexey
be2ec4b9b4 Update CONTRIBUTING.md 2026-02-20 15:22:18 +03:00
artemws
766806f5df Add hot_reload module to config 2026-02-20 14:19:04 +02:00
artemws
26cf6ff4fa Add files via upload 2026-02-20 14:18:30 +02:00
artemws
b8add81018 Implement hot-reload for config and log level
Added hot-reload functionality for configuration and log level.
2026-02-20 14:18:09 +02:00
18 changed files with 1013 additions and 106 deletions

View File

@@ -1,10 +1,11 @@
## Pull Requests - Rules
# Pull Requests - Rules
## General
- ONLY signed and verified commits
- ONLY from your name
- DO NOT commit with `codex` or `claude` as author/commiter
- PREFER `flow` branch for development, not `main`
### AI
## AI
We are not against modern tools, like AI, where you act as a principal or architect, but we consider it important:
- you really understand what you're doing

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.0.5"
version = "3.0.6"
edition = "2024"
[dependencies]
@@ -30,6 +30,7 @@ nix = { version = "0.28", default-features = false, features = ["net"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.8"
x509-parser = "0.15"
# Utils
bytes = "1.9"
@@ -52,6 +53,8 @@ anyhow = "1.0"
# HTTP
reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false }
notify = { version = "6", features = ["macos_fsevent"] }
ipnetwork = "0.20"
hyper = { version = "1", features = ["server", "http1"] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto"] }
http-body-util = "0.1"

View File

@@ -250,7 +250,12 @@ listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # Unix socket
# listen_unix_sock_perm = "0666" # Socket file permissions
# metrics_port = 9090
# metrics_whitelist = ["127.0.0.1", "::1"]
# metrics_whitelist = [
# "192.168.0.0/24",
# "172.16.0.0/12",
# "127.0.0.1/32",
# "::1/128"
#]
# Listen on multiple interfaces/IPs - IPv4
[[server.listeners]]

View File

@@ -1,5 +1,6 @@
use std::net::IpAddr;
use std::collections::HashMap;
use ipnetwork::IpNetwork;
use serde::Deserialize;
// Helper defaults kept private to the config module.
@@ -66,8 +67,11 @@ pub(crate) fn default_weight() -> u16 {
1
}
pub(crate) fn default_metrics_whitelist() -> Vec<IpAddr> {
vec!["127.0.0.1".parse().unwrap(), "::1".parse().unwrap()]
pub(crate) fn default_metrics_whitelist() -> Vec<IpNetwork> {
vec![
"127.0.0.1/32".parse().unwrap(),
"::1/128".parse().unwrap(),
]
}
pub(crate) fn default_prefer_4() -> u8 {

433
src/config/hot_reload.rs Normal file
View File

@@ -0,0 +1,433 @@
//! Hot-reload: watches the config file via inotify (Linux) / FSEvents (macOS)
//! / ReadDirectoryChangesW (Windows) using the `notify` crate.
//! SIGHUP is also supported on Unix as an additional manual trigger.
//!
//! # What can be reloaded without restart
//!
//! | Section | Field | Effect |
//! |-----------|-------------------------------|-----------------------------------|
//! | `general` | `log_level` | Filter updated via `log_level_tx` |
//! | `general` | `ad_tag` | Passed on next connection |
//! | `general` | `middle_proxy_pool_size` | Passed on next connection |
//! | `general` | `me_keepalive_*` | Passed on next connection |
//! | `access` | All user/quota fields | Effective immediately |
//!
//! Fields that require re-binding sockets (`server.port`, `censorship.*`,
//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted.
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
use tokio::sync::{mpsc, watch};
use tracing::{error, info, warn};
use crate::config::LogLevel;
use super::load::ProxyConfig;
// ── Hot fields ────────────────────────────────────────────────────────────────
/// Fields that are safe to swap without restarting listeners.
#[derive(Debug, Clone, PartialEq)]
pub struct HotFields {
pub log_level: LogLevel,
pub ad_tag: Option<String>,
pub middle_proxy_pool_size: usize,
pub me_keepalive_enabled: bool,
pub me_keepalive_interval_secs: u64,
pub me_keepalive_jitter_secs: u64,
pub me_keepalive_payload_random: bool,
pub access: crate::config::AccessConfig,
}
impl HotFields {
pub fn from_config(cfg: &ProxyConfig) -> Self {
Self {
log_level: cfg.general.log_level.clone(),
ad_tag: cfg.general.ad_tag.clone(),
middle_proxy_pool_size: cfg.general.middle_proxy_pool_size,
me_keepalive_enabled: cfg.general.me_keepalive_enabled,
me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs,
me_keepalive_payload_random: cfg.general.me_keepalive_payload_random,
access: cfg.access.clone(),
}
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
/// Warn if any non-hot fields changed (require restart).
fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
if old.server.port != new.server.port {
warn!(
"config reload: server.port changed ({} → {}); restart required",
old.server.port, new.server.port
);
}
if old.censorship.tls_domain != new.censorship.tls_domain {
warn!(
"config reload: censorship.tls_domain changed ('{}' → '{}'); restart required",
old.censorship.tls_domain, new.censorship.tls_domain
);
}
if old.network.ipv4 != new.network.ipv4 || old.network.ipv6 != new.network.ipv6 {
warn!("config reload: network.ipv4/ipv6 changed; restart required");
}
if old.general.use_middle_proxy != new.general.use_middle_proxy {
warn!("config reload: use_middle_proxy changed; restart required");
}
}
/// Resolve the public host for link generation — mirrors the logic in main.rs.
///
/// Priority:
/// 1. `[general.links] public_host` — explicit override in config
/// 2. `detected_ip_v4` — from STUN/interface probe at startup
/// 3. `detected_ip_v6` — fallback
/// 4. `"UNKNOWN"` — warn the user to set `public_host`
fn resolve_link_host(
cfg: &ProxyConfig,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) -> String {
if let Some(ref h) = cfg.general.links.public_host {
return h.clone();
}
detected_ip_v4
.or(detected_ip_v6)
.map(|ip| ip.to_string())
.unwrap_or_else(|| {
warn!(
"config reload: could not determine public IP for proxy links. \
Set [general.links] public_host in config."
);
"UNKNOWN".to_string()
})
}
/// Print TG proxy links for a single user — mirrors print_proxy_links() in main.rs.
fn print_user_links(user: &str, secret: &str, host: &str, port: u16, cfg: &ProxyConfig) {
info!(target: "telemt::links", "--- New user: {} ---", user);
if cfg.general.modes.classic {
info!(
target: "telemt::links",
" Classic: tg://proxy?server={}&port={}&secret={}",
host, port, secret
);
}
if cfg.general.modes.secure {
info!(
target: "telemt::links",
" DD: tg://proxy?server={}&port={}&secret=dd{}",
host, port, secret
);
}
if cfg.general.modes.tls {
let mut domains = vec![cfg.censorship.tls_domain.clone()];
for d in &cfg.censorship.tls_domains {
if !domains.contains(d) {
domains.push(d.clone());
}
}
for domain in &domains {
let domain_hex = hex::encode(domain.as_bytes());
info!(
target: "telemt::links",
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
host, port, secret, domain_hex
);
}
}
info!(target: "telemt::links", "--------------------");
}
/// Log all detected changes and emit TG links for new users.
fn log_changes(
old_hot: &HotFields,
new_hot: &HotFields,
new_cfg: &ProxyConfig,
log_tx: &watch::Sender<LogLevel>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) {
if old_hot.log_level != new_hot.log_level {
info!(
"config reload: log_level: '{}' → '{}'",
old_hot.log_level, new_hot.log_level
);
log_tx.send(new_hot.log_level.clone()).ok();
}
if old_hot.ad_tag != new_hot.ad_tag {
info!(
"config reload: ad_tag: {} → {}",
old_hot.ad_tag.as_deref().unwrap_or("none"),
new_hot.ad_tag.as_deref().unwrap_or("none"),
);
}
if old_hot.middle_proxy_pool_size != new_hot.middle_proxy_pool_size {
info!(
"config reload: middle_proxy_pool_size: {} → {}",
old_hot.middle_proxy_pool_size, new_hot.middle_proxy_pool_size,
);
}
if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled
|| old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs
|| old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs
|| old_hot.me_keepalive_payload_random != new_hot.me_keepalive_payload_random
{
info!(
"config reload: me_keepalive: enabled={} interval={}s jitter={}s random_payload={}",
new_hot.me_keepalive_enabled,
new_hot.me_keepalive_interval_secs,
new_hot.me_keepalive_jitter_secs,
new_hot.me_keepalive_payload_random,
);
}
if old_hot.access.users != new_hot.access.users {
let mut added: Vec<&String> = new_hot.access.users.keys()
.filter(|u| !old_hot.access.users.contains_key(*u))
.collect();
added.sort();
let mut removed: Vec<&String> = old_hot.access.users.keys()
.filter(|u| !new_hot.access.users.contains_key(*u))
.collect();
removed.sort();
let mut changed: Vec<&String> = new_hot.access.users.keys()
.filter(|u| {
old_hot.access.users.get(*u)
.map(|s| s != &new_hot.access.users[*u])
.unwrap_or(false)
})
.collect();
changed.sort();
if !added.is_empty() {
info!(
"config reload: users added: [{}]",
added.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
);
let host = resolve_link_host(new_cfg, detected_ip_v4, detected_ip_v6);
let port = new_cfg.general.links.public_port.unwrap_or(new_cfg.server.port);
for user in &added {
if let Some(secret) = new_hot.access.users.get(*user) {
print_user_links(user, secret, &host, port, new_cfg);
}
}
}
if !removed.is_empty() {
info!(
"config reload: users removed: [{}]",
removed.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
);
}
if !changed.is_empty() {
info!(
"config reload: users secret changed: [{}]",
changed.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
);
}
}
if old_hot.access.user_max_tcp_conns != new_hot.access.user_max_tcp_conns {
info!(
"config reload: user_max_tcp_conns updated ({} entries)",
new_hot.access.user_max_tcp_conns.len()
);
}
if old_hot.access.user_expirations != new_hot.access.user_expirations {
info!(
"config reload: user_expirations updated ({} entries)",
new_hot.access.user_expirations.len()
);
}
if old_hot.access.user_data_quota != new_hot.access.user_data_quota {
info!(
"config reload: user_data_quota updated ({} entries)",
new_hot.access.user_data_quota.len()
);
}
if old_hot.access.user_max_unique_ips != new_hot.access.user_max_unique_ips {
info!(
"config reload: user_max_unique_ips updated ({} entries)",
new_hot.access.user_max_unique_ips.len()
);
}
}
/// Load config, validate, diff against current, and broadcast if changed.
fn reload_config(
config_path: &PathBuf,
config_tx: &watch::Sender<Arc<ProxyConfig>>,
log_tx: &watch::Sender<LogLevel>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) {
let new_cfg = match ProxyConfig::load(config_path) {
Ok(c) => c,
Err(e) => {
error!("config reload: failed to parse {:?}: {}", config_path, e);
return;
}
};
if let Err(e) = new_cfg.validate() {
error!("config reload: validation failed: {}; keeping old config", e);
return;
}
let old_cfg = config_tx.borrow().clone();
let old_hot = HotFields::from_config(&old_cfg);
let new_hot = HotFields::from_config(&new_cfg);
if old_hot == new_hot {
return;
}
warn_non_hot_changes(&old_cfg, &new_cfg);
log_changes(&old_hot, &new_hot, &new_cfg, log_tx, detected_ip_v4, detected_ip_v6);
config_tx.send(Arc::new(new_cfg)).ok();
}
// ── Public API ────────────────────────────────────────────────────────────────
/// Spawn the hot-reload watcher task.
///
/// Uses `notify` (inotify on Linux) to detect file changes instantly.
/// SIGHUP is also handled on Unix as an additional manual trigger.
///
/// `detected_ip_v4` / `detected_ip_v6` are the IPs discovered during the
/// startup probe — used when generating proxy links for newly added users,
/// matching the same logic as the startup output.
pub fn spawn_config_watcher(
config_path: PathBuf,
initial: Arc<ProxyConfig>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) -> (watch::Receiver<Arc<ProxyConfig>>, watch::Receiver<LogLevel>) {
let initial_level = initial.general.log_level.clone();
let (config_tx, config_rx) = watch::channel(initial);
let (log_tx, log_rx) = watch::channel(initial_level);
// Bridge: sync notify callback → async task via mpsc.
let (notify_tx, mut notify_rx) = mpsc::channel::<()>(4);
// Canonicalize the config path so it matches what notify returns in events
// (notify always gives absolute paths, but config_path may be relative).
let config_path = match config_path.canonicalize() {
Ok(p) => p,
Err(_) => config_path.to_path_buf(), // file doesn't exist yet, use as-is
};
// Watch the parent directory rather than the file itself, because many
// editors (vim, nano, systemd-sysusers) write via rename, which would
// cause inotify to lose track of the original inode.
let watch_dir = config_path
.parent()
.unwrap_or_else(|| std::path::Path::new("."))
.to_path_buf();
let config_file = config_path.clone();
let tx_clone = notify_tx.clone();
let watcher_result = recommended_watcher(move |res: notify::Result<notify::Event>| {
let Ok(event) = res else { return };
let is_our_file = event.paths.iter().any(|p| p == &config_file);
if !is_our_file {
return;
}
let relevant = matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
);
if relevant {
let _ = tx_clone.try_send(());
}
});
match watcher_result {
Ok(mut watcher) => {
match watcher.watch(&watch_dir, RecursiveMode::NonRecursive) {
Ok(()) => info!("config watcher: watching {:?} via inotify", config_path),
Err(e) => warn!(
"config watcher: failed to watch {:?}: {}; use SIGHUP to reload",
watch_dir, e
),
}
tokio::spawn(async move {
let _watcher = watcher; // keep alive
#[cfg(unix)]
let mut sighup = {
use tokio::signal::unix::{SignalKind, signal};
signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler")
};
loop {
#[cfg(unix)]
tokio::select! {
msg = notify_rx.recv() => {
if msg.is_none() { break; }
}
_ = sighup.recv() => {
info!("SIGHUP received — reloading {:?}", config_path);
}
}
#[cfg(not(unix))]
if notify_rx.recv().await.is_none() { break; }
// Debounce: drain extra events fired within 50ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
while notify_rx.try_recv().is_ok() {}
reload_config(
&config_path,
&config_tx,
&log_tx,
detected_ip_v4,
detected_ip_v6,
);
}
});
}
Err(e) => {
warn!(
"config watcher: inotify unavailable ({}); only SIGHUP will trigger reload",
e
);
// Fall back to SIGHUP-only.
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sighup = signal(SignalKind::hangup())
.expect("Failed to register SIGHUP handler");
loop {
sighup.recv().await;
info!("SIGHUP received — reloading {:?}", config_path);
reload_config(
&config_path,
&config_tx,
&log_tx,
detected_ip_v4,
detected_ip_v6,
);
}
}
#[cfg(not(unix))]
let _ = (config_tx, log_tx, config_path);
});
}
}
(config_rx, log_rx)
}

View File

@@ -194,6 +194,10 @@ impl ProxyConfig {
validate_network_cfg(&mut config.network)?;
if config.general.use_middle_proxy && config.network.ipv6 == Some(true) {
warn!("IPv6 with Middle Proxy is experimental and may cause KDF address mismatch; consider disabling IPv6 or ME");
}
// Random fake_cert_len only when default is in use.
if !config.censorship.tls_emulation && config.censorship.fake_cert_len == default_fake_cert_len() {
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
@@ -222,6 +226,7 @@ impl ProxyConfig {
ip: ipv4,
announce: None,
announce_ip: None,
proxy_protocol: None,
});
}
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
@@ -230,6 +235,7 @@ impl ProxyConfig {
ip: ipv6,
announce: None,
announce_ip: None,
proxy_protocol: None,
});
}
}

View File

@@ -3,6 +3,7 @@
pub(crate) mod defaults;
mod types;
mod load;
pub mod hot_reload;
pub use load::ProxyConfig;
pub use types::*;

View File

@@ -1,4 +1,5 @@
use chrono::{DateTime, Utc};
use ipnetwork::IpNetwork;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
@@ -304,7 +305,7 @@ pub struct ServerConfig {
pub metrics_port: Option<u16>,
#[serde(default = "default_metrics_whitelist")]
pub metrics_whitelist: Vec<IpAddr>,
pub metrics_whitelist: Vec<IpNetwork>,
#[serde(default)]
pub listeners: Vec<ListenerConfig>,
@@ -412,7 +413,7 @@ impl Default for AntiCensorshipConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AccessConfig {
#[serde(default)]
pub users: HashMap<String, String>,
@@ -513,6 +514,9 @@ pub struct ListenerConfig {
/// Migrated to `announce` automatically if `announce` is not set.
#[serde(default)]
pub announce_ip: Option<IpAddr>,
/// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol.
#[serde(default)]
pub proxy_protocol: Option<bool>,
}
// ============= ShowLink =============

View File

@@ -3,6 +3,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use rand::Rng;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::Semaphore;
@@ -27,6 +28,7 @@ mod tls_front;
mod util;
use crate::config::{LogLevel, ProxyConfig};
use crate::config::hot_reload::spawn_config_watcher;
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
@@ -259,46 +261,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("IP limits configured for {} users", config.access.user_max_unique_ips.len());
}
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let port = config.censorship.mask_port;
tokio::spawn(async move {
for domain in domains {
match crate::tls_front::fetcher::fetch_real_tls(
&domain,
port,
&domain,
Duration::from_secs(5),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
});
Some(cache)
} else {
None
};
// Connection concurrency limit
let _max_connections = Arc::new(Semaphore::new(10_000));
@@ -477,6 +439,72 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
cache.load_from_disk().await;
let port = config.censorship.mask_port;
// Initial synchronous fetch to warm cache before serving clients.
for domain in tls_domains.clone() {
match crate::tls_front::fetcher::fetch_real_tls(
&domain,
port,
&domain,
Duration::from_secs(5),
Some(upstream_manager.clone()),
)
.await
{
Ok(res) => cache.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
// Periodic refresh with jitter.
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let upstream_for_task = upstream_manager.clone();
tokio::spawn(async move {
loop {
let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600);
let jitter_secs = rand::rng().random_range(0..=7200);
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
for domain in &domains {
match crate::tls_front::fetcher::fetch_real_tls(
domain,
port,
domain,
Duration::from_secs(5),
Some(upstream_for_task.clone()),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"),
}
}
}
});
Some(cache)
} else {
None
};
// Middle-End ping before DC connectivity
if let Some(ref pool) = me_pool {
let me_results = run_me_ping(pool, &rng).await;
@@ -656,6 +684,19 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
detected_ip_v4, detected_ip_v6
);
// ── Hot-reload watcher ────────────────────────────────────────────────
// Uses inotify to detect file changes instantly (SIGHUP also works).
// detected_ip_v4/v6 are passed so newly added users get correct TG links.
let (config_rx, mut log_level_rx): (
tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
tokio::sync::watch::Receiver<LogLevel>,
) = spawn_config_watcher(
std::path::PathBuf::from(&config_path),
config.clone(),
detected_ip_v4,
detected_ip_v6,
);
let mut listeners = Vec::new();
for listener_conf in &config.server.listeners {
@@ -677,6 +718,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
Ok(socket) => {
let listener = TcpListener::from_std(socket.into())?;
info!("Listening on {}", addr);
let listener_proxy_protocol =
listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol);
// Resolve the public host for link generation
let public_host = if let Some(ref announce) = listener_conf.announce {
@@ -702,7 +745,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
print_proxy_links(&public_host, link_port, &config);
}
listeners.push(listener);
listeners.push((listener, listener_proxy_protocol));
}
Err(e) => {
error!("Failed to bind to {}: {}", addr, e);
@@ -760,7 +803,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
has_unix_listener = true;
let config = config.clone();
let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -779,7 +822,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16));
let config = config.clone();
let config = config_rx_unix.borrow_and_update().clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -788,12 +831,13 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move {
if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng,
me_pool, tls_cache, ip_tracker,
me_pool, tls_cache, ip_tracker, proxy_protocol_enabled,
).await {
debug!(error = %e, "Unix socket connection error");
}
@@ -825,6 +869,20 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
.reload(runtime_filter)
.expect("Failed to switch log filter");
// Apply log_level changes from hot-reload to the tracing filter.
tokio::spawn(async move {
loop {
if log_level_rx.changed().await.is_err() {
break;
}
let level = log_level_rx.borrow_and_update().clone();
let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str());
if let Err(e) = filter_handle.reload(new_filter) {
tracing::error!("config reload: failed to update log filter: {}", e);
}
}
});
if let Some(port) = config.server.metrics_port {
let stats = stats.clone();
let whitelist = config.server.metrics_whitelist.clone();
@@ -833,8 +891,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
});
}
for listener in listeners {
let config = config.clone();
for (listener, listener_proxy_protocol) in listeners {
let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -848,7 +906,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
let config = config.clone();
let config = config_rx.borrow_and_update().clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -857,6 +915,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move {
if let Err(e) = ClientHandler::new(
@@ -871,6 +930,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
me_pool,
tls_cache,
ip_tracker,
proxy_protocol_enabled,
)
.run()
.await

View File

@@ -1,18 +1,19 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::sync::Arc;
use http_body_util::Full;
use http_body_util::{Full, BodyExt};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use ipnetwork::IpNetwork;
use tokio::net::TcpListener;
use tracing::{info, warn, debug};
use crate::stats::Stats;
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpAddr>) {
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = match TcpListener::bind(addr).await {
Ok(l) => l,
@@ -32,7 +33,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpAddr>) {
}
};
if !whitelist.is_empty() && !whitelist.contains(&peer.ip()) {
if !whitelist.is_empty() && !whitelist.iter().any(|net| net.contains(peer.ip())) {
debug!(peer = %peer, "Metrics request denied by whitelist");
continue;
}
@@ -53,7 +54,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpAddr>) {
}
}
fn handle(req: Request<hyper::body::Incoming>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
fn handle<B>(req: Request<B>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() != "/metrics" {
let resp = Response::builder()
.status(StatusCode::NOT_FOUND)
@@ -193,21 +194,20 @@ mod tests {
stats.increment_connects_all();
stats.increment_connects_all();
let port = 19091u16;
let s = stats.clone();
tokio::spawn(async move {
serve(port, s, vec![]).await;
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let req = Request::builder()
.uri("/metrics")
.body(())
.unwrap();
let resp = handle(req, &stats).unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
let resp = reqwest::get(format!("http://127.0.0.1:{}/metrics", port))
.await.unwrap();
assert_eq!(resp.status(), 200);
let body = resp.text().await.unwrap();
assert!(body.contains("telemt_connections_total 3"));
let resp404 = reqwest::get(format!("http://127.0.0.1:{}/other", port))
.await.unwrap();
assert_eq!(resp404.status(), 404);
let req404 = Request::builder()
.uri("/other")
.body(())
.unwrap();
let resp404 = handle(req404, &stats).unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
}
}

View File

@@ -351,6 +351,9 @@ pub fn build_server_hello(
fake_cert_len: usize,
rng: &SecureRandom,
) -> Vec<u8> {
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
let fake_cert_len = fake_cert_len.max(MIN_APP_DATA).min(MAX_APP_DATA);
let x25519_key = gen_fake_x25519_key(rng);
// Build ServerHello
@@ -373,7 +376,13 @@ pub fn build_server_hello(
app_data_record.push(TLS_RECORD_APPLICATION);
app_data_record.extend_from_slice(&TLS_VERSION);
app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes());
app_data_record.extend_from_slice(&fake_cert);
if fake_cert_len > 17 {
app_data_record.extend_from_slice(&fake_cert[..fake_cert_len - 17]);
app_data_record.push(0x16); // inner content type marker
app_data_record.extend_from_slice(&rng.bytes(16)); // AEAD-like tag mimic
} else {
app_data_record.extend_from_slice(&fake_cert);
}
// Combine all records
let mut response = Vec::with_capacity(
@@ -475,6 +484,85 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
None
}
/// Extract ALPN protocol list from TLS ClientHello.
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Option<Vec<String>> {
if handshake.len() < 43 || handshake[0] != TLS_RECORD_HANDSHAKE {
return None;
}
let mut pos = 5; // after record header
if handshake.get(pos).copied()? != 0x01 {
return None; // not ClientHello
}
// Handshake length bytes
pos += 4; // type + len (3)
// version (2) + random (32)
pos += 2 + 32;
if pos + 1 > handshake.len() {
return None;
}
let session_id_len = *handshake.get(pos)? as usize;
pos += 1 + session_id_len;
if pos + 2 > handshake.len() {
return None;
}
let cipher_suites_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
pos += 2 + cipher_suites_len;
if pos + 1 > handshake.len() {
return None;
}
let comp_len = *handshake.get(pos)? as usize;
pos += 1 + comp_len;
if pos + 2 > handshake.len() {
return None;
}
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
pos += 2;
let ext_end = pos + ext_len;
if ext_end > handshake.len() {
return None;
}
while pos + 4 <= ext_end {
let etype = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]);
let elen = u16::from_be_bytes([handshake[pos + 2], handshake[pos + 3]]) as usize;
pos += 4;
if pos + elen > ext_end {
break;
}
if etype == 0x0010 && elen >= 3 {
// ALPN
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
let mut alpn_pos = pos + 2;
let list_end = std::cmp::min(alpn_pos + list_len, pos + elen);
let mut protocols = Vec::new();
while alpn_pos < list_end {
let proto_len = *handshake.get(alpn_pos)? as usize;
alpn_pos += 1;
if alpn_pos + proto_len > list_end {
break;
}
if let Ok(p) = std::str::from_utf8(&handshake[alpn_pos..alpn_pos + proto_len]) {
protocols.push(p.to_string());
}
alpn_pos += proto_len;
}
return Some(protocols);
}
pos += elen;
}
None
}
/// Check if bytes look like a TLS ClientHello
pub fn is_tls_handshake(first_bytes: &[u8]) -> bool {
if first_bytes.len() < 3 {
@@ -746,4 +834,93 @@ mod tests {
// Should return None (no match) but not panic
assert!(result.is_none());
}
fn build_client_hello_with_exts(exts: Vec<(u16, Vec<u8>)>, host: &str) -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&TLS_VERSION); // legacy version
body.extend_from_slice(&[0u8; 32]); // random
body.push(0); // session id len
body.extend_from_slice(&2u16.to_be_bytes()); // cipher suites len
body.extend_from_slice(&[0x13, 0x01]); // TLS_AES_128_GCM_SHA256
body.push(1); // compression len
body.push(0); // null compression
// Build SNI extension
let host_bytes = host.as_bytes();
let mut sni_ext = Vec::new();
sni_ext.extend_from_slice(&(host_bytes.len() as u16 + 3).to_be_bytes());
sni_ext.push(0);
sni_ext.extend_from_slice(&(host_bytes.len() as u16).to_be_bytes());
sni_ext.extend_from_slice(host_bytes);
let mut ext_blob = Vec::new();
for (typ, data) in exts {
ext_blob.extend_from_slice(&typ.to_be_bytes());
ext_blob.extend_from_slice(&(data.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&data);
}
// SNI last
ext_blob.extend_from_slice(&0x0000u16.to_be_bytes());
ext_blob.extend_from_slice(&(sni_ext.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&sni_ext);
body.extend_from_slice(&(ext_blob.len() as u16).to_be_bytes());
body.extend_from_slice(&ext_blob);
let mut handshake = Vec::new();
handshake.push(0x01); // ClientHello
let len_bytes = (body.len() as u32).to_be_bytes();
handshake.extend_from_slice(&len_bytes[1..4]);
handshake.extend_from_slice(&body);
let mut record = Vec::new();
record.push(TLS_RECORD_HANDSHAKE);
record.extend_from_slice(&[0x03, 0x01]);
record.extend_from_slice(&(handshake.len() as u16).to_be_bytes());
record.extend_from_slice(&handshake);
record
}
#[test]
fn test_extract_sni_with_grease_extension() {
// GREASE type 0x0a0a with zero length before SNI
let ch = build_client_hello_with_exts(vec![(0x0a0a, Vec::new())], "example.com");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("example.com"));
}
#[test]
fn test_extract_sni_tolerates_empty_unknown_extension() {
let ch = build_client_hello_with_exts(vec![(0x1234, Vec::new())], "test.local");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("test.local"));
}
#[test]
fn test_extract_alpn_single() {
let mut alpn_data = Vec::new();
// list length = 3 (1 length byte + "h2")
alpn_data.extend_from_slice(&3u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
assert_eq!(alpn, vec!["h2"]);
}
#[test]
fn test_extract_alpn_multiple() {
let mut alpn_data = Vec::new();
// list length = 11 (sum of per-proto lengths including length bytes)
alpn_data.extend_from_slice(&11u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
alpn_data.push(4);
alpn_data.extend_from_slice(b"spdy");
alpn_data.push(2);
alpn_data.extend_from_slice(b"h3");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
assert_eq!(alpn, vec!["h2", "spdy", "h3"]);
}
}

View File

@@ -31,6 +31,7 @@ use crate::stats::{ReplayChecker, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::MePool;
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};
use crate::transport::socket::normalize_ip;
use crate::tls_front::TlsFrontCache;
use crate::proxy::direct_relay::handle_via_direct;
@@ -50,14 +51,15 @@ pub async fn handle_client_stream<S>(
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
) -> Result<()>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
stats.increment_connects_all();
let mut real_peer = peer;
let mut real_peer = normalize_ip(peer);
if config.server.proxy_protocol {
if proxy_protocol_enabled {
match parse_proxy_protocol(&mut stream, peer).await {
Ok(info) => {
debug!(
@@ -66,7 +68,7 @@ where
version = info.version,
"PROXY protocol header parsed"
);
real_peer = info.src_addr;
real_peer = normalize_ip(info.src_addr);
}
Err(e) => {
stats.increment_connects_bad();
@@ -228,6 +230,7 @@ pub struct RunningClientHandler {
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
}
impl ClientHandler {
@@ -243,6 +246,7 @@ impl ClientHandler {
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
) -> RunningClientHandler {
RunningClientHandler {
stream,
@@ -256,6 +260,7 @@ impl ClientHandler {
me_pool,
tls_cache,
ip_tracker,
proxy_protocol_enabled,
}
}
}
@@ -264,6 +269,7 @@ impl RunningClientHandler {
pub async fn run(mut self) -> Result<()> {
self.stats.increment_connects_all();
self.peer = normalize_ip(self.peer);
let peer = self.peer;
let ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, "New connection");
@@ -301,7 +307,7 @@ impl RunningClientHandler {
}
async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
if self.config.server.proxy_protocol {
if self.proxy_protocol_enabled {
match parse_proxy_protocol(&mut self.stream, self.peer).await {
Ok(info) => {
debug!(
@@ -310,7 +316,7 @@ impl RunningClientHandler {
version = info.version,
"PROXY protocol header parsed"
);
self.peer = info.src_addr;
self.peer = normalize_ip(info.src_addr);
}
Err(e) => {
self.stats.increment_connects_bad();

View File

@@ -1,7 +1,7 @@
//! Masking - forward unrecognized traffic to mask host
use std::time::Duration;
use std::str;
use std::time::Duration;
use tokio::net::TcpStream;
#[cfg(unix)]
use tokio::net::UnixStream;
@@ -11,9 +11,9 @@ use tracing::debug;
use crate::config::ProxyConfig;
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners.
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
/// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners.
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
const MASK_BUFFER_SIZE: usize = 8192;
/// Detect client type based on initial data
@@ -78,7 +78,9 @@ where
match connect_result {
Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split();
relay_to_mask(reader, writer, mask_read, mask_write, initial_data).await;
if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out (unix socket)");
}
}
Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask unix socket");
@@ -110,7 +112,9 @@ where
match connect_result {
Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split();
relay_to_mask(reader, writer, mask_read, mask_write, initial_data).await;
if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out");
}
}
Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask host");

View File

@@ -59,6 +59,45 @@ impl TlsFrontCache {
guard.insert(domain.to_string(), Arc::new(data));
}
pub async fn load_from_disk(&self) {
let path = self.disk_path.clone();
if tokio::fs::create_dir_all(&path).await.is_err() {
return;
}
let mut loaded = 0usize;
if let Ok(mut dir) = tokio::fs::read_dir(&path).await {
while let Ok(Some(entry)) = dir.next_entry().await {
if let Ok(name) = entry.file_name().into_string() {
if !name.ends_with(".json") {
continue;
}
if let Ok(data) = tokio::fs::read(entry.path()).await {
if let Ok(cached) = serde_json::from_slice::<CachedTlsData>(&data) {
let domain = cached.domain.clone();
self.set(&domain, cached).await;
loaded += 1;
}
}
}
}
}
if loaded > 0 {
info!(count = loaded, "Loaded TLS cache entries from disk");
}
}
async fn persist(&self, domain: &str, data: &CachedTlsData) {
if tokio::fs::create_dir_all(&self.disk_path).await.is_err() {
return;
}
let fname = format!("{}.json", domain.replace(['/', '\\'], "_"));
let path = self.disk_path.join(fname);
if let Ok(json) = serde_json::to_vec_pretty(data) {
// best-effort write
let _ = tokio::fs::write(path, json).await;
}
}
/// Spawn background updater that periodically refreshes cached domains using provided fetcher.
pub fn spawn_updater<F>(
self: Arc<Self>,
@@ -82,14 +121,15 @@ impl TlsFrontCache {
pub async fn update_from_fetch(&self, domain: &str, fetched: TlsFetchResult) {
let data = CachedTlsData {
server_hello_template: fetched.server_hello_parsed,
cert_info: None,
cert_info: fetched.cert_info,
app_data_records_sizes: fetched.app_data_records_sizes.clone(),
total_app_data_len: fetched.total_app_data_len,
fetched_at: SystemTime::now(),
domain: domain.to_string(),
};
self.set(domain, data).await;
self.set(domain, data.clone()).await;
self.persist(domain, &data).await;
debug!(domain = %domain, len = fetched.total_app_data_len, "TLS cache updated");
}

View File

@@ -5,6 +5,28 @@ use crate::protocol::constants::{
use crate::protocol::tls::{TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key};
use crate::tls_front::types::CachedTlsData;
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 allows up to 2^14 + 256
fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec<usize> {
sizes
.iter()
.map(|&size| {
let base = size.max(MIN_APP_DATA).min(MAX_APP_DATA);
let jitter_range = ((base as f64) * 0.03).round() as i64;
if jitter_range == 0 {
return base;
}
let mut rand_bytes = [0u8; 2];
rand_bytes.copy_from_slice(&rng.bytes(2));
let span = 2 * jitter_range + 1;
let delta = (u16::from_le_bytes(rand_bytes) as i64 % span) - jitter_range;
let adjusted = (base as i64 + delta).clamp(MIN_APP_DATA as i64, MAX_APP_DATA as i64);
adjusted as usize
})
.collect()
}
/// Build a ServerHello + CCS + ApplicationData sequence using cached TLS metadata.
pub fn build_emulated_server_hello(
secret: &[u8],
@@ -76,6 +98,7 @@ pub fn build_emulated_server_hello(
if sizes.is_empty() {
sizes.push(cached.total_app_data_len.max(1024));
}
let sizes = jitter_and_clamp_sizes(&sizes, rng);
let mut app_data = Vec::new();
for size in sizes {
@@ -83,7 +106,14 @@ pub fn build_emulated_server_hello(
rec.push(TLS_RECORD_APPLICATION);
rec.extend_from_slice(&TLS_VERSION);
rec.extend_from_slice(&(size as u16).to_be_bytes());
rec.extend_from_slice(&rng.bytes(size));
if size > 17 {
let body_len = size - 17;
rec.extend_from_slice(&rng.bytes(body_len));
rec.push(0x16); // inner content type marker (handshake)
rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag
} else {
rec.extend_from_slice(&rng.bytes(size));
}
app_data.extend_from_slice(&rec);
}

View File

@@ -14,9 +14,12 @@ use rustls::client::ClientConfig;
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{DigitallySignedStruct, Error as RustlsError};
use x509_parser::prelude::FromDer;
use x509_parser::certificate::X509Certificate;
use crate::crypto::SecureRandom;
use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE, TLS_VERSION};
use crate::tls_front::types::{ParsedServerHello, TlsExtension, TlsFetchResult};
use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE};
use crate::tls_front::types::{ParsedServerHello, TlsExtension, TlsFetchResult, ParsedCertificateInfo};
/// No-op verifier: accept any certificate (we only need lengths and metadata).
#[derive(Debug)]
@@ -163,12 +166,15 @@ fn build_client_hello(sni: &str, rng: &SecureRandom) -> Vec<u8> {
exts.extend_from_slice(alpn_proto);
// padding to reduce recognizability and keep length ~500 bytes
if exts.len() < 180 {
let pad_len = 180 - exts.len();
exts.extend_from_slice(&0x0015u16.to_be_bytes()); // padding extension
exts.extend_from_slice(&(pad_len as u16 + 2).to_be_bytes());
exts.extend_from_slice(&(pad_len as u16).to_be_bytes());
exts.resize(exts.len() + pad_len, 0);
const TARGET_EXT_LEN: usize = 180;
if exts.len() < TARGET_EXT_LEN {
let remaining = TARGET_EXT_LEN - exts.len();
if remaining > 4 {
let pad_len = remaining - 4; // minus type+len
exts.extend_from_slice(&0x0015u16.to_be_bytes()); // padding extension
exts.extend_from_slice(&(pad_len as u16).to_be_bytes());
exts.resize(exts.len() + pad_len, 0);
}
}
// Extensions length prefix
@@ -263,6 +269,52 @@ fn parse_server_hello(body: &[u8]) -> Option<ParsedServerHello> {
})
}
fn parse_cert_info(certs: &[CertificateDer<'static>]) -> Option<ParsedCertificateInfo> {
let first = certs.first()?;
let (_rem, cert) = X509Certificate::from_der(first.as_ref()).ok()?;
let not_before = Some(cert.validity().not_before.to_datetime().unix_timestamp());
let not_after = Some(cert.validity().not_after.to_datetime().unix_timestamp());
let issuer_cn = cert
.issuer()
.iter_common_name()
.next()
.and_then(|cn| cn.as_str().ok())
.map(|s| s.to_string());
let subject_cn = cert
.subject()
.iter_common_name()
.next()
.and_then(|cn| cn.as_str().ok())
.map(|s| s.to_string());
let san_names = cert
.subject_alternative_name()
.ok()
.flatten()
.map(|san| {
san.value
.general_names
.iter()
.filter_map(|gn| match gn {
x509_parser::extensions::GeneralName::DNSName(n) => Some(n.to_string()),
_ => None,
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
Some(ParsedCertificateInfo {
not_after_unix: not_after,
not_before_unix: not_before,
issuer_cn,
subject_cn,
san_names,
})
}
async fn fetch_via_raw_tls(
host: &str,
port: u16,
@@ -315,6 +367,7 @@ async fn fetch_via_raw_tls(
app_sizes
},
total_app_data_len,
cert_info: None,
})
}
@@ -324,6 +377,7 @@ pub async fn fetch_real_tls(
port: u16,
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
) -> Result<TlsFetchResult> {
// Preferred path: raw TLS probe for accurate record sizing
match fetch_via_raw_tls(host, port, sni, connect_timeout).await {
@@ -334,8 +388,26 @@ pub async fn fetch_real_tls(
}
// Fallback: rustls handshake to at least get certificate sizes
let addr = format!("{host}:{port}");
let stream = timeout(connect_timeout, TcpStream::connect(addr)).await??;
let stream = if let Some(manager) = upstream {
// Resolve host to SocketAddr
if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
match manager.connect(addr, None, None).await {
Ok(s) => s,
Err(e) => {
warn!(sni = %sni, error = %e, "Upstream connect failed, using direct connect");
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
};
let config = build_client_config();
let connector = TlsConnector::from(config);
@@ -359,6 +431,7 @@ pub async fn fetch_real_tls(
.unwrap_or_default();
let total_cert_len: usize = certs.iter().map(|c| c.len()).sum::<usize>().max(1024);
let cert_info = parse_cert_info(&certs);
// Heuristic: split across two records if large to mimic real servers a bit.
let app_data_records_sizes = if total_cert_len > 3000 {
@@ -387,5 +460,6 @@ pub async fn fetch_real_tls(
server_hello_parsed: parsed,
app_data_records_sizes: app_data_records_sizes.clone(),
total_app_data_len: app_data_records_sizes.iter().sum(),
cert_info,
})
}

View File

@@ -1,7 +1,8 @@
use std::time::SystemTime;
use serde::{Serialize, Deserialize};
/// Parsed representation of an unencrypted TLS ServerHello.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedServerHello {
pub version: [u8; 2],
pub random: [u8; 32],
@@ -12,14 +13,14 @@ pub struct ParsedServerHello {
}
/// Generic TLS extension container.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsExtension {
pub ext_type: u16,
pub data: Vec<u8>,
}
/// Basic certificate metadata (optional, informative).
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedCertificateInfo {
pub not_after_unix: Option<i64>,
pub not_before_unix: Option<i64>,
@@ -29,20 +30,26 @@ pub struct ParsedCertificateInfo {
}
/// Cached data per SNI used by the emulator.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedTlsData {
pub server_hello_template: ParsedServerHello,
pub cert_info: Option<ParsedCertificateInfo>,
pub app_data_records_sizes: Vec<usize>,
pub total_app_data_len: usize,
#[serde(default = "now_system_time", skip_serializing, skip_deserializing)]
pub fetched_at: SystemTime,
pub domain: String,
}
fn now_system_time() -> SystemTime {
SystemTime::now()
}
/// Result of attempting to fetch real TLS artifacts.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsFetchResult {
pub server_hello_parsed: ParsedServerHello,
pub app_data_records_sizes: Vec<usize>,
pub total_app_data_len: usize,
pub cert_info: Option<ParsedCertificateInfo>,
}

View File

@@ -283,6 +283,58 @@ impl Default for ProxyProtocolV1Builder {
}
}
/// Builder for PROXY protocol v2 header
pub struct ProxyProtocolV2Builder {
src: Option<SocketAddr>,
dst: Option<SocketAddr>,
}
impl ProxyProtocolV2Builder {
pub fn new() -> Self {
Self { src: None, dst: None }
}
pub fn with_addrs(mut self, src: SocketAddr, dst: SocketAddr) -> Self {
self.src = Some(src);
self.dst = Some(dst);
self
}
pub fn build(&self) -> Vec<u8> {
let mut header = Vec::new();
header.extend_from_slice(PROXY_V2_SIGNATURE);
// version 2, PROXY command
header.push(0x21);
match (self.src, self.dst) {
(Some(SocketAddr::V4(src)), Some(SocketAddr::V4(dst))) => {
header.push(0x11); // INET + STREAM
header.extend_from_slice(&(12u16).to_be_bytes());
header.extend_from_slice(&src.ip().octets());
header.extend_from_slice(&dst.ip().octets());
header.extend_from_slice(&src.port().to_be_bytes());
header.extend_from_slice(&dst.port().to_be_bytes());
}
(Some(SocketAddr::V6(src)), Some(SocketAddr::V6(dst))) => {
header.push(0x21); // INET6 + STREAM
header.extend_from_slice(&(36u16).to_be_bytes());
header.extend_from_slice(&src.ip().octets());
header.extend_from_slice(&dst.ip().octets());
header.extend_from_slice(&src.port().to_be_bytes());
header.extend_from_slice(&dst.port().to_be_bytes());
}
_ => {
// LOCAL/UNSPEC: no address information
header[12] = 0x20; // version 2, LOCAL command
header.push(0x00);
header.extend_from_slice(&0u16.to_be_bytes());
}
}
header
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -378,4 +430,4 @@ mod tests {
let header = ProxyProtocolV1Builder::new().build();
assert_eq!(header, b"PROXY UNKNOWN\r\n");
}
}
}