Merge pull request #188 from telemt/main-stage

From staging #185 + #186 -> main
This commit is contained in:
Alexey 2026-02-20 18:04:58 +03:00 committed by GitHub
commit 7da062e448
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 522 additions and 91 deletions

View File

@ -30,6 +30,7 @@ nix = { version = "0.28", default-features = false, features = ["net"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
toml = "0.8" toml = "0.8"
x509-parser = "0.15"
# Utils # Utils
bytes = "1.9" bytes = "1.9"

View File

@ -194,6 +194,10 @@ impl ProxyConfig {
validate_network_cfg(&mut config.network)?; validate_network_cfg(&mut config.network)?;
if config.general.use_middle_proxy && config.network.ipv6 == Some(true) {
warn!("IPv6 with Middle Proxy is experimental and may cause KDF address mismatch; consider disabling IPv6 or ME");
}
// Random fake_cert_len only when default is in use. // Random fake_cert_len only when default is in use.
if !config.censorship.tls_emulation && config.censorship.fake_cert_len == default_fake_cert_len() { if !config.censorship.tls_emulation && config.censorship.fake_cert_len == default_fake_cert_len() {
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096); config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
@ -222,6 +226,7 @@ impl ProxyConfig {
ip: ipv4, ip: ipv4,
announce: None, announce: None,
announce_ip: None, announce_ip: None,
proxy_protocol: None,
}); });
} }
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 { if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
@ -230,6 +235,7 @@ impl ProxyConfig {
ip: ipv6, ip: ipv6,
announce: None, announce: None,
announce_ip: None, announce_ip: None,
proxy_protocol: None,
}); });
} }
} }

View File

@ -514,6 +514,9 @@ pub struct ListenerConfig {
/// Migrated to `announce` automatically if `announce` is not set. /// Migrated to `announce` automatically if `announce` is not set.
#[serde(default)] #[serde(default)]
pub announce_ip: Option<IpAddr>, pub announce_ip: Option<IpAddr>,
/// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol.
#[serde(default)]
pub proxy_protocol: Option<bool>,
} }
// ============= ShowLink ============= // ============= ShowLink =============

View File

