Compare commits

..

54 Commits
3.0.5 ... 3.0.8

Author SHA1 Message Date
Alexey
d552ae84d0 Merge pull request #200 from telemt/flow
ME Connection lost fixes
2026-02-21 16:31:49 +03:00
Alexey
3ab56f55e9 ME Connection error handling 2026-02-21 16:28:47 +03:00
Alexey
06d2cdef78 ME Connection lost fixes 2026-02-21 16:12:19 +03:00
Alexey
1be4422431 Merge pull request #199 from telemt/axkurcom-patch-1
Update Cargo.toml
2026-02-21 14:11:34 +03:00
Alexey
3d3428ad4d Update Cargo.toml 2026-02-21 14:11:12 +03:00
Alexey
eaff96b8c1 Merge pull request #198 from telemt/flow
Peer - Connection closed fixes
2026-02-21 14:09:05 +03:00
Alexey
7bf6f3e071 Merge pull request #195 from ivulit/fix/mask-host-tls-emulation
Use mask_host for TLS emulation fetcher
2026-02-21 13:58:38 +03:00
Alexey
c3ebb42120 Peer - Connection closed fixes 2026-02-21 13:56:24 +03:00
Alexey
8d93695194 Merge pull request #196 from telemt/axkurcom-patch-1
Update Cargo.toml
2026-02-21 13:21:00 +03:00
Alexey
40711fda09 Update Cargo.toml 2026-02-21 13:20:44 +03:00
ivulit
6ce25c6600 Use mask_host for TLS emulation fetcher 2026-02-21 10:40:59 +03:00
Alexey
1a525f7d29 Merge pull request #191 from Dimasssss/patch-1
Update config.toml
2026-02-21 05:10:25 +03:00
Alexey
2dcbdbe302 Merge pull request #194 from telemt/flow
ME Frame too large Fixes
2026-02-21 05:04:42 +03:00
Alexey
1bd495a224 Fixed tests 2026-02-21 04:04:49 +03:00
Alexey
b0e6c04c54 Merge pull request #193 from artemws/main
Fix config reload for Docker
2026-02-21 03:37:48 +03:00
Alexey
d5a7882ad1 Merge pull request #190 from vladon/feature/socks-hostname-support
feat: add hostname support for SOCKS4/SOCKS5 upstream proxies
2026-02-21 03:36:58 +03:00
Alexey
83fc9d6db3 Middle-End Fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-21 03:36:13 +03:00
Alexey
c9a043d8d5 ME Frame too large Fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-21 02:15:10 +03:00
artemws
a74bdf8aea Update hot_reload.rs 2026-02-20 23:03:26 +02:00
Dimasssss
94e9bfbbb9 Update config.toml 2026-02-20 22:23:16 +03:00
Dimasssss
18c1444904 Update config.toml 2026-02-20 22:04:56 +03:00
Dimasssss
3b89c1ce7e Update config.toml
user_expirations
2026-02-20 22:02:34 +03:00
Vladislav Yaroslavlev
100cb92ad1 feat: add hostname support for SOCKS4/SOCKS5 upstream proxies
Previously, SOCKS proxy addresses only accepted IP:port format.
Now both IP:port and hostname:port formats are supported.

Changes:
- Try parsing as SocketAddr first (IP:port) for backward compatibility
- Fall back to tokio::net::TcpStream::connect() for hostname resolution
- Log warning if interface binding is specified with hostname (not supported)

Example usage:
[[upstreams]]
type = "socks5"
address = "proxy.example.com:1080"
username = "user"
password = "pass"
2026-02-20 21:42:15 +03:00
Alexey
7da062e448 Merge pull request #188 from telemt/main-stage
From staging #185 + #186 -> main
2026-02-20 18:04:58 +03:00
Alexey
1fd78e012d Metrics + Fixes in tests 2026-02-20 18:02:02 +03:00
Alexey
7304dacd60 Update main.rs 2026-02-20 17:42:20 +03:00
Alexey
3bff0629ca Merge pull request #187 from artemws/patch-1
Update metrics whitelist in README
2026-02-20 17:26:50 +03:00
Alexey
a79f0bbaf5 Merge pull request #186 from telemt/flow
TLS-F + PROXY Protocol Fixes
2026-02-20 17:25:06 +03:00
artemws
aa535bba0a Update metrics whitelist in README
Expanded metrics whitelist to include additional IP ranges.
2026-02-20 16:24:02 +02:00
Alexey
eb3245b78f Merge branch 'main-stage' into flow 2026-02-20 17:19:23 +03:00
Alexey
da84151e9f Merge pull request #184 from artemws/main
CIDR вместо обычного IP адреса metrics_whitelist
2026-02-20 17:15:54 +03:00
Alexey
a303fee65f ALPN Extract tests
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 17:12:16 +03:00
Alexey
bae811f8f1 Update Cargo.toml 2026-02-20 17:05:35 +03:00
artemws
8892860490 Change whitelist to use IpNetwork for IP filtering 2026-02-20 16:04:21 +02:00
artemws
0d2958fea7 Change metrics whitelist to use IpNetwork 2026-02-20 16:03:57 +02:00
artemws
dbd9b53940 Change metrics_whitelist type from Vec<IpAddr> to Vec<IpNetwork> 2026-02-20 16:03:38 +02:00
artemws
8f1f051a54 Add ipnetwork dependency to Cargo.toml 2026-02-20 16:03:03 +02:00
Alexey
471c680def TLS Improvements
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 17:02:17 +03:00
Alexey
be8742a229 Merge pull request #183 from artemws/main
Config Reload-on-fly
2026-02-20 16:57:38 +03:00
Alexey
781947a08a TlsFrontCache + X509 Parser + GREASE Tolerance
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 16:56:33 +03:00
Alexey
b295712dbb Update Cargo.toml 2026-02-20 16:47:13 +03:00
Alexey
e8454ea370 HAProxy PROXY Protocol Fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-20 16:42:40 +03:00
artemws
ea88a40c8f Add config path canonicalization
Canonicalize the config path to match notify events.
2026-02-20 15:37:44 +02:00
Alexey
2ea4c83d9d Normalize IP + Masking + TLS 2026-02-20 16:32:14 +03:00
artemws
953fab68c4 Refactor hot-reload mechanism to use notify crate
Updated hot-reload functionality to use notify crate for file watching and improved documentation.
2026-02-20 15:29:37 +02:00
artemws
0f6621d359 Refactor hot-reload watcher implementation 2026-02-20 15:29:20 +02:00
artemws
82bb93e8da Add notify dependency for macOS file events 2026-02-20 15:28:58 +02:00
artemws
25b18ab064 Enhance logging for hot reload configuration changes
Added detailed logging for various configuration changes during hot reload, including log level, ad tag, middle proxy pool size, and user access changes.
2026-02-20 14:50:37 +02:00
artemws
3e0dc91db6 Add PartialEq to AccessConfig struct 2026-02-20 14:37:00 +02:00
artemws
26270bc651 Specify types for config_rx in main.rs 2026-02-20 14:27:31 +02:00
Alexey
be2ec4b9b4 Update CONTRIBUTING.md 2026-02-20 15:22:18 +03:00
artemws
766806f5df Add hot_reload module to config 2026-02-20 14:19:04 +02:00
artemws
26cf6ff4fa Add files via upload 2026-02-20 14:18:30 +02:00
artemws
b8add81018 Implement hot-reload for config and log level
Added hot-reload functionality for configuration and log level.
2026-02-20 14:18:09 +02:00
29 changed files with 1542 additions and 244 deletions

View File

@@ -1,10 +1,11 @@
## Pull Requests - Rules
# Pull Requests - Rules
## General
- ONLY signed and verified commits
- ONLY from your name
- DO NOT commit with `codex` or `claude` as author/commiter
- PREFER `flow` branch for development, not `main`
### AI
## AI
We are not against modern tools, like AI, where you act as a principal or architect, but we consider it important:
- you really understand what you're doing

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.0.5"
version = "3.0.8"
edition = "2024"
[dependencies]
@@ -30,6 +30,7 @@ nix = { version = "0.28", default-features = false, features = ["net"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.8"
x509-parser = "0.15"
# Utils
bytes = "1.9"
@@ -52,6 +53,8 @@ anyhow = "1.0"
# HTTP
reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false }
notify = { version = "6", features = ["macos_fsevent"] }
ipnetwork = "0.20"
hyper = { version = "1", features = ["server", "http1"] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto"] }
http-body-util = "0.1"

View File

@@ -250,7 +250,12 @@ listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # Unix socket
# listen_unix_sock_perm = "0666" # Socket file permissions
# metrics_port = 9090
# metrics_whitelist = ["127.0.0.1", "::1"]
# metrics_whitelist = [
# "192.168.0.0/24",
# "172.16.0.0/12",
# "127.0.0.1/32",
# "::1/128"
#]
# Listen on multiple interfaces/IPs - IPv4
[[server.listeners]]

View File

@@ -124,6 +124,10 @@ hello = "00000000000000000000000000000000"
# [access.user_data_quota]
# hello = 1073741824 # 1 GB
# [access.user_expirations]
# format: username = "[year]-[month]-[day]T[hour]:[minute]:[second]Z" UTC
# hello = "2027-01-01T00:00:00Z"
# === Upstreams & Routing ===
[[upstreams]]
type = "direct"

View File

@@ -1,5 +1,6 @@
use std::net::IpAddr;
use std::collections::HashMap;
use ipnetwork::IpNetwork;
use serde::Deserialize;
// Helper defaults kept private to the config module.
@@ -66,8 +67,11 @@ pub(crate) fn default_weight() -> u16 {
1
}
pub(crate) fn default_metrics_whitelist() -> Vec<IpAddr> {
vec!["127.0.0.1".parse().unwrap(), "::1".parse().unwrap()]
pub(crate) fn default_metrics_whitelist() -> Vec<IpNetwork> {
vec![
"127.0.0.1/32".parse().unwrap(),
"::1/128".parse().unwrap(),
]
}
pub(crate) fn default_prefer_4() -> u8 {
@@ -106,6 +110,75 @@ pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
30_000
}
pub(crate) fn default_crypto_pending_buffer() -> usize {
256 * 1024
}
pub(crate) fn default_max_client_frame() -> usize {
16 * 1024 * 1024
}
pub(crate) fn default_tls_new_session_tickets() -> u8 {
0
}
pub(crate) fn default_server_hello_delay_min_ms() -> u64 {
0
}
pub(crate) fn default_server_hello_delay_max_ms() -> u64 {
0
}
pub(crate) fn default_alpn_enforce() -> bool {
true
}
pub(crate) fn default_stun_servers() -> Vec<String> {
vec![
"stun.l.google.com:19302".to_string(),
"stun1.l.google.com:19302".to_string(),
"stun2.l.google.com:19302".to_string(),
"stun.stunprotocol.org:3478".to_string(),
"stun.voip.eutelia.it:3478".to_string(),
]
}
pub(crate) fn default_http_ip_detect_urls() -> Vec<String> {
vec![
"https://ifconfig.me/ip".to_string(),
"https://api.ipify.org".to_string(),
]
}
pub(crate) fn default_cache_public_ip_path() -> String {
"cache/public_ip.txt".to_string()
}
pub(crate) fn default_proxy_secret_reload_secs() -> u64 {
12 * 60 * 60
}
pub(crate) fn default_proxy_config_reload_secs() -> u64 {
12 * 60 * 60
}
pub(crate) fn default_ntp_check() -> bool {
true
}
pub(crate) fn default_ntp_servers() -> Vec<String> {
vec!["pool.ntp.org".to_string()]
}
pub(crate) fn default_fast_mode_min_tls_record() -> usize {
0
}
pub(crate) fn default_degradation_min_unavailable_dc_groups() -> u8 {
2
}
// Custom deserializer helpers
#[derive(Deserialize)]

421
src/config/hot_reload.rs Normal file
View File

@@ -0,0 +1,421 @@
//! Hot-reload: watches the config file via inotify (Linux) / FSEvents (macOS)
//! / ReadDirectoryChangesW (Windows) using the `notify` crate.
//! SIGHUP is also supported on Unix as an additional manual trigger.
//!
//! # What can be reloaded without restart
//!
//! | Section | Field | Effect |
//! |-----------|-------------------------------|-----------------------------------|
//! | `general` | `log_level` | Filter updated via `log_level_tx` |
//! | `general` | `ad_tag` | Passed on next connection |
//! | `general` | `middle_proxy_pool_size` | Passed on next connection |
//! | `general` | `me_keepalive_*` | Passed on next connection |
//! | `access` | All user/quota fields | Effective immediately |
//!
//! Fields that require re-binding sockets (`server.port`, `censorship.*`,
//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted.
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
use tokio::sync::{mpsc, watch};
use tracing::{error, info, warn};
use crate::config::LogLevel;
use super::load::ProxyConfig;
// ── Hot fields ────────────────────────────────────────────────────────────────
/// Fields that are safe to swap without restarting listeners.
#[derive(Debug, Clone, PartialEq)]
pub struct HotFields {
pub log_level: LogLevel,
pub ad_tag: Option<String>,
pub middle_proxy_pool_size: usize,
pub me_keepalive_enabled: bool,
pub me_keepalive_interval_secs: u64,
pub me_keepalive_jitter_secs: u64,
pub me_keepalive_payload_random: bool,
pub access: crate::config::AccessConfig,
}
impl HotFields {
pub fn from_config(cfg: &ProxyConfig) -> Self {
Self {
log_level: cfg.general.log_level.clone(),
ad_tag: cfg.general.ad_tag.clone(),
middle_proxy_pool_size: cfg.general.middle_proxy_pool_size,
me_keepalive_enabled: cfg.general.me_keepalive_enabled,
me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs,
me_keepalive_payload_random: cfg.general.me_keepalive_payload_random,
access: cfg.access.clone(),
}
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
/// Warn if any non-hot fields changed (require restart).
fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
if old.server.port != new.server.port {
warn!(
"config reload: server.port changed ({} → {}); restart required",
old.server.port, new.server.port
);
}
if old.censorship.tls_domain != new.censorship.tls_domain {
warn!(
"config reload: censorship.tls_domain changed ('{}' → '{}'); restart required",
old.censorship.tls_domain, new.censorship.tls_domain
);
}
if old.network.ipv4 != new.network.ipv4 || old.network.ipv6 != new.network.ipv6 {
warn!("config reload: network.ipv4/ipv6 changed; restart required");
}
if old.general.use_middle_proxy != new.general.use_middle_proxy {
warn!("config reload: use_middle_proxy changed; restart required");
}
}
/// Resolve the public host for link generation — mirrors the logic in main.rs.
///
/// Priority:
/// 1. `[general.links] public_host` — explicit override in config
/// 2. `detected_ip_v4` — from STUN/interface probe at startup
/// 3. `detected_ip_v6` — fallback
/// 4. `"UNKNOWN"` — warn the user to set `public_host`
fn resolve_link_host(
cfg: &ProxyConfig,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) -> String {
if let Some(ref h) = cfg.general.links.public_host {
return h.clone();
}
detected_ip_v4
.or(detected_ip_v6)
.map(|ip| ip.to_string())
.unwrap_or_else(|| {
warn!(
"config reload: could not determine public IP for proxy links. \
Set [general.links] public_host in config."
);
"UNKNOWN".to_string()
})
}
/// Print TG proxy links for a single user — mirrors print_proxy_links() in main.rs.
fn print_user_links(user: &str, secret: &str, host: &str, port: u16, cfg: &ProxyConfig) {
info!(target: "telemt::links", "--- New user: {} ---", user);
if cfg.general.modes.classic {
info!(
target: "telemt::links",
" Classic: tg://proxy?server={}&port={}&secret={}",
host, port, secret
);
}
if cfg.general.modes.secure {
info!(
target: "telemt::links",
" DD: tg://proxy?server={}&port={}&secret=dd{}",
host, port, secret
);
}
if cfg.general.modes.tls {
let mut domains = vec![cfg.censorship.tls_domain.clone()];
for d in &cfg.censorship.tls_domains {
if !domains.contains(d) {
domains.push(d.clone());
}
}
for domain in &domains {
let domain_hex = hex::encode(domain.as_bytes());
info!(
target: "telemt::links",
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
host, port, secret, domain_hex
);
}
}
info!(target: "telemt::links", "--------------------");
}
/// Log all detected changes and emit TG links for new users.
fn log_changes(
old_hot: &HotFields,
new_hot: &HotFields,
new_cfg: &ProxyConfig,
log_tx: &watch::Sender<LogLevel>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) {
if old_hot.log_level != new_hot.log_level {
info!(
"config reload: log_level: '{}' → '{}'",
old_hot.log_level, new_hot.log_level
);
log_tx.send(new_hot.log_level.clone()).ok();
}
if old_hot.ad_tag != new_hot.ad_tag {
info!(
"config reload: ad_tag: {} → {}",
old_hot.ad_tag.as_deref().unwrap_or("none"),
new_hot.ad_tag.as_deref().unwrap_or("none"),
);
}
if old_hot.middle_proxy_pool_size != new_hot.middle_proxy_pool_size {
info!(
"config reload: middle_proxy_pool_size: {} → {}",
old_hot.middle_proxy_pool_size, new_hot.middle_proxy_pool_size,
);
}
if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled
|| old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs
|| old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs
|| old_hot.me_keepalive_payload_random != new_hot.me_keepalive_payload_random
{
info!(
"config reload: me_keepalive: enabled={} interval={}s jitter={}s random_payload={}",
new_hot.me_keepalive_enabled,
new_hot.me_keepalive_interval_secs,
new_hot.me_keepalive_jitter_secs,
new_hot.me_keepalive_payload_random,
);
}
if old_hot.access.users != new_hot.access.users {
let mut added: Vec<&String> = new_hot.access.users.keys()
.filter(|u| !old_hot.access.users.contains_key(*u))
.collect();
added.sort();
let mut removed: Vec<&String> = old_hot.access.users.keys()
.filter(|u| !new_hot.access.users.contains_key(*u))
.collect();
removed.sort();
let mut changed: Vec<&String> = new_hot.access.users.keys()
.filter(|u| {
old_hot.access.users.get(*u)
.map(|s| s != &new_hot.access.users[*u])
.unwrap_or(false)
})
.collect();
changed.sort();
if !added.is_empty() {
info!(
"config reload: users added: [{}]",
added.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
);
let host = resolve_link_host(new_cfg, detected_ip_v4, detected_ip_v6);
let port = new_cfg.general.links.public_port.unwrap_or(new_cfg.server.port);
for user in &added {
if let Some(secret) = new_hot.access.users.get(*user) {
print_user_links(user, secret, &host, port, new_cfg);
}
}
}
if !removed.is_empty() {
info!(
"config reload: users removed: [{}]",
removed.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
);
}
if !changed.is_empty() {
info!(
"config reload: users secret changed: [{}]",
changed.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
);
}
}
if old_hot.access.user_max_tcp_conns != new_hot.access.user_max_tcp_conns {
info!(
"config reload: user_max_tcp_conns updated ({} entries)",
new_hot.access.user_max_tcp_conns.len()
);
}
if old_hot.access.user_expirations != new_hot.access.user_expirations {
info!(
"config reload: user_expirations updated ({} entries)",
new_hot.access.user_expirations.len()
);
}
if old_hot.access.user_data_quota != new_hot.access.user_data_quota {
info!(
"config reload: user_data_quota updated ({} entries)",
new_hot.access.user_data_quota.len()
);
}
if old_hot.access.user_max_unique_ips != new_hot.access.user_max_unique_ips {
info!(
"config reload: user_max_unique_ips updated ({} entries)",
new_hot.access.user_max_unique_ips.len()
);
}
}
/// Load config, validate, diff against current, and broadcast if changed.
fn reload_config(
config_path: &PathBuf,
config_tx: &watch::Sender<Arc<ProxyConfig>>,
log_tx: &watch::Sender<LogLevel>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) {
let new_cfg = match ProxyConfig::load(config_path) {
Ok(c) => c,
Err(e) => {
error!("config reload: failed to parse {:?}: {}", config_path, e);
return;
}
};
if let Err(e) = new_cfg.validate() {
error!("config reload: validation failed: {}; keeping old config", e);
return;
}
let old_cfg = config_tx.borrow().clone();
let old_hot = HotFields::from_config(&old_cfg);
let new_hot = HotFields::from_config(&new_cfg);
if old_hot == new_hot {
return;
}
warn_non_hot_changes(&old_cfg, &new_cfg);
log_changes(&old_hot, &new_hot, &new_cfg, log_tx, detected_ip_v4, detected_ip_v6);
config_tx.send(Arc::new(new_cfg)).ok();
}
// ── Public API ────────────────────────────────────────────────────────────────
/// Spawn the hot-reload watcher task.
///
/// Uses `notify` (inotify on Linux) to detect file changes instantly.
/// SIGHUP is also handled on Unix as an additional manual trigger.
///
/// `detected_ip_v4` / `detected_ip_v6` are the IPs discovered during the
/// startup probe — used when generating proxy links for newly added users,
/// matching the same logic as the startup output.
pub fn spawn_config_watcher(
config_path: PathBuf,
initial: Arc<ProxyConfig>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
) -> (watch::Receiver<Arc<ProxyConfig>>, watch::Receiver<LogLevel>) {
let initial_level = initial.general.log_level.clone();
let (config_tx, config_rx) = watch::channel(initial);
let (log_tx, log_rx) = watch::channel(initial_level);
// Bridge: sync notify callbacks → async task via mpsc.
let (notify_tx, mut notify_rx) = mpsc::channel::<()>(4);
// Canonicalize so path matches what notify returns (absolute) in events.
let config_path = match config_path.canonicalize() {
Ok(p) => p,
Err(_) => config_path.to_path_buf(),
};
// Watch the parent directory rather than the file itself, because many
// editors (vim, nano) and systemd write via rename, which would cause
// inotify to lose track of the original inode.
let watch_dir = config_path
.parent()
.unwrap_or_else(|| std::path::Path::new("."))
.to_path_buf();
// ── inotify watcher (instant on local fs) ────────────────────────────
let config_file = config_path.clone();
let tx_inotify = notify_tx.clone();
let inotify_ok = match recommended_watcher(move |res: notify::Result<notify::Event>| {
let Ok(event) = res else { return };
let is_our_file = event.paths.iter().any(|p| p == &config_file);
if !is_our_file { return; }
if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)) {
let _ = tx_inotify.try_send(());
}
}) {
Ok(mut w) => match w.watch(&watch_dir, RecursiveMode::NonRecursive) {
Ok(()) => {
info!("config watcher: inotify active on {:?}", config_path);
Box::leak(Box::new(w));
true
}
Err(e) => { warn!("config watcher: inotify watch failed: {}", e); false }
},
Err(e) => { warn!("config watcher: inotify unavailable: {}", e); false }
};
// ── poll watcher (always active, fixes Docker bind mounts / NFS) ─────
// inotify does not receive events for files mounted from the host into
// a container. PollWatcher compares file contents every 3 s and fires
// on any change regardless of the underlying fs.
let config_file2 = config_path.clone();
let tx_poll = notify_tx.clone();
match notify::poll::PollWatcher::new(
move |res: notify::Result<notify::Event>| {
let Ok(event) = res else { return };
let is_our_file = event.paths.iter().any(|p| p == &config_file2);
if !is_our_file { return; }
if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)) {
let _ = tx_poll.try_send(());
}
},
notify::Config::default()
.with_poll_interval(std::time::Duration::from_secs(3))
.with_compare_contents(true),
) {
Ok(mut w) => match w.watch(&config_path, RecursiveMode::NonRecursive) {
Ok(()) => {
if inotify_ok {
info!("config watcher: poll watcher also active (Docker/NFS safe)");
} else {
info!("config watcher: poll watcher active on {:?} (3s interval)", config_path);
}
Box::leak(Box::new(w));
}
Err(e) => warn!("config watcher: poll watch failed: {}", e),
},
Err(e) => warn!("config watcher: poll watcher unavailable: {}", e),
}
// ── event loop ───────────────────────────────────────────────────────
tokio::spawn(async move {
#[cfg(unix)]
let mut sighup = {
use tokio::signal::unix::{SignalKind, signal};
signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler")
};
loop {
#[cfg(unix)]
tokio::select! {
msg = notify_rx.recv() => {
if msg.is_none() { break; }
}
_ = sighup.recv() => {
info!("SIGHUP received — reloading {:?}", config_path);
}
}
#[cfg(not(unix))]
if notify_rx.recv().await.is_none() { break; }
// Debounce: drain extra events that arrive within 50 ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
while notify_rx.try_recv().is_ok() {}
reload_config(&config_path, &config_tx, &log_tx, detected_ip_v4, detected_ip_v6);
}
});
(config_rx, log_rx)
}