@ -3,6 +3,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use rand::Rng;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::signal; use tokio::signal;
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
@ -260,46 +261,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); info!("IP limits configured for {} users", config.access.user_max_unique_ips.len());
} }
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let port = config.censorship.mask_port;
tokio::spawn(async move {
for domain in domains {
match crate::tls_front::fetcher::fetch_real_tls(
&domain,
port,
&domain,
Duration::from_secs(5),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
});
Some(cache)
} else {
None
};
// Connection concurrency limit // Connection concurrency limit
let _max_connections = Arc::new(Semaphore::new(10_000)); let _max_connections = Arc::new(Semaphore::new(10_000));
@ -478,6 +439,72 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
// TLS front cache (optional emulation)
let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
tls_domains.push(config.censorship.tls_domain.clone());
for d in &config.censorship.tls_domains {
if !tls_domains.contains(d) {
tls_domains.push(d.clone());
}
}
let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
let cache = Arc::new(TlsFrontCache::new(
&tls_domains,
config.censorship.fake_cert_len,
&config.censorship.tls_front_dir,
));
cache.load_from_disk().await;
let port = config.censorship.mask_port;
// Initial synchronous fetch to warm cache before serving clients.
for domain in tls_domains.clone() {
match crate::tls_front::fetcher::fetch_real_tls(
&domain,
port,
&domain,
Duration::from_secs(5),
Some(upstream_manager.clone()),
)
.await
{
Ok(res) => cache.update_from_fetch(&domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation fetch failed"),
}
}
// Periodic refresh with jitter.
let cache_clone = cache.clone();
let domains = tls_domains.clone();
let upstream_for_task = upstream_manager.clone();
tokio::spawn(async move {
loop {
let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600);
let jitter_secs = rand::rng().random_range(0..=7200);
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
for domain in &domains {
match crate::tls_front::fetcher::fetch_real_tls(
domain,
port,
domain,
Duration::from_secs(5),
Some(upstream_for_task.clone()),
)
.await
{
Ok(res) => cache_clone.update_from_fetch(domain, res).await,
Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"),
}
}
}
});
Some(cache)
} else {
None
};
// Middle-End ping before DC connectivity // Middle-End ping before DC connectivity
if let Some(ref pool) = me_pool { if let Some(ref pool) = me_pool {
let me_results = run_me_ping(pool, &rng).await; let me_results = run_me_ping(pool, &rng).await;
@ -691,6 +718,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
Ok(socket) => { Ok(socket) => {
let listener = TcpListener::from_std(socket.into())?; let listener = TcpListener::from_std(socket.into())?;
info!("Listening on {}", addr); info!("Listening on {}", addr);
let listener_proxy_protocol =
listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol);
// Resolve the public host for link generation // Resolve the public host for link generation
let public_host = if let Some(ref announce) = listener_conf.announce { let public_host = if let Some(ref announce) = listener_conf.announce {
@ -716,7 +745,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
print_proxy_links(&public_host, link_port, &config); print_proxy_links(&public_host, link_port, &config);
} }
listeners.push(listener); listeners.push((listener, listener_proxy_protocol));
} }
Err(e) => { Err(e) => {
error!("Failed to bind to {}: {}", addr, e); error!("Failed to bind to {}: {}", addr, e);
@ -802,12 +831,13 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone(); let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = crate::proxy::client::handle_client_stream( if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats, stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng, upstream_manager, replay_checker, buffer_pool, rng,
me_pool, tls_cache, ip_tracker, me_pool, tls_cache, ip_tracker, proxy_protocol_enabled,
).await { ).await {
debug!(error = %e, "Unix socket connection error"); debug!(error = %e, "Unix socket connection error");
} }
@ -861,7 +891,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
}); });
} }
for listener in listeners { for (listener, listener_proxy_protocol) in listeners {
let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone(); let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let stats = stats.clone(); let stats = stats.clone();
let upstream_manager = upstream_manager.clone(); let upstream_manager = upstream_manager.clone();
@ -885,6 +915,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone(); let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = ClientHandler::new( if let Err(e) = ClientHandler::new(
@ -899,6 +930,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
me_pool, me_pool,
tls_cache, tls_cache,
ip_tracker, ip_tracker,
proxy_protocol_enabled,
) )
.run() .run()
.await .await

View File

@ -2,7 +2,7 @@ use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use http_body_util::Full; use http_body_util::{Full, BodyExt};
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::server::conn::http1; use hyper::server::conn::http1;
use hyper::service::service_fn; use hyper::service::service_fn;
@ -54,7 +54,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
} }
} }
fn handle(req: Request<hyper::body::Incoming>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> { fn handle<B>(req: Request<B>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() != "/metrics" { if req.uri().path() != "/metrics" {
let resp = Response::builder() let resp = Response::builder()
.status(StatusCode::NOT_FOUND) .status(StatusCode::NOT_FOUND)
@ -194,21 +194,20 @@ mod tests {
stats.increment_connects_all(); stats.increment_connects_all();
stats.increment_connects_all(); stats.increment_connects_all();
let port = 19091u16; let req = Request::builder()
let s = stats.clone(); .uri("/metrics")
tokio::spawn(async move { .body(())
serve(port, s, vec![]).await; .unwrap();
}); let resp = handle(req, &stats).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await; assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes();
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
let resp = reqwest::get(format!("http://127.0.0.1:{}/metrics", port)) let req404 = Request::builder()
.await.unwrap(); .uri("/other")
assert_eq!(resp.status(), 200); .body(())
let body = resp.text().await.unwrap(); .unwrap();
assert!(body.contains("telemt_connections_total 3")); let resp404 = handle(req404, &stats).unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
let resp404 = reqwest::get(format!("http://127.0.0.1:{}/other", port))
.await.unwrap();
assert_eq!(resp404.status(), 404);
} }
} }

View File

@ -351,6 +351,9 @@ pub fn build_server_hello(
fake_cert_len: usize, fake_cert_len: usize,
rng: &SecureRandom, rng: &SecureRandom,
) -> Vec<u8> { ) -> Vec<u8> {
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
let fake_cert_len = fake_cert_len.max(MIN_APP_DATA).min(MAX_APP_DATA);
let x25519_key = gen_fake_x25519_key(rng); let x25519_key = gen_fake_x25519_key(rng);
// Build ServerHello // Build ServerHello
@ -373,7 +376,13 @@ pub fn build_server_hello(
app_data_record.push(TLS_RECORD_APPLICATION); app_data_record.push(TLS_RECORD_APPLICATION);
app_data_record.extend_from_slice(&TLS_VERSION); app_data_record.extend_from_slice(&TLS_VERSION);
app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes()); app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes());
app_data_record.extend_from_slice(&fake_cert); if fake_cert_len > 17 {
app_data_record.extend_from_slice(&fake_cert[..fake_cert_len - 17]);
app_data_record.push(0x16); // inner content type marker
app_data_record.extend_from_slice(&rng.bytes(16)); // AEAD-like tag mimic
} else {
app_data_record.extend_from_slice(&fake_cert);
}
// Combine all records // Combine all records
let mut response = Vec::with_capacity( let mut response = Vec::with_capacity(
@ -475,6 +484,85 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
None None
} }
/// Extract ALPN protocol list from TLS ClientHello.
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Option<Vec<String>> {
if handshake.len() < 43 || handshake[0] != TLS_RECORD_HANDSHAKE {
return None;
}
let mut pos = 5; // after record header
if handshake.get(pos).copied()? != 0x01 {
return None; // not ClientHello
}
// Handshake length bytes
pos += 4; // type + len (3)
// version (2) + random (32)
pos += 2 + 32;
if pos + 1 > handshake.len() {
return None;
}
let session_id_len = *handshake.get(pos)? as usize;
pos += 1 + session_id_len;
if pos + 2 > handshake.len() {
return None;
}
let cipher_suites_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
pos += 2 + cipher_suites_len;
if pos + 1 > handshake.len() {
return None;
}
let comp_len = *handshake.get(pos)? as usize;
pos += 1 + comp_len;
if pos + 2 > handshake.len() {
return None;
}
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
pos += 2;
let ext_end = pos + ext_len;
if ext_end > handshake.len() {
return None;
}
while pos + 4 <= ext_end {
let etype = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]);
let elen = u16::from_be_bytes([handshake[pos + 2], handshake[pos + 3]]) as usize;
pos += 4;
if pos + elen > ext_end {
break;
}
if etype == 0x0010 && elen >= 3 {
// ALPN
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
let mut alpn_pos = pos + 2;
let list_end = std::cmp::min(alpn_pos + list_len, pos + elen);
let mut protocols = Vec::new();
while alpn_pos < list_end {
let proto_len = *handshake.get(alpn_pos)? as usize;
alpn_pos += 1;
if alpn_pos + proto_len > list_end {
break;
}
if let Ok(p) = std::str::from_utf8(&handshake[alpn_pos..alpn_pos + proto_len]) {
protocols.push(p.to_string());
}
alpn_pos += proto_len;
}
return Some(protocols);
}
pos += elen;
}
None
}
/// Check if bytes look like a TLS ClientHello /// Check if bytes look like a TLS ClientHello
pub fn is_tls_handshake(first_bytes: &[u8]) -> bool { pub fn is_tls_handshake(first_bytes: &[u8]) -> bool {
if first_bytes.len() < 3 { if first_bytes.len() < 3 {
@ -746,4 +834,93 @@ mod tests {
// Should return None (no match) but not panic // Should return None (no match) but not panic
assert!(result.is_none()); assert!(result.is_none());
} }
fn build_client_hello_with_exts(exts: Vec<(u16, Vec<u8>)>, host: &str) -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&TLS_VERSION); // legacy version
body.extend_from_slice(&[0u8; 32]); // random
body.push(0); // session id len
body.extend_from_slice(&2u16.to_be_bytes()); // cipher suites len
body.extend_from_slice(&[0x13, 0x01]); // TLS_AES_128_GCM_SHA256
body.push(1); // compression len
body.push(0); // null compression
// Build SNI extension
let host_bytes = host.as_bytes();
let mut sni_ext = Vec::new();
sni_ext.extend_from_slice(&(host_bytes.len() as u16 + 3).to_be_bytes());
sni_ext.push(0);
sni_ext.extend_from_slice(&(host_bytes.len() as u16).to_be_bytes());
sni_ext.extend_from_slice(host_bytes);
let mut ext_blob = Vec::new();
for (typ, data) in exts {
ext_blob.extend_from_slice(&typ.to_be_bytes());
ext_blob.extend_from_slice(&(data.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&data);
}
// SNI last
ext_blob.extend_from_slice(&0x0000u16.to_be_bytes());
ext_blob.extend_from_slice(&(sni_ext.len() as u16).to_be_bytes());
ext_blob.extend_from_slice(&sni_ext);
body.extend_from_slice(&(ext_blob.len() as u16).to_be_bytes());
body.extend_from_slice(&ext_blob);
let mut handshake = Vec::new();
handshake.push(0x01); // ClientHello
let len_bytes = (body.len() as u32).to_be_bytes();
handshake.extend_from_slice(&len_bytes[1..4]);
handshake.extend_from_slice(&body);
let mut record = Vec::new();
record.push(TLS_RECORD_HANDSHAKE);
record.extend_from_slice(&[0x03, 0x01]);
record.extend_from_slice(&(handshake.len() as u16).to_be_bytes());
record.extend_from_slice(&handshake);
record
}
#[test]
fn test_extract_sni_with_grease_extension() {
// GREASE type 0x0a0a with zero length before SNI
let ch = build_client_hello_with_exts(vec![(0x0a0a, Vec::new())], "example.com");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("example.com"));
}
#[test]
fn test_extract_sni_tolerates_empty_unknown_extension() {
let ch = build_client_hello_with_exts(vec![(0x1234, Vec::new())], "test.local");
let sni = extract_sni_from_client_hello(&ch);
assert_eq!(sni.as_deref(), Some("test.local"));
}
#[test]
fn test_extract_alpn_single() {
let mut alpn_data = Vec::new();
// list length = 3 (1 length byte + "h2")
alpn_data.extend_from_slice(&3u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
assert_eq!(alpn, vec!["h2"]);
}
#[test]
fn test_extract_alpn_multiple() {
let mut alpn_data = Vec::new();
// list length = 11 (sum of per-proto lengths including length bytes)
alpn_data.extend_from_slice(&11u16.to_be_bytes());
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
alpn_data.push(4);
alpn_data.extend_from_slice(b"spdy");
alpn_data.push(2);
alpn_data.extend_from_slice(b"h3");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
assert_eq!(alpn, vec!["h2", "spdy", "h3"]);
}
} }

View File

@ -31,6 +31,7 @@ use crate::stats::{ReplayChecker, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::MePool; use crate::transport::middle_proxy::MePool;
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol}; use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};
use crate::transport::socket::normalize_ip;
use crate::tls_front::TlsFrontCache; use crate::tls_front::TlsFrontCache;
use crate::proxy::direct_relay::handle_via_direct; use crate::proxy::direct_relay::handle_via_direct;
@ -50,14 +51,15 @@ pub async fn handle_client_stream<S>(
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>, tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
) -> Result<()> ) -> Result<()>
where where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
stats.increment_connects_all(); stats.increment_connects_all();
let mut real_peer = peer; let mut real_peer = normalize_ip(peer);
if config.server.proxy_protocol { if proxy_protocol_enabled {
match parse_proxy_protocol(&mut stream, peer).await { match parse_proxy_protocol(&mut stream, peer).await {
Ok(info) => { Ok(info) => {
debug!( debug!(
@ -66,7 +68,7 @@ where
version = info.version, version = info.version,
"PROXY protocol header parsed" "PROXY protocol header parsed"
); );
real_peer = info.src_addr; real_peer = normalize_ip(info.src_addr);
} }
Err(e) => { Err(e) => {
stats.increment_connects_bad(); stats.increment_connects_bad();
@ -228,6 +230,7 @@ pub struct RunningClientHandler {
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>, tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
} }
impl ClientHandler { impl ClientHandler {
@ -243,6 +246,7 @@ impl ClientHandler {
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
tls_cache: Option<Arc<TlsFrontCache>>, tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
proxy_protocol_enabled: bool,
) -> RunningClientHandler { ) -> RunningClientHandler {
RunningClientHandler { RunningClientHandler {
stream, stream,
@ -256,6 +260,7 @@ impl ClientHandler {
me_pool, me_pool,
tls_cache, tls_cache,
ip_tracker, ip_tracker,
proxy_protocol_enabled,
} }
} }
} }
@ -264,6 +269,7 @@ impl RunningClientHandler {
pub async fn run(mut self) -> Result<()> { pub async fn run(mut self) -> Result<()> {
self.stats.increment_connects_all(); self.stats.increment_connects_all();
self.peer = normalize_ip(self.peer);
let peer = self.peer; let peer = self.peer;
let ip_tracker = self.ip_tracker.clone(); let ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, "New connection"); debug!(peer = %peer, "New connection");
@ -301,7 +307,7 @@ impl RunningClientHandler {
} }
async fn do_handshake(mut self) -> Result<HandshakeOutcome> { async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
if self.config.server.proxy_protocol { if self.proxy_protocol_enabled {
match parse_proxy_protocol(&mut self.stream, self.peer).await { match parse_proxy_protocol(&mut self.stream, self.peer).await {
Ok(info) => { Ok(info) => {
debug!( debug!(
@ -310,7 +316,7 @@ impl RunningClientHandler {
version = info.version, version = info.version,
"PROXY protocol header parsed" "PROXY protocol header parsed"
); );
self.peer = info.src_addr; self.peer = normalize_ip(info.src_addr);
} }
Err(e) => { Err(e) => {
self.stats.increment_connects_bad(); self.stats.increment_connects_bad();

View File

@ -1,7 +1,7 @@
//! Masking - forward unrecognized traffic to mask host //! Masking - forward unrecognized traffic to mask host
use std::time::Duration;
use std::str; use std::str;
use std::time::Duration;
use tokio::net::TcpStream; use tokio::net::TcpStream;
#[cfg(unix)] #[cfg(unix)]
use tokio::net::UnixStream; use tokio::net::UnixStream;
@ -11,9 +11,9 @@ use tracing::debug;
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
const MASK_TIMEOUT: Duration = Duration::from_secs(5); const MASK_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum duration for the entire masking relay. /// Maximum duration for the entire masking relay.
/// Limits resource consumption from slow-loris attacks and port scanners. /// Limits resource consumption from slow-loris attacks and port scanners.
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60); const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
const MASK_BUFFER_SIZE: usize = 8192; const MASK_BUFFER_SIZE: usize = 8192;
/// Detect client type based on initial data /// Detect client type based on initial data
@ -78,7 +78,9 @@ where
match connect_result { match connect_result {
Ok(Ok(stream)) => { Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split(); let (mask_read, mask_write) = stream.into_split();
relay_to_mask(reader, writer, mask_read, mask_write, initial_data).await; if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out (unix socket)");
}
} }
Ok(Err(e)) => { Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask unix socket"); debug!(error = %e, "Failed to connect to mask unix socket");
@ -110,7 +112,9 @@ where
match connect_result { match connect_result {
Ok(Ok(stream)) => { Ok(Ok(stream)) => {
let (mask_read, mask_write) = stream.into_split(); let (mask_read, mask_write) = stream.into_split();
relay_to_mask(reader, writer, mask_read, mask_write, initial_data).await; if timeout(MASK_RELAY_TIMEOUT, relay_to_mask(reader, writer, mask_read, mask_write, initial_data)).await.is_err() {
debug!("Mask relay timed out");
}
} }
Ok(Err(e)) => { Ok(Err(e)) => {
debug!(error = %e, "Failed to connect to mask host"); debug!(error = %e, "Failed to connect to mask host");

View File

@ -59,6 +59,45 @@ impl TlsFrontCache {
guard.insert(domain.to_string(), Arc::new(data)); guard.insert(domain.to_string(), Arc::new(data));
} }
pub async fn load_from_disk(&self) {
let path = self.disk_path.clone();
if tokio::fs::create_dir_all(&path).await.is_err() {
return;
}
let mut loaded = 0usize;
if let Ok(mut dir) = tokio::fs::read_dir(&path).await {
while let Ok(Some(entry)) = dir.next_entry().await {
if let Ok(name) = entry.file_name().into_string() {
if !name.ends_with(".json") {
continue;
}
if let Ok(data) = tokio::fs::read(entry.path()).await {
if let Ok(cached) = serde_json::from_slice::<CachedTlsData>(&data) {
let domain = cached.domain.clone();
self.set(&domain, cached).await;
loaded += 1;
}
}
}
}
}
if loaded > 0 {
info!(count = loaded, "Loaded TLS cache entries from disk");
}
}
async fn persist(&self, domain: &str, data: &CachedTlsData) {
if tokio::fs::create_dir_all(&self.disk_path).await.is_err() {
return;
}
let fname = format!("{}.json", domain.replace(['/', '\\'], "_"));
let path = self.disk_path.join(fname);
if let Ok(json) = serde_json::to_vec_pretty(data) {
// best-effort write
let _ = tokio::fs::write(path, json).await;
}
}
/// Spawn background updater that periodically refreshes cached domains using provided fetcher. /// Spawn background updater that periodically refreshes cached domains using provided fetcher.
pub fn spawn_updater<F>( pub fn spawn_updater<F>(
self: Arc<Self>, self: Arc<Self>,
@ -82,14 +121,15 @@ impl TlsFrontCache {
pub async fn update_from_fetch(&self, domain: &str, fetched: TlsFetchResult) { pub async fn update_from_fetch(&self, domain: &str, fetched: TlsFetchResult) {
let data = CachedTlsData { let data = CachedTlsData {
server_hello_template: fetched.server_hello_parsed, server_hello_template: fetched.server_hello_parsed,
cert_info: None, cert_info: fetched.cert_info,
app_data_records_sizes: fetched.app_data_records_sizes.clone(), app_data_records_sizes: fetched.app_data_records_sizes.clone(),
total_app_data_len: fetched.total_app_data_len, total_app_data_len: fetched.total_app_data_len,
fetched_at: SystemTime::now(), fetched_at: SystemTime::now(),
domain: domain.to_string(), domain: domain.to_string(),
}; };
self.set(domain, data).await; self.set(domain, data.clone()).await;
self.persist(domain, &data).await;
debug!(domain = %domain, len = fetched.total_app_data_len, "TLS cache updated"); debug!(domain = %domain, len = fetched.total_app_data_len, "TLS cache updated");
} }

View File

@ -5,6 +5,28 @@ use crate::protocol::constants::{
use crate::protocol::tls::{TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key}; use crate::protocol::tls::{TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key};
use crate::tls_front::types::CachedTlsData; use crate::tls_front::types::CachedTlsData;
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 allows up to 2^14 + 256
fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec<usize> {
sizes
.iter()
.map(|&size| {
let base = size.max(MIN_APP_DATA).min(MAX_APP_DATA);
let jitter_range = ((base as f64) * 0.03).round() as i64;
if jitter_range == 0 {
return base;
}
let mut rand_bytes = [0u8; 2];
rand_bytes.copy_from_slice(&rng.bytes(2));
let span = 2 * jitter_range + 1;
let delta = (u16::from_le_bytes(rand_bytes) as i64 % span) - jitter_range;
let adjusted = (base as i64 + delta).clamp(MIN_APP_DATA as i64, MAX_APP_DATA as i64);
adjusted as usize
})
.collect()
}
/// Build a ServerHello + CCS + ApplicationData sequence using cached TLS metadata. /// Build a ServerHello + CCS + ApplicationData sequence using cached TLS metadata.
pub fn build_emulated_server_hello( pub fn build_emulated_server_hello(
secret: &[u8], secret: &[u8],
@ -76,6 +98,7 @@ pub fn build_emulated_server_hello(
if sizes.is_empty() { if sizes.is_empty() {
sizes.push(cached.total_app_data_len.max(1024)); sizes.push(cached.total_app_data_len.max(1024));
} }
let sizes = jitter_and_clamp_sizes(&sizes, rng);
let mut app_data = Vec::new(); let mut app_data = Vec::new();
for size in sizes { for size in sizes {
@ -83,7 +106,14 @@ pub fn build_emulated_server_hello(
rec.push(TLS_RECORD_APPLICATION); rec.push(TLS_RECORD_APPLICATION);
rec.extend_from_slice(&TLS_VERSION); rec.extend_from_slice(&TLS_VERSION);
rec.extend_from_slice(&(size as u16).to_be_bytes()); rec.extend_from_slice(&(size as u16).to_be_bytes());
rec.extend_from_slice(&rng.bytes(size)); if size > 17 {
let body_len = size - 17;
rec.extend_from_slice(&rng.bytes(body_len));
rec.push(0x16); // inner content type marker (handshake)
rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag
} else {
rec.extend_from_slice(&rng.bytes(size));
}
app_data.extend_from_slice(&rec); app_data.extend_from_slice(&rec);
} }

View File

@ -14,9 +14,12 @@ use rustls::client::ClientConfig;
use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{DigitallySignedStruct, Error as RustlsError}; use rustls::{DigitallySignedStruct, Error as RustlsError};
use x509_parser::prelude::FromDer;
use x509_parser::certificate::X509Certificate;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE, TLS_VERSION}; use crate::protocol::constants::{TLS_RECORD_APPLICATION, TLS_RECORD_HANDSHAKE};
use crate::tls_front::types::{ParsedServerHello, TlsExtension, TlsFetchResult}; use crate::tls_front::types::{ParsedServerHello, TlsExtension, TlsFetchResult, ParsedCertificateInfo};
/// No-op verifier: accept any certificate (we only need lengths and metadata). /// No-op verifier: accept any certificate (we only need lengths and metadata).
#[derive(Debug)] #[derive(Debug)]
@ -163,12 +166,15 @@ fn build_client_hello(sni: &str, rng: &SecureRandom) -> Vec<u8> {
exts.extend_from_slice(alpn_proto); exts.extend_from_slice(alpn_proto);
// padding to reduce recognizability and keep length ~500 bytes // padding to reduce recognizability and keep length ~500 bytes
if exts.len() < 180 { const TARGET_EXT_LEN: usize = 180;
let pad_len = 180 - exts.len(); if exts.len() < TARGET_EXT_LEN {
exts.extend_from_slice(&0x0015u16.to_be_bytes()); // padding extension let remaining = TARGET_EXT_LEN - exts.len();
exts.extend_from_slice(&(pad_len as u16 + 2).to_be_bytes()); if remaining > 4 {
exts.extend_from_slice(&(pad_len as u16).to_be_bytes()); let pad_len = remaining - 4; // minus type+len
exts.resize(exts.len() + pad_len, 0); exts.extend_from_slice(&0x0015u16.to_be_bytes()); // padding extension
exts.extend_from_slice(&(pad_len as u16).to_be_bytes());
exts.resize(exts.len() + pad_len, 0);
}
} }
// Extensions length prefix // Extensions length prefix
@ -263,6 +269,52 @@ fn parse_server_hello(body: &[u8]) -> Option<ParsedServerHello> {
}) })
} }
fn parse_cert_info(certs: &[CertificateDer<'static>]) -> Option<ParsedCertificateInfo> {
let first = certs.first()?;
let (_rem, cert) = X509Certificate::from_der(first.as_ref()).ok()?;
let not_before = Some(cert.validity().not_before.to_datetime().unix_timestamp());
let not_after = Some(cert.validity().not_after.to_datetime().unix_timestamp());
let issuer_cn = cert
.issuer()
.iter_common_name()
.next()
.and_then(|cn| cn.as_str().ok())
.map(|s| s.to_string());
let subject_cn = cert
.subject()
.iter_common_name()
.next()
.and_then(|cn| cn.as_str().ok())
.map(|s| s.to_string());
let san_names = cert
.subject_alternative_name()
.ok()
.flatten()
.map(|san| {
san.value
.general_names
.iter()
.filter_map(|gn| match gn {
x509_parser::extensions::GeneralName::DNSName(n) => Some(n.to_string()),
_ => None,
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
Some(ParsedCertificateInfo {
not_after_unix: not_after,
not_before_unix: not_before,
issuer_cn,
subject_cn,
san_names,
})
}
async fn fetch_via_raw_tls( async fn fetch_via_raw_tls(
host: &str, host: &str,
port: u16, port: u16,
@ -315,6 +367,7 @@ async fn fetch_via_raw_tls(
app_sizes app_sizes
}, },
total_app_data_len, total_app_data_len,
cert_info: None,
}) })
} }
@ -324,6 +377,7 @@ pub async fn fetch_real_tls(
port: u16, port: u16,
sni: &str, sni: &str,
connect_timeout: Duration, connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
) -> Result<TlsFetchResult> { ) -> Result<TlsFetchResult> {
// Preferred path: raw TLS probe for accurate record sizing // Preferred path: raw TLS probe for accurate record sizing
match fetch_via_raw_tls(host, port, sni, connect_timeout).await { match fetch_via_raw_tls(host, port, sni, connect_timeout).await {
@ -334,8 +388,26 @@ pub async fn fetch_real_tls(
} }
// Fallback: rustls handshake to at least get certificate sizes // Fallback: rustls handshake to at least get certificate sizes
let addr = format!("{host}:{port}"); let stream = if let Some(manager) = upstream {
let stream = timeout(connect_timeout, TcpStream::connect(addr)).await??; // Resolve host to SocketAddr
if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
match manager.connect(addr, None, None).await {
Ok(s) => s,
Err(e) => {
warn!(sni = %sni, error = %e, "Upstream connect failed, using direct connect");
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
}
} else {
timeout(connect_timeout, TcpStream::connect((host, port))).await??
};
let config = build_client_config(); let config = build_client_config();
let connector = TlsConnector::from(config); let connector = TlsConnector::from(config);
@ -359,6 +431,7 @@ pub async fn fetch_real_tls(
.unwrap_or_default(); .unwrap_or_default();
let total_cert_len: usize = certs.iter().map(|c| c.len()).sum::<usize>().max(1024); let total_cert_len: usize = certs.iter().map(|c| c.len()).sum::<usize>().max(1024);
let cert_info = parse_cert_info(&certs);
// Heuristic: split across two records if large to mimic real servers a bit. // Heuristic: split across two records if large to mimic real servers a bit.
let app_data_records_sizes = if total_cert_len > 3000 { let app_data_records_sizes = if total_cert_len > 3000 {
@ -387,5 +460,6 @@ pub async fn fetch_real_tls(
server_hello_parsed: parsed, server_hello_parsed: parsed,
app_data_records_sizes: app_data_records_sizes.clone(), app_data_records_sizes: app_data_records_sizes.clone(),
total_app_data_len: app_data_records_sizes.iter().sum(), total_app_data_len: app_data_records_sizes.iter().sum(),
cert_info,
}) })
} }

View File

@ -1,7 +1,8 @@
use std::time::SystemTime; use std::time::SystemTime;
use serde::{Serialize, Deserialize};
/// Parsed representation of an unencrypted TLS ServerHello. /// Parsed representation of an unencrypted TLS ServerHello.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedServerHello { pub struct ParsedServerHello {
pub version: [u8; 2], pub version: [u8; 2],
pub random: [u8; 32], pub random: [u8; 32],
@ -12,14 +13,14 @@ pub struct ParsedServerHello {
} }
/// Generic TLS extension container. /// Generic TLS extension container.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsExtension { pub struct TlsExtension {
pub ext_type: u16, pub ext_type: u16,
pub data: Vec<u8>, pub data: Vec<u8>,
} }
/// Basic certificate metadata (optional, informative). /// Basic certificate metadata (optional, informative).
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedCertificateInfo { pub struct ParsedCertificateInfo {
pub not_after_unix: Option<i64>, pub not_after_unix: Option<i64>,
pub not_before_unix: Option<i64>, pub not_before_unix: Option<i64>,
@ -29,20 +30,26 @@ pub struct ParsedCertificateInfo {
} }
/// Cached data per SNI used by the emulator. /// Cached data per SNI used by the emulator.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedTlsData { pub struct CachedTlsData {
pub server_hello_template: ParsedServerHello, pub server_hello_template: ParsedServerHello,
pub cert_info: Option<ParsedCertificateInfo>, pub cert_info: Option<ParsedCertificateInfo>,
pub app_data_records_sizes: Vec<usize>, pub app_data_records_sizes: Vec<usize>,
pub total_app_data_len: usize, pub total_app_data_len: usize,
#[serde(default = "now_system_time", skip_serializing, skip_deserializing)]
pub fetched_at: SystemTime, pub fetched_at: SystemTime,
pub domain: String, pub domain: String,
} }
fn now_system_time() -> SystemTime {
SystemTime::now()
}
/// Result of attempting to fetch real TLS artifacts. /// Result of attempting to fetch real TLS artifacts.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsFetchResult { pub struct TlsFetchResult {
pub server_hello_parsed: ParsedServerHello, pub server_hello_parsed: ParsedServerHello,
pub app_data_records_sizes: Vec<usize>, pub app_data_records_sizes: Vec<usize>,
pub total_app_data_len: usize, pub total_app_data_len: usize,
pub cert_info: Option<ParsedCertificateInfo>,
} }

View File

@ -283,6 +283,58 @@ impl Default for ProxyProtocolV1Builder {
} }
} }
/// Builder for PROXY protocol v2 header
pub struct ProxyProtocolV2Builder {
src: Option<SocketAddr>,
dst: Option<SocketAddr>,
}
impl ProxyProtocolV2Builder {
pub fn new() -> Self {
Self { src: None, dst: None }
}
pub fn with_addrs(mut self, src: SocketAddr, dst: SocketAddr) -> Self {
self.src = Some(src);
self.dst = Some(dst);
self
}
pub fn build(&self) -> Vec<u8> {
let mut header = Vec::new();
header.extend_from_slice(PROXY_V2_SIGNATURE);
// version 2, PROXY command
header.push(0x21);
match (self.src, self.dst) {
(Some(SocketAddr::V4(src)), Some(SocketAddr::V4(dst))) => {
header.push(0x11); // INET + STREAM
header.extend_from_slice(&(12u16).to_be_bytes());
header.extend_from_slice(&src.ip().octets());
header.extend_from_slice(&dst.ip().octets());
header.extend_from_slice(&src.port().to_be_bytes());
header.extend_from_slice(&dst.port().to_be_bytes());
}
(Some(SocketAddr::V6(src)), Some(SocketAddr::V6(dst))) => {
header.push(0x21); // INET6 + STREAM
header.extend_from_slice(&(36u16).to_be_bytes());
header.extend_from_slice(&src.ip().octets());
header.extend_from_slice(&dst.ip().octets());
header.extend_from_slice(&src.port().to_be_bytes());
header.extend_from_slice(&dst.port().to_be_bytes());
}
_ => {
// LOCAL/UNSPEC: no address information
header[12] = 0x20; // version 2, LOCAL command
header.push(0x00);
header.extend_from_slice(&0u16.to_be_bytes());
}
}
header
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -378,4 +430,4 @@ mod tests {
let header = ProxyProtocolV1Builder::new().build(); let header = ProxyProtocolV1Builder::new().build();
assert_eq!(header, b"PROXY UNKNOWN\r\n"); assert_eq!(header, b"PROXY UNKNOWN\r\n");
} }
} }