View File

@@ -194,6 +194,10 @@ impl ProxyConfig {
validate_network_cfg(&mut config.network)?;
if config.general.use_middle_proxy && config.network.ipv6 == Some(true) {
warn!("IPv6 with Middle Proxy is experimental and may cause KDF address mismatch; consider disabling IPv6 or ME");
}
// Random fake_cert_len only when default is in use.
if !config.censorship.tls_emulation && config.censorship.fake_cert_len == default_fake_cert_len() {
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
@@ -222,6 +226,7 @@ impl ProxyConfig {
ip: ipv4,
announce: None,
announce_ip: None,
proxy_protocol: None,
});
}
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
@@ -230,6 +235,7 @@ impl ProxyConfig {
ip: ipv6,
announce: None,
announce_ip: None,
proxy_protocol: None,
});
}
}

View File

@@ -3,6 +3,7 @@
pub(crate) mod defaults;
mod types;
mod load;
pub mod hot_reload;
pub use load::ProxyConfig;
pub use types::*;

View File

@@ -1,4 +1,5 @@
use chrono::{DateTime, Utc};
use ipnetwork::IpNetwork;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
@@ -95,6 +96,22 @@ pub struct NetworkConfig {
#[serde(default)]
pub multipath: bool,
/// STUN servers list for public IP discovery.
#[serde(default = "default_stun_servers")]
pub stun_servers: Vec<String>,
/// Enable TCP STUN fallback when UDP is blocked.
#[serde(default)]
pub stun_tcp_fallback: bool,
/// HTTP-based public IP detection endpoints (fallback after STUN).
#[serde(default = "default_http_ip_detect_urls")]
pub http_ip_detect_urls: Vec<String>,
/// Cache file path for detected public IP.
#[serde(default = "default_cache_public_ip_path")]
pub cache_public_ip_path: String,
}
impl Default for NetworkConfig {
@@ -104,6 +121,10 @@ impl Default for NetworkConfig {
ipv6: None,
prefer: 4,
multipath: false,
stun_servers: default_stun_servers(),
stun_tcp_fallback: true,
http_ip_detect_urls: default_http_ip_detect_urls(),
cache_public_ip_path: default_cache_public_ip_path(),
}
}
}
@@ -171,6 +192,15 @@ pub struct GeneralConfig {
#[serde(default = "default_true")]
pub me_keepalive_payload_random: bool,
/// Max pending ciphertext buffer per client writer (bytes).
/// Controls FakeTLS backpressure vs throughput.
#[serde(default = "default_crypto_pending_buffer")]
pub crypto_pending_buffer: usize,
/// Maximum allowed client MTProto frame size (bytes).
#[serde(default = "default_max_client_frame")]
pub max_client_frame: usize,
/// Enable staggered warmup of extra ME writers.
#[serde(default = "default_true")]
pub me_warmup_stagger_enabled: bool,
@@ -217,6 +247,34 @@ pub struct GeneralConfig {
/// [general.links] — proxy link generation overrides.
#[serde(default)]
pub links: LinksConfig,
/// Minimum TLS record size when fast_mode coalescing is enabled (0 = disabled).
#[serde(default = "default_fast_mode_min_tls_record")]
pub fast_mode_min_tls_record: usize,
/// Automatically reload proxy-secret every N seconds.
#[serde(default = "default_proxy_secret_reload_secs")]
pub proxy_secret_auto_reload_secs: u64,
/// Automatically reload proxy-multi.conf every N seconds.
#[serde(default = "default_proxy_config_reload_secs")]
pub proxy_config_auto_reload_secs: u64,
/// Enable NTP drift check at startup.
#[serde(default = "default_ntp_check")]
pub ntp_check: bool,
/// NTP servers for drift check.
#[serde(default = "default_ntp_servers")]
pub ntp_servers: Vec<String>,
/// Enable auto-degradation from ME to Direct-DC.
#[serde(default = "default_true")]
pub auto_degradation_enabled: bool,
/// Minimum unavailable ME DC groups before degrading.
#[serde(default = "default_degradation_min_unavailable_dc_groups")]
pub degradation_min_unavailable_dc_groups: u8,
}
impl Default for GeneralConfig {
@@ -250,6 +308,15 @@ impl Default for GeneralConfig {
log_level: LogLevel::Normal,
disable_colors: false,
links: LinksConfig::default(),
crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(),
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
ntp_check: default_ntp_check(),
ntp_servers: default_ntp_servers(),
auto_degradation_enabled: true,
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
}
}
}
@@ -304,7 +371,7 @@ pub struct ServerConfig {
pub metrics_port: Option<u16>,
#[serde(default = "default_metrics_whitelist")]
pub metrics_whitelist: Vec<IpAddr>,
pub metrics_whitelist: Vec<IpNetwork>,
#[serde(default)]
pub listeners: Vec<ListenerConfig>,
@@ -394,6 +461,22 @@ pub struct AntiCensorshipConfig {
/// Directory to store TLS front cache (on disk).
#[serde(default = "default_tls_front_dir")]
pub tls_front_dir: String,
/// Minimum server_hello delay in milliseconds (anti-fingerprint).
#[serde(default = "default_server_hello_delay_min_ms")]
pub server_hello_delay_min_ms: u64,
/// Maximum server_hello delay in milliseconds.
#[serde(default = "default_server_hello_delay_max_ms")]
pub server_hello_delay_max_ms: u64,
/// Number of NewSessionTicket messages to emit post-handshake.
#[serde(default = "default_tls_new_session_tickets")]
pub tls_new_session_tickets: u8,
/// Enforce ALPN echo of client preference.
#[serde(default = "default_alpn_enforce")]
pub alpn_enforce: bool,
}
impl Default for AntiCensorshipConfig {
@@ -408,11 +491,15 @@ impl Default for AntiCensorshipConfig {
fake_cert_len: default_fake_cert_len(),
tls_emulation: false,
tls_front_dir: default_tls_front_dir(),
server_hello_delay_min_ms: default_server_hello_delay_min_ms(),
server_hello_delay_max_ms: default_server_hello_delay_max_ms(),
tls_new_session_tickets: default_tls_new_session_tickets(),
alpn_enforce: default_alpn_enforce(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AccessConfig {
#[serde(default)]
pub users: HashMap<String, String>,
@@ -513,6 +600,9 @@ pub struct ListenerConfig {
/// Migrated to `announce` automatically if `announce` is not set.
#[serde(default)]
pub announce_ip: Option<IpAddr>,
/// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol.
#[serde(default)]
pub proxy_protocol: Option<bool>,
}
// ============= ShowLink =============

View File

@@ -3,6 +3,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use rand::Rng;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::Semaphore;
@@ -27,6 +28,7 @@ mod tls_front;
mod util;
use crate::config::{LogLevel, ProxyConfig};
use crate::config::hot_reload::spawn_config_watcher;
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
@@ -211,6 +213,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
"Modes: classic={} secure={} tls={}",
config.general.modes.classic, config.general.modes.secure, config.general.modes.tls
);
if config.general.modes.classic {
warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients");
}
info!("TLS domain: {}", config.censorship.tls_domain);
if let Some(ref sock) = config.censorship.mask_unix_sock {
info!("Mask: {} -> unix:{}", config.censorship.mask, sock);
@@ -259,46 +264,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("IP limits configured for {} users", config.access.user_max_unique_ips.len());
}
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let port = config.censorship.mask_port;
tokio::spawn(async move {
for domain in domains {
match crate::tls_front::fetcher::fetch_real_tls(
&domain,
port,
&domain,
Duration::from_secs(5),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
});
Some(cache)
} else {
None
};
// Connection concurrency limit
let _max_connections = Arc::new(Semaphore::new(10_000));
@@ -477,6 +442,74 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
cache.load_from_disk().await;
let port = config.censorship.mask_port;
let mask_host = config.censorship.mask_host.clone()
.unwrap_or_else(|| config.censorship.tls_domain.clone());
// Initial synchronous fetch to warm cache before serving clients.
for domain in tls_domains.clone() {
match crate::tls_front::fetcher::fetch_real_tls(
&mask_host,
port,
&domain,
Duration::from_secs(5),
Some(upstream_manager.clone()),
)
.await
{
Ok(res) => cache.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
// Periodic refresh with jitter.
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let upstream_for_task = upstream_manager.clone();
tokio::spawn(async move {
loop {
let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600);
let jitter_secs = rand::rng().random_range(0..=7200);
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
for domain in &domains {
match crate::tls_front::fetcher::fetch_real_tls(
&mask_host,
port,
domain,
Duration::from_secs(5),
Some(upstream_for_task.clone()),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"),
}
}
}
});
Some(cache)
} else {
None
};
// Middle-End ping before DC connectivity
if let Some(ref pool) = me_pool {
let me_results = run_me_ping(pool, &rng).await;
@@ -656,6 +689,19 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
detected_ip_v4, detected_ip_v6
);
// ── Hot-reload watcher ────────────────────────────────────────────────
// Uses inotify to detect file changes instantly (SIGHUP also works).
// detected_ip_v4/v6 are passed so newly added users get correct TG links.
let (config_rx, mut log_level_rx): (
tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
tokio::sync::watch::Receiver<LogLevel>,
) = spawn_config_watcher(
std::path::PathBuf::from(&config_path),
config.clone(),
detected_ip_v4,
detected_ip_v6,
);
let mut listeners = Vec::new();
for listener_conf in &config.server.listeners {
@@ -677,6 +723,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
Ok(socket) => {
let listener = TcpListener::from_std(socket.into())?;
info!("Listening on {}", addr);
let listener_proxy_protocol =
listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol);
// Resolve the public host for link generation
let public_host = if let Some(ref announce) = listener_conf.announce {
@@ -702,7 +750,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
print_proxy_links(&public_host, link_port, &config);
}
listeners.push(listener);
listeners.push((listener, listener_proxy_protocol));
}
Err(e) => {
error!("Failed to bind to {}: {}", addr, e);
@@ -760,7 +808,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
has_unix_listener = true;
let config = config.clone();
let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -779,7 +827,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16));
let config = config.clone();
let config = config_rx_unix.borrow_and_update().clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -788,12 +836,13 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move {
if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng,
me_pool, tls_cache, ip_tracker,
me_pool, tls_cache, ip_tracker, proxy_protocol_enabled,
).await {
debug!(error = %e, "Unix socket connection error");
}
@@ -825,6 +874,20 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
.reload(runtime_filter)
.expect("Failed to switch log filter");
// Apply log_level changes from hot-reload to the tracing filter.
tokio::spawn(async move {
loop {
if log_level_rx.changed().await.is_err() {
break;
}
let level = log_level_rx.borrow_and_update().clone();
let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str());
if let Err(e) = filter_handle.reload(new_filter) {
tracing::error!("config reload: failed to update log filter: {}", e);
}
}
});
if let Some(port) = config.server.metrics_port {
let stats = stats.clone();
let whitelist = config.server.metrics_whitelist.clone();
@@ -833,8 +896,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
});
}
for listener in listeners {
let config = config.clone();
for (listener, listener_proxy_protocol) in listeners {
let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -848,7 +911,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
let config = config.clone();
let config = config_rx.borrow_and_update().clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -857,6 +920,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move {
if let Err(e) = ClientHandler::new(
@@ -871,11 +935,45 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
me_pool,
tls_cache,
ip_tracker,
proxy_protocol_enabled,
)
.run()
.await
{
warn!(peer = %peer_addr, error = %e, "Connection closed with error");
let peer_closed = matches!(
&e,
crate::error::ProxyError::Io(ioe)
if matches!(
ioe.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
)
) || matches!(
&e,
crate::error::ProxyError::Stream(
crate::error::StreamError::Io(ioe)
)
if matches!(
ioe.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
)
);
let me_closed = matches!(
&e,
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
);
match (peer_closed, me_closed) {
(true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"),
(_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"),
_ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"),
}
}
});
}

View File

@@ -1,18 +1,19 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::sync::Arc;
use http_body_util::Full;
use http_body_util::{Full, BodyExt};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use ipnetwork::IpNetwork;
use tokio::net::TcpListener;
use tracing::{info, warn, debug};
use crate::stats::Stats;
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpAddr>) {
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = match TcpListener::bind(addr).await {
Ok(l) => l,
@@ -32,7 +33,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpAddr>) {
}
};
if !whitelist.is_empty() && !whitelist.contains(&peer.ip()) {
if !whitelist.is_empty() && !whitelist.iter().any(|net| net.contains(peer.ip())) {
debug!(peer = %peer, "Metrics request denied by whitelist");
continue;
}
@@ -53,7 +54,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpAddr>) {
}
}
fn handle(req: Request<hyper::body::Incoming>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
fn handle<B>(req: Request<B>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() != "/metrics" {
let resp = Response::builder()
.status(StatusCode::NOT_FOUND)
@@ -193,21 +194,20 @@ mod tests {
stats.increment_connects_all();
stats.increment_connects_all();
let port = 19091u16;
let s = stats.clone();
tokio::spawn(async move {
serve(port, s, vec![]).await;
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let req = Request::builder()
.uri("/metrics")
.body(())
.unwrap();
let resp = handle(req, &stats).unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
let resp = reqwest::get(format!("http://127.0.0.1:{}/metrics", port))
.await.unwrap();
assert_eq!(resp.status(), 200);
let body = resp.text().await.unwrap();
assert!(body.contains("telemt_connections_total 3"));
let resp404 = reqwest::get(format!("http://127.0.0.1:{}/other", port))
.await.unwrap();
assert_eq!(resp404.status(), 404);
let req404 = Request::builder()
.uri("/other")
.body(())
.unwrap();
let resp404 = handle(req404, &stats).unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
}
}

View File

@@ -1,6 +1,8 @@
//! Protocol constants and datacenter addresses
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use crate::crypto::SecureRandom;
use std::sync::LazyLock;
// ============= Telegram Datacenters =============
@@ -151,7 +153,18 @@ pub const TLS_RECORD_ALERT: u8 = 0x15;
/// Maximum TLS record size
pub const MAX_TLS_RECORD_SIZE: usize = 16384;
/// Maximum TLS chunk size (with overhead)
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 24;
/// RFC 8446 §5.2 allows up to 16384 + 256 bytes of ciphertext
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256;
/// Generate padding length for Secure Intermediate protocol.
/// Total (data + padding) must not be divisible by 4 per MTProto spec.
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
if data_len % 4 == 0 {
(rng.range(3) + 1) as usize // 1-3
} else {
rng.range(4) as usize // 0-3
}
}
// ============= Timeouts =============
@@ -319,4 +332,4 @@ mod tests {
assert_eq!(TG_DATACENTERS_V4.len(), 5);
assert_eq!(TG_DATACENTERS_V6.len(), 5);
}
}
}

View File

@@ -32,6 +32,7 @@ pub const TIME_SKEW_MAX: i64 = 10 * 60; // 10 minutes after
mod extension_type {
pub const KEY_SHARE: u16 = 0x0033;
pub const SUPPORTED_VERSIONS: u16 = 0x002b;
pub const ALPN: u16 = 0x0010;
}
/// TLS Cipher Suites
@@ -62,6 +63,7 @@ pub struct TlsValidation {
// ============= TLS Extension Builder =============
/// Builder for TLS extensions with correct length calculation
#[derive(Clone)]
struct TlsExtensionBuilder {
extensions: Vec<u8>,
}
@@ -108,6 +110,27 @@ impl TlsExtensionBuilder {
self
}
/// Add ALPN extension with a single selected protocol.
fn add_alpn(&mut self, proto: &[u8]) -> &mut Self {
// Extension type: ALPN (0x0010)
self.extensions.extend_from_slice(&extension_type::ALPN.to_be_bytes());
// ALPN extension format:
// extension_data length (2 bytes)
// protocols length (2 bytes)
// protocol name length (1 byte)
// protocol name bytes
let proto_len = proto.len() as u8;
let list_len: u16 = 1 + proto_len as u16;
let ext_len: u16 = 2 + list_len;
self.extensions.extend_from_slice(&ext_len.to_be_bytes());
self.extensions.extend_from_slice(&list_len.to_be_bytes());
self.extensions.push(proto_len);
self.extensions.extend_from_slice(proto);
self
}
/// Build final extensions with length prefix
fn build(self) -> Vec<u8> {
@@ -144,6 +167,8 @@ struct ServerHelloBuilder {
compression: u8,
/// Extensions
extensions: TlsExtensionBuilder,
/// Selected ALPN protocol (if any)
alpn: Option<Vec<u8>>,
}
impl ServerHelloBuilder {
@@ -154,6 +179,7 @@ impl ServerHelloBuilder {
cipher_suite: cipher_suite::TLS_AES_128_GCM_SHA256,
compression: 0x00,
extensions: TlsExtensionBuilder::new(),
alpn: None,
}
}
@@ -167,10 +193,19 @@ impl ServerHelloBuilder {
self.extensions.add_supported_versions(0x0304);
self
}
fn with_alpn(mut self, proto: Option<Vec<u8>>) -> Self {
self.alpn = proto;
self
}
/// Build ServerHello message (without record header)
fn build_message(&self) -> Vec<u8> {
let extensions = self.extensions.extensions.clone();
let mut ext_builder = self.extensions.clone();
if let Some(ref alpn) = self.alpn {
ext_builder.add_alpn(alpn);
}
let extensions = ext_builder.extensions.clone();
let extensions_len = extensions.len() as u16;
// Calculate total length
@@ -350,13 +385,19 @@ pub fn build_server_hello(
session_id: &[u8],
fake_cert_len: usize,
rng: &SecureRandom,
alpn: Option<Vec<u8>>,
new_session_tickets: u8,
) -> Vec<u8> {
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
let fake_cert_len = fake_cert_len.max(MIN_APP_DATA).min(MAX_APP_DATA);
let x25519_key = gen_fake_x25519_key(rng);
// Build ServerHello
let server_hello = ServerHelloBuilder::new(session_id.to_vec())
.with_x25519_key(&x25519_key)
.with_tls13_version()
.with_alpn(alpn)
.build_record();
// Build Change Cipher Spec record
@@ -373,15 +414,35 @@ pub fn build_server_hello(
app_data_record.push(TLS_RECORD_APPLICATION);
app_data_record.extend_from_slice(&TLS_VERSION);
app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes());
// Fill ApplicationData with fully random bytes of desired length to avoid
// deterministic DPI fingerprints (fixed inner content type markers).
app_data_record.extend_from_slice(&fake_cert);
// Build optional NewSessionTicket records (TLS 1.3 handshake messages are encrypted;
// here we mimic with opaque ApplicationData records of plausible size).
let mut tickets = Vec::new();
if new_session_tickets > 0 {
for _ in 0..new_session_tickets {
let ticket_len: usize = rng.range(48) + 48; // 48-95 bytes
let mut record = Vec::with_capacity(5 + ticket_len);
record.push(TLS_RECORD_APPLICATION);
record.extend_from_slice(&TLS_VERSION);
record.extend_from_slice(&(ticket_len as u16).to_be_bytes());
record.extend_from_slice(&rng.bytes(ticket_len));
tickets.push(record);
}
}
// Combine all records
let mut response = Vec::with_capacity(
server_hello.len() + change_cipher_spec.len() + app_data_record.len()
server_hello.len() + change_cipher_spec.len() + app_data_record.len() + tickets.iter().map(|r| r.len()).sum::<usize>()
);
response.extend_from_slice(&server_hello);
response.extend_from_slice(&change_cipher_spec);
response.extend_from_slice(&app_data_record);
for t in &tickets {
response.extend_from_slice(t);
}
// Compute HMAC for the response
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());
@@ -475,6 +536,53 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
None
}
/// Extract ALPN protocol list from ClientHello, return in offered order.
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec<Vec<u8>> {
let mut pos = 5; // after record header
if handshake.get(pos) != Some(&0x01) {
return Vec::new();
}
pos += 4; // type + len
pos += 2 + 32; // version + random
if pos >= handshake.len() { return Vec::new(); }
let session_id_len = *handshake.get(pos).unwrap_or(&0) as usize;
pos += 1 + session_id_len;
if pos + 2 > handshake.len() { return Vec::new(); }
let cipher_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
pos += 2 + cipher_len;
if pos >= handshake.len() { return Vec::new(); }
let comp_len = *handshake.get(pos).unwrap_or(&0) as usize;
pos += 1 + comp_len;
if pos + 2 > handshake.len() { return Vec::new(); }
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
pos += 2;
let ext_end = pos + ext_len;
if ext_end > handshake.len() { return Vec::new(); }
let mut out = Vec::new();
while pos + 4 <= ext_end {
let etype = u16::from_be_bytes([handshake[pos], handshake[pos+1]]);
let elen = u16::from_be_bytes([handshake[pos+2], handshake[pos+3]]) as usize;
pos += 4;
if pos + elen > ext_end { break; }
if etype == extension_type::ALPN && elen >= 3 {
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
let mut lp = pos + 2;
let list_end = (pos + 2).saturating_add(list_len).min(pos + elen);
while lp + 1 <= list_end {
let plen = handshake[lp] as usize;
lp += 1;
if lp + plen > list_end { break; }
out.push(handshake[lp..lp+plen].to_vec());
lp += plen;
}
break;
}
pos += elen;
}
out
}
/// Check if bytes look like a TLS ClientHello
pub fn is_tls_handshake(first_bytes: &[u8]) -> bool {
if first_bytes.len() < 3 {
@@ -653,7 +761,7 @@ mod tests {
let session_id = vec![0xAA; 32];
let rng = SecureRandom::new();
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng);
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng, None, 0);
// Should have at least 3 records
assert!(response.len() > 100);
@@ -686,8 +794,8 @@ mod tests {
let session_id = vec![0xAA; 32];
let rng = SecureRandom::new();
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng);
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng);
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
// Digest position should have non-zero data
let digest1 = &response1[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN];
@@ -746,4 +854,101 @@ mod tests {
// Should return None (no match) but not panic
assert!(result.is_none());
}
fn build_client_hello_with_exts(exts: Vec<(u16, Vec<u8>)>, host: &str) -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&TLS_VERSION); // legacy version
body.extend_from_slice(&[0u8; 32]); // random
body.push(0); // session id len
body.extend_from_slice(&2u16.to_be_bytes()); // cipher suites len
body.extend_from_slice(&[0x13, 0x01]); // TLS_AES_128_GCM_SHA256
body.push(1); // compression len
body.push(0); // null compression
// Build SNI extension
let host_bytes = host.as_bytes();
let mut sni_ext = Vec::new();
sni_ext.extend_from_slice(&(host_bytes.len() as u16 + 3).to_be_bytes());
sni_ext.push(0);
sni_ext.extend_from_slice(&(host_bytes.len() as u16).to_be_bytes());
sni_ext.extend_from_slice(host_bytes);
let mut ext_blob = Vec::new();
for (typ, data) in exts {
ext_blob.extend_from_slice(&typ.to_be_bytes());
ext_blob.extend_from_slice(&(data.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&data);
}
// SNI last
ext_blob.extend_from_slice(&0x0000u16.to_be_bytes());
ext_blob.extend_from_slice(&(sni_ext.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&sni_ext);
body.extend_from_slice(&(ext_blob.len() as u16).to_be_bytes());
body.extend_from_slice(&ext_blob);
let mut handshake = Vec::new();
handshake.push(0x01); // ClientHello
let len_bytes = (body.len() as u32).to_be_bytes();
handshake.extend_from_slice(&len_bytes[1..4]);
handshake.extend_from_slice(&body);
let mut record = Vec::new();
record.push(TLS_RECORD_HANDSHAKE);
record.extend_from_slice(&[0x03, 0x01]);
record.extend_from_slice(&(handshake.len() as u16).to_be_bytes());
record.extend_from_slice(&handshake);
record
}
#[test]
fn test_extract_sni_with_grease_extension() {
// GREASE type 0x0a0a with zero length before SNI
let ch = build_client_hello_with_exts(vec![(0x0a0a, Vec::new())], "example.com");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("example.com"));
}
#[test]
fn test_extract_sni_tolerates_empty_unknown_extension() {
let ch = build_client_hello_with_exts(vec![(0x1234, Vec::new())], "test.local");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("test.local"));
}
#[test]
fn test_extract_alpn_single() {
let mut alpn_data = Vec::new();
// list length = 3 (1 length byte + "h2")
alpn_data.extend_from_slice(&3u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch);
let alpn_str: Vec<String> = alpn
.iter()
.map(|p| std::str::from_utf8(p).unwrap().to_string())
.collect();
assert_eq!(alpn_str, vec!["h2"]);
}
#[test]
fn test_extract_alpn_multiple() {
let mut alpn_data = Vec::new();
// list length = 11 (sum of per-proto lengths including length bytes)
alpn_data.extend_from_slice(&11u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
alpn_data.push(4);
alpn_data.extend_from_slice(b"spdy");
alpn_data.push(2);
alpn_data.extend_from_slice(b"h3");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch);
let alpn_str: Vec<String> = alpn
.iter()
.map(|p| std::str::from_utf8(p).unwrap().to_string())
.collect();
assert_eq!(alpn_str, vec!["h2", "spdy", "h3"]);
}
}

View File

@@ -31,6 +31,7 @@ use crate::stats::{ReplayChecker, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::MePool;
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};
use crate::transport::socket::normalize_ip;
use crate::tls_front::TlsFrontCache;
use crate::proxy::direct_relay::handle_via_direct;
@@ -50,14 +51,15 @@ pub async fn handle_client_stream<S>(
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
) -> Result<()>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
stats.increment_connects_all();
let mut real_peer = peer;
let mut real_peer = normalize_ip(peer);
if config.server.proxy_protocol {
if proxy_protocol_enabled {
match parse_proxy_protocol(&mut stream, peer).await {
Ok(info) => {
debug!(
@@ -66,7 +68,7 @@ where
version = info.version,
"PROXY protocol header parsed"
);
real_peer = info.src_addr;
real_peer = normalize_ip(info.src_addr);
}
Err(e) => {
stats.increment_connects_bad();
@@ -228,6 +230,7 @@ pub struct RunningClientHandler {
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
}
impl ClientHandler {
@@ -243,6 +246,7 @@ impl ClientHandler {
me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
) -> RunningClientHandler {
RunningClientHandler {
stream,
@@ -256,6 +260,7 @@ impl ClientHandler {
me_pool,
tls_cache,
ip_tracker,
proxy_protocol_enabled,
}
}
}
@@ -264,6 +269,7 @@ impl RunningClientHandler {
pub async fn run(mut self) -> Result<()> {
self.stats.increment_connects_all();
self.peer = normalize_ip(self.peer);
let peer = self.peer;
let ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, "New connection");
@@ -301,7 +307,7 @@ impl RunningClientHandler {
}
async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
if self.config.server.proxy_protocol {
if self.proxy_protocol_enabled {
match parse_proxy_protocol(&mut self.stream, self.peer).await {
Ok(info) => {
debug!(
@@ -310,7 +316,7 @@ impl RunningClientHandler {
version = info.version,
"PROXY protocol header parsed"
);
self.peer = info.src_addr;
self.peer = normalize_ip(info.src_addr);
}
Err(e) => {
self.stats.increment_connects_bad();

View File

@@ -178,8 +178,9 @@ async fn do_tg_handshake_static(
let (read_half, write_half) = stream.into_split();
let max_pending = config.general.crypto_pending_buffer;
Ok((
CryptoReader::new(read_half, tg_decryptor),
CryptoWriter::new(write_half, tg_encryptor),
CryptoWriter::new(write_half, tg_encryptor, max_pending),
))
}

View File

@@ -7,6 +7,7 @@ use tracing::{debug, warn, trace, info};
use zeroize::Zeroize;
use crate::crypto::{sha256, AesCtr, SecureRandom};
use rand::Rng;
use crate::protocol::constants::*;
use crate::protocol::tls;
use crate::stream::{FakeTlsReader, FakeTlsWriter, CryptoReader, CryptoWriter};
@@ -119,6 +120,23 @@ where
None
};
let alpn_list = if config.censorship.alpn_enforce {
tls::extract_alpn_from_client_hello(handshake)
} else {
Vec::new()
};
let selected_alpn = if config.censorship.alpn_enforce {
if alpn_list.iter().any(|p| p == b"h2") {
Some(b"h2".to_vec())
} else if alpn_list.iter().any(|p| p == b"http/1.1") {
Some(b"http/1.1".to_vec())
} else {
None
}
} else {
None
};
let response = if let Some(cached_entry) = cached {
emulator::build_emulated_server_hello(
secret,
@@ -126,6 +144,8 @@ where
&validation.session_id,
&cached_entry,
rng,
selected_alpn.clone(),
config.censorship.tls_new_session_tickets,
)
} else {
tls::build_server_hello(
@@ -134,9 +154,25 @@ where
&validation.session_id,
config.censorship.fake_cert_len,
rng,
selected_alpn.clone(),
config.censorship.tls_new_session_tickets,
)
};
// Optional anti-fingerprint delay before sending ServerHello.
if config.censorship.server_hello_delay_max_ms > 0 {
let min = config.censorship.server_hello_delay_min_ms;
let max = config.censorship.server_hello_delay_max_ms.max(min);
let delay_ms = if max == min {
max
} else {
rand::rng().random_range(min..=max)
};
if delay_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
}
debug!(peer = %peer, response_len = response.len(), "Sending TLS ServerHello");
if let Err(e) = writer.write_all(&response).await {
@@ -264,9 +300,10 @@ where
"MTProto handshake successful"
);
let max_pending = config.general.crypto_pending_buffer;
return HandshakeResult::Success((
CryptoReader::new(reader, decryptor),
CryptoWriter::new(writer, encryptor),
CryptoWriter::new(writer, encryptor, max_pending),
success,
));
}

View File

@@ -1,7 +1,7 @@
//! Masking - forward unrecognized traffic to mask host
use std::time::Duration;
use std::str;
use std::time::Duration;
use tokio::net::TcpStream;
#[cfg(unix)]
use tokio::net::UnixStream;
@@ -11,9 +11,9 @@ use tracing::debug;
use crate::config::ProxyConfig;
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners.
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
/// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners.
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
const MASK_BUFFER_SIZE: usize = 8192;
/// Detect client type based on initial data
@@ -78,7 +78,9 @@ where
match connect_result {
Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split();
relay_to_mask(reader, writer, mask_read, mask_write, initial_data).await;
if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out (unix socket)");
}
}
Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask unix socket");
@@ -110,7 +112,9 @@ where
match connect_result {
Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split();
relay_to_mask(reader, writer, mask_read, mask_write, initial_data).await;
if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out");
}
}
Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask host");

View File

@@ -2,12 +2,13 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::{debug, info, trace};
use tokio::sync::oneshot;
use tracing::{debug, info, trace, warn};
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::protocol::constants::{*, secure_padding_len};
use crate::proxy::handshake::HandshakeSuccess;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
@@ -15,11 +16,11 @@ use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
pub(crate) async fn handle_via_middle_proxy<R, W>(
mut crypto_reader: CryptoReader<R>,
mut crypto_writer: CryptoWriter<W>,
crypto_writer: CryptoWriter<W>,
success: HandshakeSuccess,
me_pool: Arc<MePool>,
stats: Arc<Stats>,
_config: Arc<ProxyConfig>,
config: Arc<ProxyConfig>,
_buffer_pool: Arc<BufferPool>,
local_addr: SocketAddr,
rng: Arc<SecureRandom>,
@@ -41,7 +42,7 @@ where
"Routing via Middle-End"
);
let (conn_id, mut me_rx) = me_pool.registry().register().await;
let (conn_id, me_rx) = me_pool.registry().register().await;
stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user);
@@ -56,59 +57,104 @@ where
let translated_local_addr = me_pool.translate_our_addr(local_addr);
let result: Result<()> = loop {
tokio::select! {
client_frame = read_client_payload(&mut crypto_reader, proto_tag) => {
match client_frame {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
stats.add_user_octets_from(&user, payload.len() as u64);
let mut flags = proto_flags;
if quickack {
flags |= RPC_FLAG_QUICKACK;
let frame_limit = config.general.max_client_frame;
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let mut me_rx_task = me_rx;
let stats_clone = stats.clone();
let rng_clone = rng.clone();
let user_clone = user.clone();
let me_writer = tokio::spawn(async move {
let mut writer = crypto_writer;
loop {
tokio::select! {
msg = me_rx_task.recv() => {
match msg {
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?;
}
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
flags |= RPC_FLAG_NOT_ENCRYPTED;
Some(MeResponse::Ack(confirm)) => {
trace!(conn_id, confirm, "ME->C quickack");
write_client_ack(&mut writer, proto_tag, confirm).await?;
}
Some(MeResponse::Close) => {
debug!(conn_id, "ME sent close");
return Ok(());
}
None => {
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
}
me_pool.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
&payload,
flags,
).await?;
}
Ok(None) => {
debug!(conn_id, "Client EOF");
let _ = me_pool.send_close(conn_id).await;
break Ok(());
}
Err(e) => break Err(e),
}
}
me_msg = me_rx.recv() => {
match me_msg {
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
stats.add_user_octets_to(&user, data.len() as u64);
write_client_payload(&mut crypto_writer, proto_tag, flags, &data, rng.as_ref()).await?;
}
Some(MeResponse::Ack(confirm)) => {
trace!(conn_id, confirm, "ME->C quickack");
write_client_ack(&mut crypto_writer, proto_tag, confirm).await?;
}
Some(MeResponse::Close) => {
debug!(conn_id, "ME sent close");
break Ok(());
}
None => {
debug!(conn_id, "ME channel closed");
break Err(ProxyError::Proxy("ME connection lost".into()));
}
_ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal");
return Ok(());
}
}
}
});
let mut main_result: Result<()> = Ok(());
let mut client_closed = false;
loop {
match read_client_payload(&mut crypto_reader, proto_tag, frame_limit, &user).await {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
stats.add_user_octets_from(&user, payload.len() as u64);
let mut flags = proto_flags;
if quickack {
flags |= RPC_FLAG_QUICKACK;
}
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
flags |= RPC_FLAG_NOT_ENCRYPTED;
}
if let Err(e) = me_pool.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
&payload,
flags,
).await {
main_result = Err(e);
break;
}
}
Ok(None) => {
debug!(conn_id, "Client EOF");
client_closed = true;
let _ = me_pool.send_close(conn_id).await;
break;
}
Err(e) => {
main_result = Err(e);
break;
}
}
}
let _ = stop_tx.send(());
let mut writer_result = me_writer
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
// When client closes, but ME channel stopped as unregistered - it isnt error
if client_closed {
if matches!(
writer_result,
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
) {
writer_result = Ok(());
}
}
let result = match (main_result, writer_result) {
(Ok(()), Ok(())) => Ok(()),
(Err(e), _) => Err(e),
(_, Err(e)) => Err(e),
};
debug!(user = %user, conn_id, "ME relay cleanup");
@@ -120,6 +166,8 @@ where
async fn read_client_payload<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
max_frame: usize,
user: &str,
) -> Result<Option<(Vec<u8>, bool)>>
where
R: AsyncRead + Unpin + Send + 'static,
@@ -162,8 +210,15 @@ where
}
};
if len > 16 * 1024 * 1024 {
return Err(ProxyError::Proxy(format!("Frame too large: {len}")));
if len > max_frame {
warn!(
user = %user,
raw_len = len,
raw_len_hex = format_args!("0x{:08x}", len),
proto = ?proto_tag,
"Frame too large — possible crypto desync or TLS record error"
);
return Err(ProxyError::Proxy(format!("Frame too large: {len} (max {max_frame})")));
}
let mut payload = vec![0u8; len];
@@ -237,7 +292,7 @@ where
}
ProtoTag::Intermediate | ProtoTag::Secure => {
let padding_len = if proto_tag == ProtoTag::Secure {
(rng.bytes(1)[0] % 4) as usize
secure_padding_len(data.len(), rng)
} else {
0
};

View File

@@ -34,7 +34,7 @@
//! └────────────────────────────────────────┘
//!
//! Backpressure
//! - pending ciphertext buffer is bounded (MAX_PENDING_WRITE)
//! - pending ciphertext buffer is bounded (configurable per connection)
//! - pending is full and upstream is pending
//! -> poll_write returns Poll::Pending
//! -> do not accept any plaintext
@@ -62,10 +62,9 @@ use super::state::{StreamState, YieldBuffer};
// ============= Constants =============
/// Maximum size for pending ciphertext buffer (bounded backpressure).
/// Reduced to 64KB to prevent bufferbloat on mobile networks.
/// 512KB was causing high latency on 3G/LTE connections.
const MAX_PENDING_WRITE: usize = 64 * 1024;
/// Default size for pending ciphertext buffer (bounded backpressure).
/// Actual limit is supplied at runtime from configuration.
const DEFAULT_MAX_PENDING_WRITE: usize = 64 * 1024;
/// Default read buffer capacity (reader mostly decrypts in-place into caller buffer).
const DEFAULT_READ_CAPACITY: usize = 16 * 1024;
@@ -427,15 +426,22 @@ pub struct CryptoWriter<W> {
encryptor: AesCtr,
state: CryptoWriterState,
scratch: BytesMut,
max_pending_write: usize,
}
impl<W> CryptoWriter<W> {
pub fn new(upstream: W, encryptor: AesCtr) -> Self {
pub fn new(upstream: W, encryptor: AesCtr, max_pending_write: usize) -> Self {
let max_pending = if max_pending_write == 0 {
DEFAULT_MAX_PENDING_WRITE
} else {
max_pending_write
};
Self {
upstream,
encryptor,
state: CryptoWriterState::Idle,
scratch: BytesMut::with_capacity(16 * 1024),
max_pending_write: max_pending.max(4 * 1024),
}
}
@@ -484,10 +490,10 @@ impl<W> CryptoWriter<W> {
}
/// Ensure we are in Flushing state and return mutable pending buffer.
fn ensure_pending<'a>(state: &'a mut CryptoWriterState) -> &'a mut PendingCiphertext {
fn ensure_pending<'a>(state: &'a mut CryptoWriterState, max_pending: usize) -> &'a mut PendingCiphertext {
if matches!(state, CryptoWriterState::Idle) {
*state = CryptoWriterState::Flushing {
pending: PendingCiphertext::new(MAX_PENDING_WRITE),
pending: PendingCiphertext::new(max_pending),
};
}
@@ -498,14 +504,14 @@ impl<W> CryptoWriter<W> {
}
/// Select how many plaintext bytes can be accepted in buffering path
fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize) -> usize {
fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize, max_pending: usize) -> usize {
if buf_len == 0 {
return 0;
}
match state {
CryptoWriterState::Flushing { pending } => buf_len.min(pending.remaining_capacity()),
CryptoWriterState::Idle => buf_len.min(MAX_PENDING_WRITE),
CryptoWriterState::Idle => buf_len.min(max_pending),
CryptoWriterState::Poisoned { .. } => 0,
}
}
@@ -603,7 +609,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
Poll::Pending => {
// Upstream blocked. Apply ideal backpressure
let to_accept =
Self::select_to_accept_for_buffering(&this.state, buf.len());
Self::select_to_accept_for_buffering(&this.state, buf.len(), this.max_pending_write);
if to_accept == 0 {
trace!(
@@ -618,7 +624,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
// Disjoint borrows
let encryptor = &mut this.encryptor;
let pending = Self::ensure_pending(&mut this.state);
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
if let Err(e) = pending.push_encrypted(encryptor, plaintext) {
if e.kind() == ErrorKind::WouldBlock {
@@ -635,7 +641,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
// 2) Fast path: pending empty -> write-through
debug_assert!(matches!(this.state, CryptoWriterState::Idle));
let to_accept = buf.len().min(MAX_PENDING_WRITE);
let to_accept = buf.len().min(this.max_pending_write);
let plaintext = &buf[..to_accept];
Self::encrypt_into_scratch(&mut this.encryptor, &mut this.scratch, plaintext);
@@ -645,7 +651,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
// Upstream blocked: buffer FULL ciphertext for accepted bytes.
let ciphertext = std::mem::take(&mut this.scratch);
let pending = Self::ensure_pending(&mut this.state);
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
pending.replace_with(ciphertext);
Poll::Ready(Ok(to_accept))
@@ -672,7 +678,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
let remainder = this.scratch.split_off(n);
this.scratch.clear();
let pending = Self::ensure_pending(&mut this.state);
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
pending.replace_with(remainder);
Poll::Ready(Ok(to_accept))
@@ -767,4 +773,4 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for PassthroughStream<S> {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
}

View File

@@ -267,8 +267,8 @@ impl<W: AsyncWrite + Unpin> SecureIntermediateFrameWriter<W> {
return Ok(());
}
// Add random padding (0-3 bytes)
let padding_len = self.rng.range(4);
// Add padding so total length is never divisible by 4 (MTProto Secure)
let padding_len = secure_padding_len(data.len(), &self.rng);
let padding = self.rng.bytes(padding_len);
let total_len = data.len() + padding_len;
@@ -585,4 +585,4 @@ mod tests {
let (received, _) = reader.read_frame().await.unwrap();
assert_eq!(&received[..], &data[..]);
}
}
}

View File

@@ -40,4 +40,4 @@ pub use frame_stream::{
SecureIntermediateFrameReader, SecureIntermediateFrameWriter,
MtprotoFrameReader, MtprotoFrameWriter,
FrameReaderKind, FrameWriterKind,
};
};

View File

@@ -25,7 +25,8 @@
//! - However, the on-the-wire record length can exceed 16384 because TLS 1.3
//! uses AEAD and can include tag/overhead/padding.
//! - Telegram FakeTLS clients (notably iOS) may send Application Data records
//! with length up to 16384 + 24 bytes. We accept that as MAX_TLS_CHUNK_SIZE.
//! with length up to 16384 + 256 bytes (RFC 8446 §5.2). We accept that as
//! MAX_TLS_CHUNK_SIZE.
//!
//! If you reject those (e.g. validate length <= 16384), you will see errors like:
//! "TLS record too large: 16408 bytes"
@@ -52,9 +53,8 @@ use super::state::{StreamState, HeaderBuffer, YieldBuffer, WriteBuffer};
const TLS_HEADER_SIZE: usize = 5;
/// Maximum TLS fragment size we emit for Application Data.
/// Real TLS 1.3 ciphertexts often add ~16-24 bytes AEAD overhead, so to mimic
/// on-the-wire record sizes we allow up to 16384 + 24 bytes of plaintext.
const MAX_TLS_PAYLOAD: usize = 16384 + 24;
/// Real TLS 1.3 allows up to 16384 + 256 bytes of ciphertext (incl. tag).
const MAX_TLS_PAYLOAD: usize = 16384 + 256;
/// Maximum pending write buffer for one record remainder.
/// Note: we never queue unlimited amount of data here; state holds at most one record.
@@ -91,7 +91,7 @@ impl TlsRecordHeader {
/// - We accept TLS 1.0 header version for ClientHello-like records (0x03 0x01),
/// and TLS 1.2/1.3 style version bytes for the rest (we use TLS_VERSION = 0x03 0x03).
/// - For Application Data, Telegram FakeTLS may send payload length up to
/// MAX_TLS_CHUNK_SIZE (16384 + 24).
/// MAX_TLS_CHUNK_SIZE (16384 + 256).
/// - For other record types we keep stricter bounds to avoid memory abuse.
fn validate(&self) -> Result<()> {
// Version: accept TLS 1.0 header (ClientHello quirk) and TLS_VERSION (0x0303).
@@ -105,7 +105,7 @@ impl TlsRecordHeader {
let len = self.length as usize;
// Length checks depend on record type.
// Telegram FakeTLS: ApplicationData length may be 16384 + 24.
// Telegram FakeTLS: ApplicationData length may be 16384 + 256.
match self.record_type {
TLS_RECORD_APPLICATION => {
if len > MAX_TLS_CHUNK_SIZE {
@@ -755,9 +755,6 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for FakeTlsWriter<W> {
payload_size: chunk_size,
};
// Wake to retry flushing soon.
cx.waker().wake_by_ref();
Poll::Ready(Ok(chunk_size))
}
}

View File

@@ -59,6 +59,65 @@ impl TlsFrontCache {
guard.insert(domain.to_string(), Arc::new(data));
}
pub async fn load_from_disk(&self) {
let path = self.disk_path.clone();
if tokio::fs::create_dir_all(&path).await.is_err() {
return;
}
let mut loaded = 0usize;
if let Ok(mut dir) = tokio::fs::read_dir(&path).await {
while let Ok(Some(entry)) = dir.next_entry().await {
if let Ok(name) = entry.file_name().into_string() {
if !name.ends_with(".json") {
continue;
}
if let Ok(data) = tokio::fs::read(entry.path()).await {
if let Ok(mut cached) = serde_json::from_slice::<CachedTlsData>(&data) {
if cached.domain.is_empty()
|| cached.domain.len() > 255
|| !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
{
warn!(file = %name, "Skipping TLS cache entry with invalid domain");
continue;
}
// fetched_at is skipped during deserialization; approximate with file mtime if available.
if let Ok(meta) = entry.metadata().await {
if let Ok(modified) = meta.modified() {
cached.fetched_at = modified;
}
}
// Drop entries older than 72h
if let Ok(age) = cached.fetched_at.elapsed() {
if age > Duration::from_secs(72 * 3600) {
warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)");
continue;
}
}
let domain = cached.domain.clone();
self.set(&domain, cached).await;
loaded += 1;
}
}
}
}
}
if loaded > 0 {
info!(count = loaded, "Loaded TLS cache entries from disk");
}
}
async fn persist(&self, domain: &str, data: &CachedTlsData) {
if tokio::fs::create_dir_all(&self.disk_path).await.is_err() {
return;
}
let fname = format!("{}.json", domain.replace(['/', '\\'], "_"));
let path = self.disk_path.join(fname);
if let Ok(json) = serde_json::to_vec_pretty(data) {
// best-effort write
let _ = tokio::fs::write(path, json).await;
}
}
/// Spawn background updater that periodically refreshes cached domains using provided fetcher.
pub fn spawn_updater<F>(
self: Arc<Self>,
@@ -82,14 +141,15 @@ impl TlsFrontCache {
pub async fn update_from_fetch(&self, domain: &str, fetched: TlsFetchResult) {
let data = CachedTlsData {
server_hello_template: fetched.server_hello_parsed,
cert_info: None,
cert_info: fetched.cert_info,
app_data_records_sizes: fetched.app_data_records_sizes.clone(),
total_app_data_len: fetched.total_app_data_len,
fetched_at: SystemTime::now(),
domain: domain.to_string(),
};
self.set(domain, data).await;
self.set(domain, data.clone()).await;
self.persist(domain, &data).await;
debug!(domain = %domain, len = fetched.total_app_data_len, "TLS cache updated");
}

View File

@@ -5,6 +5,28 @@ use crate::protocol::constants::{
use crate::protocol::tls::{TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key};
use crate::tls_front::types::CachedTlsData;
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 allows up to 2^14 + 256
fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec<usize> {
sizes
.iter()
.map(|&size| {
let base = size.max(MIN_APP_DATA).min(MAX_APP_DATA);
let jitter_range = ((base as f64) * 0.03).round() as i64;
if jitter_range == 0 {
return base;
}
let mut rand_bytes = [0u8; 2];
rand_bytes.copy_from_slice(&rng.bytes(2));
let span = 2 * jitter_range + 1;
let delta = (u16::from_le_bytes(rand_bytes) as i64 % span) - jitter_range;
let adjusted = (base as i64 + delta).clamp(MIN_APP_DATA as i64, MAX_APP_DATA as i64);
adjusted as usize
})
.collect()
}
/// Build a ServerHello + CCS + ApplicationData sequence using cached TLS metadata.
pub fn build_emulated_server_hello(
secret: &[u8],
@@ -12,6 +34,8 @@ pub fn build_emulated_server_hello(
session_id: &[u8],
cached: &CachedTlsData,
rng: &SecureRandom,
alpn: Option<Vec<u8>>,
new_session_tickets: u8,
) -> Vec<u8> {
// --- ServerHello ---
let mut extensions = Vec::new();
@@ -26,6 +50,15 @@ pub fn build_emulated_server_hello(
extensions.extend_from_slice(&0x002bu16.to_be_bytes());
extensions.extend_from_slice(&(2u16).to_be_bytes());
extensions.extend_from_slice(&0x0304u16.to_be_bytes());
if let Some(alpn_proto) = &alpn {
extensions.extend_from_slice(&0x0010u16.to_be_bytes());
let list_len: u16 = 1 + alpn_proto.len() as u16;
let ext_len: u16 = 2 + list_len;
extensions.extend_from_slice(&ext_len.to_be_bytes());
extensions.extend_from_slice(&list_len.to_be_bytes());
extensions.push(alpn_proto.len() as u8);
extensions.extend_from_slice(alpn_proto);
}
let extensions_len = extensions.len() as u16;
@@ -76,6 +109,7 @@ pub fn build_emulated_server_hello(
if sizes.is_empty() {
sizes.push(cached.total_app_data_len.max(1024));
}
let sizes = jitter_and_clamp_sizes(&sizes, rng);
let mut app_data = Vec::new();
for size in sizes {
@@ -83,15 +117,37 @@ pub fn build_emulated_server_hello(
rec.push(TLS_RECORD_APPLICATION);
rec.extend_from_slice(&TLS_VERSION);
rec.extend_from_slice(&(size as u16).to_be_bytes());
rec.extend_from_slice(&rng.bytes(size));
if size > 17 {
let body_len = size - 17;
rec.extend_from_slice(&rng.bytes(body_len));
rec.push(0x16); // inner content type marker (handshake)
rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag
} else {
rec.extend_from_slice(&rng.bytes(size));
}
app_data.extend_from_slice(&rec);
}
// --- Combine ---
let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len());
// Optional NewSessionTicket mimic records (opaque ApplicationData for fingerprint).
let mut tickets = Vec::new();
if new_session_tickets > 0 {
for _ in 0..new_session_tickets {
let ticket_len: usize = rng.range(48) + 48;
let mut rec = Vec::with_capacity(5 + ticket_len);
rec.push(TLS_RECORD_APPLICATION);
rec.extend_from_slice(&TLS_VERSION);
rec.extend_from_slice(&(ticket_len as u16).to_be_bytes());
rec.extend_from_slice(&rng.bytes(ticket_len));
tickets.extend_from_slice(&rec);
}
}
let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len() + tickets.len());
response.extend_from_slice(&server_hello);
response.extend_from_slice(&change_cipher_spec);
response.extend_from_slice(&app_data);
response.extend_from_slice(&tickets);
// --- HMAC ---
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());

View File

@@ -14,9 +14,12 @@ use rustls::client::ClientConfig;
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{DigitallySignedStruct, Error as RustlsError};
use x509_parser::prelude::FromDer;
use x509_parser::certificate::X509Certificate;
use crate::crypto::SecureRandom;
use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE, TLS_VERSION};
use crate::tls_front::types::{ParsedServerHello, TlsExtension, TlsFetchResult};
use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE};
use crate::tls_front::types::{ParsedServerHello, TlsExtension, TlsFetchResult, ParsedCertificateInfo};
/// No-op verifier: accept any certificate (we only need lengths and metadata).
#[derive(Debug)]
@@ -163,12 +166,15 @@ fn build_client_hello(sni: &str, rng: &SecureRandom) -> Vec<u8> {
exts.extend_from_slice(alpn_proto);
// padding to reduce recognizability and keep length ~500 bytes
if exts.len() < 180 {
let pad_len = 180 - exts.len();
exts.extend_from_slice(&0x0015u16.to_be_bytes()); // padding extension
exts.extend_from_slice(&(pad_len as u16 + 2).to_be_bytes());
exts.extend_from_slice(&(pad_len as u16).to_be_bytes());
exts.resize(exts.len() + pad_len, 0);
const TARGET_EXT_LEN: usize = 180;
if exts.len() < TARGET_EXT_LEN {
let remaining = TARGET_EXT_LEN - exts.len();
if remaining > 4 {
let pad_len = remaining - 4; // minus type+len
exts.extend_from_slice(&0x0015u16.to_be_bytes()); // padding extension
exts.extend_from_slice(&(pad_len as u16).to_be_bytes());
exts.resize(exts.len() + pad_len, 0);
}
}
// Extensions length prefix
@@ -263,6 +269,52 @@ fn parse_server_hello(body: &[u8]) -> Option<ParsedServerHello> {
})
}
fn parse_cert_info(certs: &[CertificateDer<'static>]) -> Option<ParsedCertificateInfo> {
let first = certs.first()?;
let (_rem, cert) = X509Certificate::from_der(first.as_ref()).ok()?;
let not_before = Some(cert.validity().not_before.to_datetime().unix_timestamp());
let not_after = Some(cert.validity().not_after.to_datetime().unix_timestamp());
let issuer_cn = cert
.issuer()
.iter_common_name()
.next()
.and_then(|cn| cn.as_str().ok())
.map(|s| s.to_string());
let subject_cn = cert
.subject()
.iter_common_name()
.next()
.and_then(|cn| cn.as_str().ok())
.map(|s| s.to_string());
let san_names = cert
.subject_alternative_name()
.ok()
.flatten()
.map(|san| {
san.value
.general_names
.iter()
.filter_map(|gn| match gn {
x509_parser::extensions::GeneralName::DNSName(n) => Some(n.to_string()),
_ => None,
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
Some(ParsedCertificateInfo {
not_after_unix: not_after,
not_before_unix: not_before,
issuer_cn,
subject_cn,
san_names,
})
}
async fn fetch_via_raw_tls(
host: &str,
port: u16,
@@ -315,6 +367,7 @@ async fn fetch_via_raw_tls(
app_sizes
},
total_app_data_len,
cert_info: None,
})
}
@@ -324,6 +377,7 @@ pub async fn fetch_real_tls(
port: u16,
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
) -> Result<TlsFetchResult> {
// Preferred path: raw TLS probe for accurate record sizing
match fetch_via_raw_tls(host, port, sni, connect_timeout).await {
@@ -334,8 +388,26 @@ pub async fn fetch_real_tls(
}
// Fallback: rustls handshake to at least get certificate sizes
let addr = format!("{host}:{port}");
let stream = timeout(connect_timeout, TcpStream::connect(addr)).await??;
let stream = if let Some(manager) = upstream {
// Resolve host to SocketAddr
if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
match manager.connect(addr, None, None).await {
Ok(s) => s,
Err(e) => {
warn!(sni = %sni, error = %e, "Upstream connect failed, using direct connect");
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
};
let config = build_client_config();
let connector = TlsConnector::from(config);
@@ -359,6 +431,7 @@ pub async fn fetch_real_tls(
.unwrap_or_default();
let total_cert_len: usize = certs.iter().map(|c| c.len()).sum::<usize>().max(1024);
let cert_info = parse_cert_info(&certs);
// Heuristic: split across two records if large to mimic real servers a bit.
let app_data_records_sizes = if total_cert_len > 3000 {
@@ -387,5 +460,6 @@ pub async fn fetch_real_tls(
server_hello_parsed: parsed,
app_data_records_sizes: app_data_records_sizes.clone(),
total_app_data_len: app_data_records_sizes.iter().sum(),
cert_info,
})
}

View File

@@ -1,7 +1,8 @@
use std::time::SystemTime;
use serde::{Serialize, Deserialize};
/// Parsed representation of an unencrypted TLS ServerHello.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedServerHello {
pub version: [u8; 2],
pub random: [u8; 32],
@@ -12,14 +13,14 @@ pub struct ParsedServerHello {
}
/// Generic TLS extension container.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsExtension {
pub ext_type: u16,
pub data: Vec<u8>,
}
/// Basic certificate metadata (optional, informative).
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedCertificateInfo {
pub not_after_unix: Option<i64>,
pub not_before_unix: Option<i64>,
@@ -29,20 +30,26 @@ pub struct ParsedCertificateInfo {
}
/// Cached data per SNI used by the emulator.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedTlsData {
pub server_hello_template: ParsedServerHello,
pub cert_info: Option<ParsedCertificateInfo>,
pub app_data_records_sizes: Vec<usize>,
pub total_app_data_len: usize,
#[serde(default = "now_system_time", skip_serializing, skip_deserializing)]
pub fetched_at: SystemTime,
pub domain: String,
}
fn now_system_time() -> SystemTime {
SystemTime::now()
}
/// Result of attempting to fetch real TLS artifacts.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsFetchResult {
pub server_hello_parsed: ParsedServerHello,
pub app_data_records_sizes: Vec<usize>,
pub total_app_data_len: usize,
pub cert_info: Option<ParsedCertificateInfo>,
}

View File

@@ -567,12 +567,14 @@ impl MePool {
let cancel_keepalive = cancel_keepalive_token;
tokio::spawn(async move {
// Per-writer jittered start to avoid phase sync.
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64);
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
loop {
tokio::select! {
_ = cancel_keepalive.cancelled() => break,
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {}
}
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
break;

View File

@@ -283,6 +283,58 @@ impl Default for ProxyProtocolV1Builder {
}
}
/// Builder for PROXY protocol v2 header
pub struct ProxyProtocolV2Builder {
src: Option<SocketAddr>,
dst: Option<SocketAddr>,
}
impl ProxyProtocolV2Builder {
pub fn new() -> Self {
Self { src: None, dst: None }
}
pub fn with_addrs(mut self, src: SocketAddr, dst: SocketAddr) -> Self {
self.src = Some(src);
self.dst = Some(dst);
self
}
pub fn build(&self) -> Vec<u8> {
let mut header = Vec::new();
header.extend_from_slice(PROXY_V2_SIGNATURE);
// version 2, PROXY command
header.push(0x21);
match (self.src, self.dst) {
(Some(SocketAddr::V4(src)), Some(SocketAddr::V4(dst))) => {
header.push(0x11); // INET + STREAM
header.extend_from_slice(&(12u16).to_be_bytes());
header.extend_from_slice(&src.ip().octets());
header.extend_from_slice(&dst.ip().octets());
header.extend_from_slice(&src.port().to_be_bytes());
header.extend_from_slice(&dst.port().to_be_bytes());
}
(Some(SocketAddr::V6(src)), Some(SocketAddr::V6(dst))) => {
header.push(0x21); // INET6 + STREAM
header.extend_from_slice(&(36u16).to_be_bytes());
header.extend_from_slice(&src.ip().octets());
header.extend_from_slice(&dst.ip().octets());
header.extend_from_slice(&src.port().to_be_bytes());
header.extend_from_slice(&dst.port().to_be_bytes());
}
_ => {
// LOCAL/UNSPEC: no address information
header[12] = 0x20; // version 2, LOCAL command
header.push(0x00);
header.extend_from_slice(&0u16.to_be_bytes());
}
}
header
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -378,4 +430,4 @@ mod tests {
let header = ProxyProtocolV1Builder::new().build();
assert_eq!(header, b"PROXY UNKNOWN\r\n");
}
}
}

View File

@@ -383,32 +383,43 @@ impl UpstreamManager {
Ok(stream)
},
UpstreamType::Socks4 { address, interface, user_id } => {
let proxy_addr: SocketAddr = address.parse()
.map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
let std_stream: std::net::TcpStream = socket.into();
let mut stream = TcpStream::from_std(std_stream)?;
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
stream
} else {
// Hostname:port format - use tokio DNS resolution
// Note: interface binding is not supported for hostnames
if interface.is_some() {
warn!("SOCKS4 interface binding is not supported for hostname addresses, ignoring");
}
TcpStream::connect(address).await
.map_err(ProxyError::Io)?
};
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
// replace socks user_id with config.selected_scope, if set
let scope: Option<&str> = Some(config.selected_scope.as_str())
.filter(|s| !s.is_empty());
@@ -418,32 +429,42 @@ impl UpstreamManager {
Ok(stream)
},
UpstreamType::Socks5 { address, interface, username, password } => {
let proxy_addr: SocketAddr = address.parse()
.map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
let std_stream: std::net::TcpStream = socket.into();
let mut stream = TcpStream::from_std(std_stream)?;
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
stream
} else {
// Hostname:port format - use tokio DNS resolution
// Note: interface binding is not supported for hostnames
if interface.is_some() {
warn!("SOCKS5 interface binding is not supported for hostname addresses, ignoring");
}
TcpStream::connect(address).await
.map_err(ProxyError::Io)?
};
debug!(config = ?config, "Socks5 connection");
// replace socks user:pass with config.selected_scope, if set