mirror of https://github.com/telemt/telemt.git
commit
2dcbdbe302
|
|
@ -110,6 +110,75 @@ pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
|
||||||
30_000
|
30_000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_crypto_pending_buffer() -> usize {
|
||||||
|
256 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_max_client_frame() -> usize {
|
||||||
|
16 * 1024 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_tls_new_session_tickets() -> u8 {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_server_hello_delay_min_ms() -> u64 {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_server_hello_delay_max_ms() -> u64 {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_alpn_enforce() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_stun_servers() -> Vec<String> {
|
||||||
|
vec![
|
||||||
|
"stun.l.google.com:19302".to_string(),
|
||||||
|
"stun1.l.google.com:19302".to_string(),
|
||||||
|
"stun2.l.google.com:19302".to_string(),
|
||||||
|
"stun.stunprotocol.org:3478".to_string(),
|
||||||
|
"stun.voip.eutelia.it:3478".to_string(),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_http_ip_detect_urls() -> Vec<String> {
|
||||||
|
vec![
|
||||||
|
"https://ifconfig.me/ip".to_string(),
|
||||||
|
"https://api.ipify.org".to_string(),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_cache_public_ip_path() -> String {
|
||||||
|
"cache/public_ip.txt".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_proxy_secret_reload_secs() -> u64 {
|
||||||
|
12 * 60 * 60
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_proxy_config_reload_secs() -> u64 {
|
||||||
|
12 * 60 * 60
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_ntp_check() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_ntp_servers() -> Vec<String> {
|
||||||
|
vec!["pool.ntp.org".to_string()]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_fast_mode_min_tls_record() -> usize {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_degradation_min_unavailable_dc_groups() -> u8 {
|
||||||
|
2
|
||||||
|
}
|
||||||
|
|
||||||
// Custom deserializer helpers
|
// Custom deserializer helpers
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
|
|
|
||||||
|
|
@ -96,6 +96,22 @@ pub struct NetworkConfig {
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub multipath: bool,
|
pub multipath: bool,
|
||||||
|
|
||||||
|
/// STUN servers list for public IP discovery.
|
||||||
|
#[serde(default = "default_stun_servers")]
|
||||||
|
pub stun_servers: Vec<String>,
|
||||||
|
|
||||||
|
/// Enable TCP STUN fallback when UDP is blocked.
|
||||||
|
#[serde(default)]
|
||||||
|
pub stun_tcp_fallback: bool,
|
||||||
|
|
||||||
|
/// HTTP-based public IP detection endpoints (fallback after STUN).
|
||||||
|
#[serde(default = "default_http_ip_detect_urls")]
|
||||||
|
pub http_ip_detect_urls: Vec<String>,
|
||||||
|
|
||||||
|
/// Cache file path for detected public IP.
|
||||||
|
#[serde(default = "default_cache_public_ip_path")]
|
||||||
|
pub cache_public_ip_path: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for NetworkConfig {
|
impl Default for NetworkConfig {
|
||||||
|
|
@ -105,6 +121,10 @@ impl Default for NetworkConfig {
|
||||||
ipv6: None,
|
ipv6: None,
|
||||||
prefer: 4,
|
prefer: 4,
|
||||||
multipath: false,
|
multipath: false,
|
||||||
|
stun_servers: default_stun_servers(),
|
||||||
|
stun_tcp_fallback: true,
|
||||||
|
http_ip_detect_urls: default_http_ip_detect_urls(),
|
||||||
|
cache_public_ip_path: default_cache_public_ip_path(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -172,6 +192,15 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_true")]
|
#[serde(default = "default_true")]
|
||||||
pub me_keepalive_payload_random: bool,
|
pub me_keepalive_payload_random: bool,
|
||||||
|
|
||||||
|
/// Max pending ciphertext buffer per client writer (bytes).
|
||||||
|
/// Controls FakeTLS backpressure vs throughput.
|
||||||
|
#[serde(default = "default_crypto_pending_buffer")]
|
||||||
|
pub crypto_pending_buffer: usize,
|
||||||
|
|
||||||
|
/// Maximum allowed client MTProto frame size (bytes).
|
||||||
|
#[serde(default = "default_max_client_frame")]
|
||||||
|
pub max_client_frame: usize,
|
||||||
|
|
||||||
/// Enable staggered warmup of extra ME writers.
|
/// Enable staggered warmup of extra ME writers.
|
||||||
#[serde(default = "default_true")]
|
#[serde(default = "default_true")]
|
||||||
pub me_warmup_stagger_enabled: bool,
|
pub me_warmup_stagger_enabled: bool,
|
||||||
|
|
@ -218,6 +247,34 @@ pub struct GeneralConfig {
|
||||||
/// [general.links] — proxy link generation overrides.
|
/// [general.links] — proxy link generation overrides.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub links: LinksConfig,
|
pub links: LinksConfig,
|
||||||
|
|
||||||
|
/// Minimum TLS record size when fast_mode coalescing is enabled (0 = disabled).
|
||||||
|
#[serde(default = "default_fast_mode_min_tls_record")]
|
||||||
|
pub fast_mode_min_tls_record: usize,
|
||||||
|
|
||||||
|
/// Automatically reload proxy-secret every N seconds.
|
||||||
|
#[serde(default = "default_proxy_secret_reload_secs")]
|
||||||
|
pub proxy_secret_auto_reload_secs: u64,
|
||||||
|
|
||||||
|
/// Automatically reload proxy-multi.conf every N seconds.
|
||||||
|
#[serde(default = "default_proxy_config_reload_secs")]
|
||||||
|
pub proxy_config_auto_reload_secs: u64,
|
||||||
|
|
||||||
|
/// Enable NTP drift check at startup.
|
||||||
|
#[serde(default = "default_ntp_check")]
|
||||||
|
pub ntp_check: bool,
|
||||||
|
|
||||||
|
/// NTP servers for drift check.
|
||||||
|
#[serde(default = "default_ntp_servers")]
|
||||||
|
pub ntp_servers: Vec<String>,
|
||||||
|
|
||||||
|
/// Enable auto-degradation from ME to Direct-DC.
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
pub auto_degradation_enabled: bool,
|
||||||
|
|
||||||
|
/// Minimum unavailable ME DC groups before degrading.
|
||||||
|
#[serde(default = "default_degradation_min_unavailable_dc_groups")]
|
||||||
|
pub degradation_min_unavailable_dc_groups: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for GeneralConfig {
|
impl Default for GeneralConfig {
|
||||||
|
|
@ -251,6 +308,15 @@ impl Default for GeneralConfig {
|
||||||
log_level: LogLevel::Normal,
|
log_level: LogLevel::Normal,
|
||||||
disable_colors: false,
|
disable_colors: false,
|
||||||
links: LinksConfig::default(),
|
links: LinksConfig::default(),
|
||||||
|
crypto_pending_buffer: default_crypto_pending_buffer(),
|
||||||
|
max_client_frame: default_max_client_frame(),
|
||||||
|
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
|
||||||
|
proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(),
|
||||||
|
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
|
||||||
|
ntp_check: default_ntp_check(),
|
||||||
|
ntp_servers: default_ntp_servers(),
|
||||||
|
auto_degradation_enabled: true,
|
||||||
|
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -395,6 +461,22 @@ pub struct AntiCensorshipConfig {
|
||||||
/// Directory to store TLS front cache (on disk).
|
/// Directory to store TLS front cache (on disk).
|
||||||
#[serde(default = "default_tls_front_dir")]
|
#[serde(default = "default_tls_front_dir")]
|
||||||
pub tls_front_dir: String,
|
pub tls_front_dir: String,
|
||||||
|
|
||||||
|
/// Minimum server_hello delay in milliseconds (anti-fingerprint).
|
||||||
|
#[serde(default = "default_server_hello_delay_min_ms")]
|
||||||
|
pub server_hello_delay_min_ms: u64,
|
||||||
|
|
||||||
|
/// Maximum server_hello delay in milliseconds.
|
||||||
|
#[serde(default = "default_server_hello_delay_max_ms")]
|
||||||
|
pub server_hello_delay_max_ms: u64,
|
||||||
|
|
||||||
|
/// Number of NewSessionTicket messages to emit post-handshake.
|
||||||
|
#[serde(default = "default_tls_new_session_tickets")]
|
||||||
|
pub tls_new_session_tickets: u8,
|
||||||
|
|
||||||
|
/// Enforce ALPN echo of client preference.
|
||||||
|
#[serde(default = "default_alpn_enforce")]
|
||||||
|
pub alpn_enforce: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for AntiCensorshipConfig {
|
impl Default for AntiCensorshipConfig {
|
||||||
|
|
@ -409,6 +491,10 @@ impl Default for AntiCensorshipConfig {
|
||||||
fake_cert_len: default_fake_cert_len(),
|
fake_cert_len: default_fake_cert_len(),
|
||||||
tls_emulation: false,
|
tls_emulation: false,
|
||||||
tls_front_dir: default_tls_front_dir(),
|
tls_front_dir: default_tls_front_dir(),
|
||||||
|
server_hello_delay_min_ms: default_server_hello_delay_min_ms(),
|
||||||
|
server_hello_delay_max_ms: default_server_hello_delay_max_ms(),
|
||||||
|
tls_new_session_tickets: default_tls_new_session_tickets(),
|
||||||
|
alpn_enforce: default_alpn_enforce(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -213,6 +213,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
"Modes: classic={} secure={} tls={}",
|
"Modes: classic={} secure={} tls={}",
|
||||||
config.general.modes.classic, config.general.modes.secure, config.general.modes.tls
|
config.general.modes.classic, config.general.modes.secure, config.general.modes.tls
|
||||||
);
|
);
|
||||||
|
if config.general.modes.classic {
|
||||||
|
warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients");
|
||||||
|
}
|
||||||
info!("TLS domain: {}", config.censorship.tls_domain);
|
info!("TLS domain: {}", config.censorship.tls_domain);
|
||||||
if let Some(ref sock) = config.censorship.mask_unix_sock {
|
if let Some(ref sock) = config.censorship.mask_unix_sock {
|
||||||
info!("Mask: {} -> unix:{}", config.censorship.mask, sock);
|
info!("Mask: {} -> unix:{}", config.censorship.mask, sock);
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
//! Protocol constants and datacenter addresses
|
//! Protocol constants and datacenter addresses
|
||||||
|
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||||
|
|
||||||
|
use crate::crypto::SecureRandom;
|
||||||
use std::sync::LazyLock;
|
use std::sync::LazyLock;
|
||||||
|
|
||||||
// ============= Telegram Datacenters =============
|
// ============= Telegram Datacenters =============
|
||||||
|
|
@ -151,7 +153,18 @@ pub const TLS_RECORD_ALERT: u8 = 0x15;
|
||||||
/// Maximum TLS record size
|
/// Maximum TLS record size
|
||||||
pub const MAX_TLS_RECORD_SIZE: usize = 16384;
|
pub const MAX_TLS_RECORD_SIZE: usize = 16384;
|
||||||
/// Maximum TLS chunk size (with overhead)
|
/// Maximum TLS chunk size (with overhead)
|
||||||
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 24;
|
/// RFC 8446 §5.2 allows up to 16384 + 256 bytes of ciphertext
|
||||||
|
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256;
|
||||||
|
|
||||||
|
/// Generate padding length for Secure Intermediate protocol.
|
||||||
|
/// Total (data + padding) must not be divisible by 4 per MTProto spec.
|
||||||
|
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
|
||||||
|
if data_len % 4 == 0 {
|
||||||
|
(rng.range(3) + 1) as usize // 1-3
|
||||||
|
} else {
|
||||||
|
rng.range(4) as usize // 0-3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ============= Timeouts =============
|
// ============= Timeouts =============
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ pub const TIME_SKEW_MAX: i64 = 10 * 60; // 10 minutes after
|
||||||
mod extension_type {
|
mod extension_type {
|
||||||
pub const KEY_SHARE: u16 = 0x0033;
|
pub const KEY_SHARE: u16 = 0x0033;
|
||||||
pub const SUPPORTED_VERSIONS: u16 = 0x002b;
|
pub const SUPPORTED_VERSIONS: u16 = 0x002b;
|
||||||
|
pub const ALPN: u16 = 0x0010;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TLS Cipher Suites
|
/// TLS Cipher Suites
|
||||||
|
|
@ -62,6 +63,7 @@ pub struct TlsValidation {
|
||||||
// ============= TLS Extension Builder =============
|
// ============= TLS Extension Builder =============
|
||||||
|
|
||||||
/// Builder for TLS extensions with correct length calculation
|
/// Builder for TLS extensions with correct length calculation
|
||||||
|
#[derive(Clone)]
|
||||||
struct TlsExtensionBuilder {
|
struct TlsExtensionBuilder {
|
||||||
extensions: Vec<u8>,
|
extensions: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
@ -109,6 +111,27 @@ impl TlsExtensionBuilder {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add ALPN extension with a single selected protocol.
|
||||||
|
fn add_alpn(&mut self, proto: &[u8]) -> &mut Self {
|
||||||
|
// Extension type: ALPN (0x0010)
|
||||||
|
self.extensions.extend_from_slice(&extension_type::ALPN.to_be_bytes());
|
||||||
|
|
||||||
|
// ALPN extension format:
|
||||||
|
// extension_data length (2 bytes)
|
||||||
|
// protocols length (2 bytes)
|
||||||
|
// protocol name length (1 byte)
|
||||||
|
// protocol name bytes
|
||||||
|
let proto_len = proto.len() as u8;
|
||||||
|
let list_len: u16 = 1 + proto_len as u16;
|
||||||
|
let ext_len: u16 = 2 + list_len;
|
||||||
|
|
||||||
|
self.extensions.extend_from_slice(&ext_len.to_be_bytes());
|
||||||
|
self.extensions.extend_from_slice(&list_len.to_be_bytes());
|
||||||
|
self.extensions.push(proto_len);
|
||||||
|
self.extensions.extend_from_slice(proto);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Build final extensions with length prefix
|
/// Build final extensions with length prefix
|
||||||
fn build(self) -> Vec<u8> {
|
fn build(self) -> Vec<u8> {
|
||||||
let mut result = Vec::with_capacity(2 + self.extensions.len());
|
let mut result = Vec::with_capacity(2 + self.extensions.len());
|
||||||
|
|
@ -144,6 +167,8 @@ struct ServerHelloBuilder {
|
||||||
compression: u8,
|
compression: u8,
|
||||||
/// Extensions
|
/// Extensions
|
||||||
extensions: TlsExtensionBuilder,
|
extensions: TlsExtensionBuilder,
|
||||||
|
/// Selected ALPN protocol (if any)
|
||||||
|
alpn: Option<Vec<u8>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerHelloBuilder {
|
impl ServerHelloBuilder {
|
||||||
|
|
@ -154,6 +179,7 @@ impl ServerHelloBuilder {
|
||||||
cipher_suite: cipher_suite::TLS_AES_128_GCM_SHA256,
|
cipher_suite: cipher_suite::TLS_AES_128_GCM_SHA256,
|
||||||
compression: 0x00,
|
compression: 0x00,
|
||||||
extensions: TlsExtensionBuilder::new(),
|
extensions: TlsExtensionBuilder::new(),
|
||||||
|
alpn: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -168,9 +194,18 @@ impl ServerHelloBuilder {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_alpn(mut self, proto: Option<Vec<u8>>) -> Self {
|
||||||
|
self.alpn = proto;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Build ServerHello message (without record header)
|
/// Build ServerHello message (without record header)
|
||||||
fn build_message(&self) -> Vec<u8> {
|
fn build_message(&self) -> Vec<u8> {
|
||||||
let extensions = self.extensions.extensions.clone();
|
let mut ext_builder = self.extensions.clone();
|
||||||
|
if let Some(ref alpn) = self.alpn {
|
||||||
|
ext_builder.add_alpn(alpn);
|
||||||
|
}
|
||||||
|
let extensions = ext_builder.extensions.clone();
|
||||||
let extensions_len = extensions.len() as u16;
|
let extensions_len = extensions.len() as u16;
|
||||||
|
|
||||||
// Calculate total length
|
// Calculate total length
|
||||||
|
|
@ -350,6 +385,8 @@ pub fn build_server_hello(
|
||||||
session_id: &[u8],
|
session_id: &[u8],
|
||||||
fake_cert_len: usize,
|
fake_cert_len: usize,
|
||||||
rng: &SecureRandom,
|
rng: &SecureRandom,
|
||||||
|
alpn: Option<Vec<u8>>,
|
||||||
|
new_session_tickets: u8,
|
||||||
) -> Vec<u8> {
|
) -> Vec<u8> {
|
||||||
const MIN_APP_DATA: usize = 64;
|
const MIN_APP_DATA: usize = 64;
|
||||||
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
|
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
|
||||||
|
|
@ -360,6 +397,7 @@ pub fn build_server_hello(
|
||||||
let server_hello = ServerHelloBuilder::new(session_id.to_vec())
|
let server_hello = ServerHelloBuilder::new(session_id.to_vec())
|
||||||
.with_x25519_key(&x25519_key)
|
.with_x25519_key(&x25519_key)
|
||||||
.with_tls13_version()
|
.with_tls13_version()
|
||||||
|
.with_alpn(alpn)
|
||||||
.build_record();
|
.build_record();
|
||||||
|
|
||||||
// Build Change Cipher Spec record
|
// Build Change Cipher Spec record
|
||||||
|
|
@ -376,21 +414,35 @@ pub fn build_server_hello(
|
||||||
app_data_record.push(TLS_RECORD_APPLICATION);
|
app_data_record.push(TLS_RECORD_APPLICATION);
|
||||||
app_data_record.extend_from_slice(&TLS_VERSION);
|
app_data_record.extend_from_slice(&TLS_VERSION);
|
||||||
app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes());
|
app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes());
|
||||||
if fake_cert_len > 17 {
|
// Fill ApplicationData with fully random bytes of desired length to avoid
|
||||||
app_data_record.extend_from_slice(&fake_cert[..fake_cert_len - 17]);
|
// deterministic DPI fingerprints (fixed inner content type markers).
|
||||||
app_data_record.push(0x16); // inner content type marker
|
app_data_record.extend_from_slice(&fake_cert);
|
||||||
app_data_record.extend_from_slice(&rng.bytes(16)); // AEAD-like tag mimic
|
|
||||||
} else {
|
// Build optional NewSessionTicket records (TLS 1.3 handshake messages are encrypted;
|
||||||
app_data_record.extend_from_slice(&fake_cert);
|
// here we mimic with opaque ApplicationData records of plausible size).
|
||||||
|
let mut tickets = Vec::new();
|
||||||
|
if new_session_tickets > 0 {
|
||||||
|
for _ in 0..new_session_tickets {
|
||||||
|
let ticket_len: usize = rng.range(48) + 48; // 48-95 bytes
|
||||||
|
let mut record = Vec::with_capacity(5 + ticket_len);
|
||||||
|
record.push(TLS_RECORD_APPLICATION);
|
||||||
|
record.extend_from_slice(&TLS_VERSION);
|
||||||
|
record.extend_from_slice(&(ticket_len as u16).to_be_bytes());
|
||||||
|
record.extend_from_slice(&rng.bytes(ticket_len));
|
||||||
|
tickets.push(record);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Combine all records
|
// Combine all records
|
||||||
let mut response = Vec::with_capacity(
|
let mut response = Vec::with_capacity(
|
||||||
server_hello.len() + change_cipher_spec.len() + app_data_record.len()
|
server_hello.len() + change_cipher_spec.len() + app_data_record.len() + tickets.iter().map(|r| r.len()).sum::<usize>()
|
||||||
);
|
);
|
||||||
response.extend_from_slice(&server_hello);
|
response.extend_from_slice(&server_hello);
|
||||||
response.extend_from_slice(&change_cipher_spec);
|
response.extend_from_slice(&change_cipher_spec);
|
||||||
response.extend_from_slice(&app_data_record);
|
response.extend_from_slice(&app_data_record);
|
||||||
|
for t in &tickets {
|
||||||
|
response.extend_from_slice(t);
|
||||||
|
}
|
||||||
|
|
||||||
// Compute HMAC for the response
|
// Compute HMAC for the response
|
||||||
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());
|
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());
|
||||||
|
|
@ -484,85 +536,53 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract ALPN protocol list from TLS ClientHello.
|
/// Extract ALPN protocol list from ClientHello, return in offered order.
|
||||||
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Option<Vec<String>> {
|
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec<Vec<u8>> {
|
||||||
if handshake.len() < 43 || handshake[0] != TLS_RECORD_HANDSHAKE {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut pos = 5; // after record header
|
let mut pos = 5; // after record header
|
||||||
if handshake.get(pos).copied()? != 0x01 {
|
if handshake.get(pos) != Some(&0x01) {
|
||||||
return None; // not ClientHello
|
return Vec::new();
|
||||||
}
|
}
|
||||||
|
pos += 4; // type + len
|
||||||
// Handshake length bytes
|
pos += 2 + 32; // version + random
|
||||||
pos += 4; // type + len (3)
|
if pos >= handshake.len() { return Vec::new(); }
|
||||||
|
let session_id_len = *handshake.get(pos).unwrap_or(&0) as usize;
|
||||||
// version (2) + random (32)
|
|
||||||
pos += 2 + 32;
|
|
||||||
if pos + 1 > handshake.len() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let session_id_len = *handshake.get(pos)? as usize;
|
|
||||||
pos += 1 + session_id_len;
|
pos += 1 + session_id_len;
|
||||||
if pos + 2 > handshake.len() {
|
if pos + 2 > handshake.len() { return Vec::new(); }
|
||||||
return None;
|
let cipher_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
|
||||||
}
|
pos += 2 + cipher_len;
|
||||||
|
if pos >= handshake.len() { return Vec::new(); }
|
||||||
let cipher_suites_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
|
let comp_len = *handshake.get(pos).unwrap_or(&0) as usize;
|
||||||
pos += 2 + cipher_suites_len;
|
|
||||||
if pos + 1 > handshake.len() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let comp_len = *handshake.get(pos)? as usize;
|
|
||||||
pos += 1 + comp_len;
|
pos += 1 + comp_len;
|
||||||
if pos + 2 > handshake.len() {
|
if pos + 2 > handshake.len() { return Vec::new(); }
|
||||||
return None;
|
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
|
||||||
}
|
|
||||||
|
|
||||||
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
|
|
||||||
pos += 2;
|
pos += 2;
|
||||||
let ext_end = pos + ext_len;
|
let ext_end = pos + ext_len;
|
||||||
if ext_end > handshake.len() {
|
if ext_end > handshake.len() { return Vec::new(); }
|
||||||
return None;
|
let mut out = Vec::new();
|
||||||
}
|
|
||||||
|
|
||||||
while pos + 4 <= ext_end {
|
while pos + 4 <= ext_end {
|
||||||
let etype = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]);
|
let etype = u16::from_be_bytes([handshake[pos], handshake[pos+1]]);
|
||||||
let elen = u16::from_be_bytes([handshake[pos + 2], handshake[pos + 3]]) as usize;
|
let elen = u16::from_be_bytes([handshake[pos+2], handshake[pos+3]]) as usize;
|
||||||
pos += 4;
|
pos += 4;
|
||||||
if pos + elen > ext_end {
|
if pos + elen > ext_end { break; }
|
||||||
|
if etype == extension_type::ALPN && elen >= 3 {
|
||||||
|
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
|
||||||
|
let mut lp = pos + 2;
|
||||||
|
let list_end = (pos + 2).saturating_add(list_len).min(pos + elen);
|
||||||
|
while lp + 1 <= list_end {
|
||||||
|
let plen = handshake[lp] as usize;
|
||||||
|
lp += 1;
|
||||||
|
if lp + plen > list_end { break; }
|
||||||
|
out.push(handshake[lp..lp+plen].to_vec());
|
||||||
|
lp += plen;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if etype == 0x0010 && elen >= 3 {
|
|
||||||
// ALPN
|
|
||||||
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
|
|
||||||
let mut alpn_pos = pos + 2;
|
|
||||||
let list_end = std::cmp::min(alpn_pos + list_len, pos + elen);
|
|
||||||
let mut protocols = Vec::new();
|
|
||||||
while alpn_pos < list_end {
|
|
||||||
let proto_len = *handshake.get(alpn_pos)? as usize;
|
|
||||||
alpn_pos += 1;
|
|
||||||
if alpn_pos + proto_len > list_end {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if let Ok(p) = std::str::from_utf8(&handshake[alpn_pos..alpn_pos + proto_len]) {
|
|
||||||
protocols.push(p.to_string());
|
|
||||||
}
|
|
||||||
alpn_pos += proto_len;
|
|
||||||
}
|
|
||||||
return Some(protocols);
|
|
||||||
}
|
|
||||||
|
|
||||||
pos += elen;
|
pos += elen;
|
||||||
}
|
}
|
||||||
|
out
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Check if bytes look like a TLS ClientHello
|
/// Check if bytes look like a TLS ClientHello
|
||||||
pub fn is_tls_handshake(first_bytes: &[u8]) -> bool {
|
pub fn is_tls_handshake(first_bytes: &[u8]) -> bool {
|
||||||
if first_bytes.len() < 3 {
|
if first_bytes.len() < 3 {
|
||||||
|
|
@ -741,7 +761,7 @@ mod tests {
|
||||||
let session_id = vec![0xAA; 32];
|
let session_id = vec![0xAA; 32];
|
||||||
|
|
||||||
let rng = SecureRandom::new();
|
let rng = SecureRandom::new();
|
||||||
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng);
|
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng, None, 0);
|
||||||
|
|
||||||
// Should have at least 3 records
|
// Should have at least 3 records
|
||||||
assert!(response.len() > 100);
|
assert!(response.len() > 100);
|
||||||
|
|
@ -774,8 +794,8 @@ mod tests {
|
||||||
let session_id = vec![0xAA; 32];
|
let session_id = vec![0xAA; 32];
|
||||||
|
|
||||||
let rng = SecureRandom::new();
|
let rng = SecureRandom::new();
|
||||||
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng);
|
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
|
||||||
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng);
|
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
|
||||||
|
|
||||||
// Digest position should have non-zero data
|
// Digest position should have non-zero data
|
||||||
let digest1 = &response1[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN];
|
let digest1 = &response1[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN];
|
||||||
|
|
@ -904,8 +924,12 @@ mod tests {
|
||||||
alpn_data.push(2);
|
alpn_data.push(2);
|
||||||
alpn_data.extend_from_slice(b"h2");
|
alpn_data.extend_from_slice(b"h2");
|
||||||
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
|
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
|
||||||
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
|
let alpn = extract_alpn_from_client_hello(&ch);
|
||||||
assert_eq!(alpn, vec!["h2"]);
|
let alpn_str: Vec<String> = alpn
|
||||||
|
.iter()
|
||||||
|
.map(|p| std::str::from_utf8(p).unwrap().to_string())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(alpn_str, vec!["h2"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
@ -920,7 +944,11 @@ mod tests {
|
||||||
alpn_data.push(2);
|
alpn_data.push(2);
|
||||||
alpn_data.extend_from_slice(b"h3");
|
alpn_data.extend_from_slice(b"h3");
|
||||||
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
|
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
|
||||||
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
|
let alpn = extract_alpn_from_client_hello(&ch);
|
||||||
assert_eq!(alpn, vec!["h2", "spdy", "h3"]);
|
let alpn_str: Vec<String> = alpn
|
||||||
|
.iter()
|
||||||
|
.map(|p| std::str::from_utf8(p).unwrap().to_string())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(alpn_str, vec!["h2", "spdy", "h3"]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -178,8 +178,9 @@ async fn do_tg_handshake_static(
|
||||||
|
|
||||||
let (read_half, write_half) = stream.into_split();
|
let (read_half, write_half) = stream.into_split();
|
||||||
|
|
||||||
|
let max_pending = config.general.crypto_pending_buffer;
|
||||||
Ok((
|
Ok((
|
||||||
CryptoReader::new(read_half, tg_decryptor),
|
CryptoReader::new(read_half, tg_decryptor),
|
||||||
CryptoWriter::new(write_half, tg_encryptor),
|
CryptoWriter::new(write_half, tg_encryptor, max_pending),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ use tracing::{debug, warn, trace, info};
|
||||||
use zeroize::Zeroize;
|
use zeroize::Zeroize;
|
||||||
|
|
||||||
use crate::crypto::{sha256, AesCtr, SecureRandom};
|
use crate::crypto::{sha256, AesCtr, SecureRandom};
|
||||||
|
use rand::Rng;
|
||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
use crate::protocol::tls;
|
use crate::protocol::tls;
|
||||||
use crate::stream::{FakeTlsReader, FakeTlsWriter, CryptoReader, CryptoWriter};
|
use crate::stream::{FakeTlsReader, FakeTlsWriter, CryptoReader, CryptoWriter};
|
||||||
|
|
@ -119,6 +120,23 @@ where
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let alpn_list = if config.censorship.alpn_enforce {
|
||||||
|
tls::extract_alpn_from_client_hello(handshake)
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
let selected_alpn = if config.censorship.alpn_enforce {
|
||||||
|
if alpn_list.iter().any(|p| p == b"h2") {
|
||||||
|
Some(b"h2".to_vec())
|
||||||
|
} else if alpn_list.iter().any(|p| p == b"http/1.1") {
|
||||||
|
Some(b"http/1.1".to_vec())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let response = if let Some(cached_entry) = cached {
|
let response = if let Some(cached_entry) = cached {
|
||||||
emulator::build_emulated_server_hello(
|
emulator::build_emulated_server_hello(
|
||||||
secret,
|
secret,
|
||||||
|
|
@ -126,6 +144,8 @@ where
|
||||||
&validation.session_id,
|
&validation.session_id,
|
||||||
&cached_entry,
|
&cached_entry,
|
||||||
rng,
|
rng,
|
||||||
|
selected_alpn.clone(),
|
||||||
|
config.censorship.tls_new_session_tickets,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
tls::build_server_hello(
|
tls::build_server_hello(
|
||||||
|
|
@ -134,9 +154,25 @@ where
|
||||||
&validation.session_id,
|
&validation.session_id,
|
||||||
config.censorship.fake_cert_len,
|
config.censorship.fake_cert_len,
|
||||||
rng,
|
rng,
|
||||||
|
selected_alpn.clone(),
|
||||||
|
config.censorship.tls_new_session_tickets,
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Optional anti-fingerprint delay before sending ServerHello.
|
||||||
|
if config.censorship.server_hello_delay_max_ms > 0 {
|
||||||
|
let min = config.censorship.server_hello_delay_min_ms;
|
||||||
|
let max = config.censorship.server_hello_delay_max_ms.max(min);
|
||||||
|
let delay_ms = if max == min {
|
||||||
|
max
|
||||||
|
} else {
|
||||||
|
rand::rng().random_range(min..=max)
|
||||||
|
};
|
||||||
|
if delay_ms > 0 {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
debug!(peer = %peer, response_len = response.len(), "Sending TLS ServerHello");
|
debug!(peer = %peer, response_len = response.len(), "Sending TLS ServerHello");
|
||||||
|
|
||||||
if let Err(e) = writer.write_all(&response).await {
|
if let Err(e) = writer.write_all(&response).await {
|
||||||
|
|
@ -264,9 +300,10 @@ where
|
||||||
"MTProto handshake successful"
|
"MTProto handshake successful"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let max_pending = config.general.crypto_pending_buffer;
|
||||||
return HandshakeResult::Success((
|
return HandshakeResult::Success((
|
||||||
CryptoReader::new(reader, decryptor),
|
CryptoReader::new(reader, decryptor),
|
||||||
CryptoWriter::new(writer, encryptor),
|
CryptoWriter::new(writer, encryptor, max_pending),
|
||||||
success,
|
success,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,12 +2,13 @@ use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
use tracing::{debug, info, trace};
|
use tokio::sync::oneshot;
|
||||||
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
use crate::config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::{*, secure_padding_len};
|
||||||
use crate::proxy::handshake::HandshakeSuccess;
|
use crate::proxy::handshake::HandshakeSuccess;
|
||||||
use crate::stats::Stats;
|
use crate::stats::Stats;
|
||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
||||||
|
|
@ -15,11 +16,11 @@ use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
||||||
|
|
||||||
pub(crate) async fn handle_via_middle_proxy<R, W>(
|
pub(crate) async fn handle_via_middle_proxy<R, W>(
|
||||||
mut crypto_reader: CryptoReader<R>,
|
mut crypto_reader: CryptoReader<R>,
|
||||||
mut crypto_writer: CryptoWriter<W>,
|
crypto_writer: CryptoWriter<W>,
|
||||||
success: HandshakeSuccess,
|
success: HandshakeSuccess,
|
||||||
me_pool: Arc<MePool>,
|
me_pool: Arc<MePool>,
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
_config: Arc<ProxyConfig>,
|
config: Arc<ProxyConfig>,
|
||||||
_buffer_pool: Arc<BufferPool>,
|
_buffer_pool: Arc<BufferPool>,
|
||||||
local_addr: SocketAddr,
|
local_addr: SocketAddr,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
|
|
@ -41,7 +42,7 @@ where
|
||||||
"Routing via Middle-End"
|
"Routing via Middle-End"
|
||||||
);
|
);
|
||||||
|
|
||||||
let (conn_id, mut me_rx) = me_pool.registry().register().await;
|
let (conn_id, me_rx) = me_pool.registry().register().await;
|
||||||
|
|
||||||
stats.increment_user_connects(&user);
|
stats.increment_user_connects(&user);
|
||||||
stats.increment_user_curr_connects(&user);
|
stats.increment_user_curr_connects(&user);
|
||||||
|
|
@ -56,59 +57,90 @@ where
|
||||||
|
|
||||||
let translated_local_addr = me_pool.translate_our_addr(local_addr);
|
let translated_local_addr = me_pool.translate_our_addr(local_addr);
|
||||||
|
|
||||||
let result: Result<()> = loop {
|
let frame_limit = config.general.max_client_frame;
|
||||||
tokio::select! {
|
|
||||||
client_frame = read_client_payload(&mut crypto_reader, proto_tag) => {
|
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
|
||||||
match client_frame {
|
let mut me_rx_task = me_rx;
|
||||||
Ok(Some((payload, quickack))) => {
|
let stats_clone = stats.clone();
|
||||||
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
let rng_clone = rng.clone();
|
||||||
stats.add_user_octets_from(&user, payload.len() as u64);
|
let user_clone = user.clone();
|
||||||
let mut flags = proto_flags;
|
let me_writer = tokio::spawn(async move {
|
||||||
if quickack {
|
let mut writer = crypto_writer;
|
||||||
flags |= RPC_FLAG_QUICKACK;
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
msg = me_rx_task.recv() => {
|
||||||
|
match msg {
|
||||||
|
Some(MeResponse::Data { flags, data }) => {
|
||||||
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
||||||
|
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
||||||
|
write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?;
|
||||||
}
|
}
|
||||||
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
|
Some(MeResponse::Ack(confirm)) => {
|
||||||
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
trace!(conn_id, confirm, "ME->C quickack");
|
||||||
|
write_client_ack(&mut writer, proto_tag, confirm).await?;
|
||||||
|
}
|
||||||
|
Some(MeResponse::Close) => {
|
||||||
|
debug!(conn_id, "ME sent close");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
debug!(conn_id, "ME channel closed");
|
||||||
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
}
|
}
|
||||||
me_pool.send_proxy_req(
|
|
||||||
conn_id,
|
|
||||||
success.dc_idx,
|
|
||||||
peer,
|
|
||||||
translated_local_addr,
|
|
||||||
&payload,
|
|
||||||
flags,
|
|
||||||
).await?;
|
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
|
||||||
debug!(conn_id, "Client EOF");
|
|
||||||
let _ = me_pool.send_close(conn_id).await;
|
|
||||||
break Ok(());
|
|
||||||
}
|
|
||||||
Err(e) => break Err(e),
|
|
||||||
}
|
}
|
||||||
}
|
_ = &mut stop_rx => {
|
||||||
me_msg = me_rx.recv() => {
|
debug!(conn_id, "ME writer stop signal");
|
||||||
match me_msg {
|
return Ok(());
|
||||||
Some(MeResponse::Data { flags, data }) => {
|
|
||||||
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
|
||||||
stats.add_user_octets_to(&user, data.len() as u64);
|
|
||||||
write_client_payload(&mut crypto_writer, proto_tag, flags, &data, rng.as_ref()).await?;
|
|
||||||
}
|
|
||||||
Some(MeResponse::Ack(confirm)) => {
|
|
||||||
trace!(conn_id, confirm, "ME->C quickack");
|
|
||||||
write_client_ack(&mut crypto_writer, proto_tag, confirm).await?;
|
|
||||||
}
|
|
||||||
Some(MeResponse::Close) => {
|
|
||||||
debug!(conn_id, "ME sent close");
|
|
||||||
break Ok(());
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
debug!(conn_id, "ME channel closed");
|
|
||||||
break Err(ProxyError::Proxy("ME connection lost".into()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut main_result: Result<()> = Ok(());
|
||||||
|
loop {
|
||||||
|
match read_client_payload(&mut crypto_reader, proto_tag, frame_limit, &user).await {
|
||||||
|
Ok(Some((payload, quickack))) => {
|
||||||
|
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
||||||
|
stats.add_user_octets_from(&user, payload.len() as u64);
|
||||||
|
let mut flags = proto_flags;
|
||||||
|
if quickack {
|
||||||
|
flags |= RPC_FLAG_QUICKACK;
|
||||||
|
}
|
||||||
|
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
|
||||||
|
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
||||||
|
}
|
||||||
|
if let Err(e) = me_pool.send_proxy_req(
|
||||||
|
conn_id,
|
||||||
|
success.dc_idx,
|
||||||
|
peer,
|
||||||
|
translated_local_addr,
|
||||||
|
&payload,
|
||||||
|
flags,
|
||||||
|
).await {
|
||||||
|
main_result = Err(e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
debug!(conn_id, "Client EOF");
|
||||||
|
let _ = me_pool.send_close(conn_id).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
main_result = Err(e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = stop_tx.send(());
|
||||||
|
let writer_result = me_writer.await.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
|
||||||
|
|
||||||
|
let result = match (main_result, writer_result) {
|
||||||
|
(Ok(()), Ok(())) => Ok(()),
|
||||||
|
(Err(e), _) => Err(e),
|
||||||
|
(_, Err(e)) => Err(e),
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!(user = %user, conn_id, "ME relay cleanup");
|
debug!(user = %user, conn_id, "ME relay cleanup");
|
||||||
|
|
@ -120,6 +152,8 @@ where
|
||||||
async fn read_client_payload<R>(
|
async fn read_client_payload<R>(
|
||||||
client_reader: &mut CryptoReader<R>,
|
client_reader: &mut CryptoReader<R>,
|
||||||
proto_tag: ProtoTag,
|
proto_tag: ProtoTag,
|
||||||
|
max_frame: usize,
|
||||||
|
user: &str,
|
||||||
) -> Result<Option<(Vec<u8>, bool)>>
|
) -> Result<Option<(Vec<u8>, bool)>>
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
|
|
@ -162,8 +196,15 @@ where
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if len > 16 * 1024 * 1024 {
|
if len > max_frame {
|
||||||
return Err(ProxyError::Proxy(format!("Frame too large: {len}")));
|
warn!(
|
||||||
|
user = %user,
|
||||||
|
raw_len = len,
|
||||||
|
raw_len_hex = format_args!("0x{:08x}", len),
|
||||||
|
proto = ?proto_tag,
|
||||||
|
"Frame too large — possible crypto desync or TLS record error"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::Proxy(format!("Frame too large: {len} (max {max_frame})")));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut payload = vec![0u8; len];
|
let mut payload = vec![0u8; len];
|
||||||
|
|
@ -237,7 +278,7 @@ where
|
||||||
}
|
}
|
||||||
ProtoTag::Intermediate | ProtoTag::Secure => {
|
ProtoTag::Intermediate | ProtoTag::Secure => {
|
||||||
let padding_len = if proto_tag == ProtoTag::Secure {
|
let padding_len = if proto_tag == ProtoTag::Secure {
|
||||||
(rng.bytes(1)[0] % 4) as usize
|
secure_padding_len(data.len(), rng)
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@
|
||||||
//! └────────────────────────────────────────┘
|
//! └────────────────────────────────────────┘
|
||||||
//!
|
//!
|
||||||
//! Backpressure
|
//! Backpressure
|
||||||
//! - pending ciphertext buffer is bounded (MAX_PENDING_WRITE)
|
//! - pending ciphertext buffer is bounded (configurable per connection)
|
||||||
//! - pending is full and upstream is pending
|
//! - pending is full and upstream is pending
|
||||||
//! -> poll_write returns Poll::Pending
|
//! -> poll_write returns Poll::Pending
|
||||||
//! -> do not accept any plaintext
|
//! -> do not accept any plaintext
|
||||||
|
|
@ -62,10 +62,9 @@ use super::state::{StreamState, YieldBuffer};
|
||||||
|
|
||||||
// ============= Constants =============
|
// ============= Constants =============
|
||||||
|
|
||||||
/// Maximum size for pending ciphertext buffer (bounded backpressure).
|
/// Default size for pending ciphertext buffer (bounded backpressure).
|
||||||
/// Reduced to 64KB to prevent bufferbloat on mobile networks.
|
/// Actual limit is supplied at runtime from configuration.
|
||||||
/// 512KB was causing high latency on 3G/LTE connections.
|
const DEFAULT_MAX_PENDING_WRITE: usize = 64 * 1024;
|
||||||
const MAX_PENDING_WRITE: usize = 64 * 1024;
|
|
||||||
|
|
||||||
/// Default read buffer capacity (reader mostly decrypts in-place into caller buffer).
|
/// Default read buffer capacity (reader mostly decrypts in-place into caller buffer).
|
||||||
const DEFAULT_READ_CAPACITY: usize = 16 * 1024;
|
const DEFAULT_READ_CAPACITY: usize = 16 * 1024;
|
||||||
|
|
@ -427,15 +426,22 @@ pub struct CryptoWriter<W> {
|
||||||
encryptor: AesCtr,
|
encryptor: AesCtr,
|
||||||
state: CryptoWriterState,
|
state: CryptoWriterState,
|
||||||
scratch: BytesMut,
|
scratch: BytesMut,
|
||||||
|
max_pending_write: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W> CryptoWriter<W> {
|
impl<W> CryptoWriter<W> {
|
||||||
pub fn new(upstream: W, encryptor: AesCtr) -> Self {
|
pub fn new(upstream: W, encryptor: AesCtr, max_pending_write: usize) -> Self {
|
||||||
|
let max_pending = if max_pending_write == 0 {
|
||||||
|
DEFAULT_MAX_PENDING_WRITE
|
||||||
|
} else {
|
||||||
|
max_pending_write
|
||||||
|
};
|
||||||
Self {
|
Self {
|
||||||
upstream,
|
upstream,
|
||||||
encryptor,
|
encryptor,
|
||||||
state: CryptoWriterState::Idle,
|
state: CryptoWriterState::Idle,
|
||||||
scratch: BytesMut::with_capacity(16 * 1024),
|
scratch: BytesMut::with_capacity(16 * 1024),
|
||||||
|
max_pending_write: max_pending.max(4 * 1024),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -484,10 +490,10 @@ impl<W> CryptoWriter<W> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensure we are in Flushing state and return mutable pending buffer.
|
/// Ensure we are in Flushing state and return mutable pending buffer.
|
||||||
fn ensure_pending<'a>(state: &'a mut CryptoWriterState) -> &'a mut PendingCiphertext {
|
fn ensure_pending<'a>(state: &'a mut CryptoWriterState, max_pending: usize) -> &'a mut PendingCiphertext {
|
||||||
if matches!(state, CryptoWriterState::Idle) {
|
if matches!(state, CryptoWriterState::Idle) {
|
||||||
*state = CryptoWriterState::Flushing {
|
*state = CryptoWriterState::Flushing {
|
||||||
pending: PendingCiphertext::new(MAX_PENDING_WRITE),
|
pending: PendingCiphertext::new(max_pending),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -498,14 +504,14 @@ impl<W> CryptoWriter<W> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Select how many plaintext bytes can be accepted in buffering path
|
/// Select how many plaintext bytes can be accepted in buffering path
|
||||||
fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize) -> usize {
|
fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize, max_pending: usize) -> usize {
|
||||||
if buf_len == 0 {
|
if buf_len == 0 {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
match state {
|
match state {
|
||||||
CryptoWriterState::Flushing { pending } => buf_len.min(pending.remaining_capacity()),
|
CryptoWriterState::Flushing { pending } => buf_len.min(pending.remaining_capacity()),
|
||||||
CryptoWriterState::Idle => buf_len.min(MAX_PENDING_WRITE),
|
CryptoWriterState::Idle => buf_len.min(max_pending),
|
||||||
CryptoWriterState::Poisoned { .. } => 0,
|
CryptoWriterState::Poisoned { .. } => 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -603,7 +609,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
// Upstream blocked. Apply ideal backpressure
|
// Upstream blocked. Apply ideal backpressure
|
||||||
let to_accept =
|
let to_accept =
|
||||||
Self::select_to_accept_for_buffering(&this.state, buf.len());
|
Self::select_to_accept_for_buffering(&this.state, buf.len(), this.max_pending_write);
|
||||||
|
|
||||||
if to_accept == 0 {
|
if to_accept == 0 {
|
||||||
trace!(
|
trace!(
|
||||||
|
|
@ -618,7 +624,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
|
||||||
|
|
||||||
// Disjoint borrows
|
// Disjoint borrows
|
||||||
let encryptor = &mut this.encryptor;
|
let encryptor = &mut this.encryptor;
|
||||||
let pending = Self::ensure_pending(&mut this.state);
|
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
|
||||||
|
|
||||||
if let Err(e) = pending.push_encrypted(encryptor, plaintext) {
|
if let Err(e) = pending.push_encrypted(encryptor, plaintext) {
|
||||||
if e.kind() == ErrorKind::WouldBlock {
|
if e.kind() == ErrorKind::WouldBlock {
|
||||||
|
|
@ -635,7 +641,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
|
||||||
// 2) Fast path: pending empty -> write-through
|
// 2) Fast path: pending empty -> write-through
|
||||||
debug_assert!(matches!(this.state, CryptoWriterState::Idle));
|
debug_assert!(matches!(this.state, CryptoWriterState::Idle));
|
||||||
|
|
||||||
let to_accept = buf.len().min(MAX_PENDING_WRITE);
|
let to_accept = buf.len().min(this.max_pending_write);
|
||||||
let plaintext = &buf[..to_accept];
|
let plaintext = &buf[..to_accept];
|
||||||
|
|
||||||
Self::encrypt_into_scratch(&mut this.encryptor, &mut this.scratch, plaintext);
|
Self::encrypt_into_scratch(&mut this.encryptor, &mut this.scratch, plaintext);
|
||||||
|
|
@ -645,7 +651,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
|
||||||
// Upstream blocked: buffer FULL ciphertext for accepted bytes.
|
// Upstream blocked: buffer FULL ciphertext for accepted bytes.
|
||||||
let ciphertext = std::mem::take(&mut this.scratch);
|
let ciphertext = std::mem::take(&mut this.scratch);
|
||||||
|
|
||||||
let pending = Self::ensure_pending(&mut this.state);
|
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
|
||||||
pending.replace_with(ciphertext);
|
pending.replace_with(ciphertext);
|
||||||
|
|
||||||
Poll::Ready(Ok(to_accept))
|
Poll::Ready(Ok(to_accept))
|
||||||
|
|
@ -672,7 +678,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
|
||||||
let remainder = this.scratch.split_off(n);
|
let remainder = this.scratch.split_off(n);
|
||||||
this.scratch.clear();
|
this.scratch.clear();
|
||||||
|
|
||||||
let pending = Self::ensure_pending(&mut this.state);
|
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
|
||||||
pending.replace_with(remainder);
|
pending.replace_with(remainder);
|
||||||
|
|
||||||
Poll::Ready(Ok(to_accept))
|
Poll::Ready(Ok(to_accept))
|
||||||
|
|
|
||||||
|
|
@ -267,8 +267,8 @@ impl<W: AsyncWrite + Unpin> SecureIntermediateFrameWriter<W> {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add random padding (0-3 bytes)
|
// Add padding so total length is never divisible by 4 (MTProto Secure)
|
||||||
let padding_len = self.rng.range(4);
|
let padding_len = secure_padding_len(data.len(), &self.rng);
|
||||||
let padding = self.rng.bytes(padding_len);
|
let padding = self.rng.bytes(padding_len);
|
||||||
|
|
||||||
let total_len = data.len() + padding_len;
|
let total_len = data.len() + padding_len;
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,8 @@
|
||||||
//! - However, the on-the-wire record length can exceed 16384 because TLS 1.3
|
//! - However, the on-the-wire record length can exceed 16384 because TLS 1.3
|
||||||
//! uses AEAD and can include tag/overhead/padding.
|
//! uses AEAD and can include tag/overhead/padding.
|
||||||
//! - Telegram FakeTLS clients (notably iOS) may send Application Data records
|
//! - Telegram FakeTLS clients (notably iOS) may send Application Data records
|
||||||
//! with length up to 16384 + 24 bytes. We accept that as MAX_TLS_CHUNK_SIZE.
|
//! with length up to 16384 + 256 bytes (RFC 8446 §5.2). We accept that as
|
||||||
|
//! MAX_TLS_CHUNK_SIZE.
|
||||||
//!
|
//!
|
||||||
//! If you reject those (e.g. validate length <= 16384), you will see errors like:
|
//! If you reject those (e.g. validate length <= 16384), you will see errors like:
|
||||||
//! "TLS record too large: 16408 bytes"
|
//! "TLS record too large: 16408 bytes"
|
||||||
|
|
@ -52,9 +53,8 @@ use super::state::{StreamState, HeaderBuffer, YieldBuffer, WriteBuffer};
|
||||||
const TLS_HEADER_SIZE: usize = 5;
|
const TLS_HEADER_SIZE: usize = 5;
|
||||||
|
|
||||||
/// Maximum TLS fragment size we emit for Application Data.
|
/// Maximum TLS fragment size we emit for Application Data.
|
||||||
/// Real TLS 1.3 ciphertexts often add ~16-24 bytes AEAD overhead, so to mimic
|
/// Real TLS 1.3 allows up to 16384 + 256 bytes of ciphertext (incl. tag).
|
||||||
/// on-the-wire record sizes we allow up to 16384 + 24 bytes of plaintext.
|
const MAX_TLS_PAYLOAD: usize = 16384 + 256;
|
||||||
const MAX_TLS_PAYLOAD: usize = 16384 + 24;
|
|
||||||
|
|
||||||
/// Maximum pending write buffer for one record remainder.
|
/// Maximum pending write buffer for one record remainder.
|
||||||
/// Note: we never queue unlimited amount of data here; state holds at most one record.
|
/// Note: we never queue unlimited amount of data here; state holds at most one record.
|
||||||
|
|
@ -91,7 +91,7 @@ impl TlsRecordHeader {
|
||||||
/// - We accept TLS 1.0 header version for ClientHello-like records (0x03 0x01),
|
/// - We accept TLS 1.0 header version for ClientHello-like records (0x03 0x01),
|
||||||
/// and TLS 1.2/1.3 style version bytes for the rest (we use TLS_VERSION = 0x03 0x03).
|
/// and TLS 1.2/1.3 style version bytes for the rest (we use TLS_VERSION = 0x03 0x03).
|
||||||
/// - For Application Data, Telegram FakeTLS may send payload length up to
|
/// - For Application Data, Telegram FakeTLS may send payload length up to
|
||||||
/// MAX_TLS_CHUNK_SIZE (16384 + 24).
|
/// MAX_TLS_CHUNK_SIZE (16384 + 256).
|
||||||
/// - For other record types we keep stricter bounds to avoid memory abuse.
|
/// - For other record types we keep stricter bounds to avoid memory abuse.
|
||||||
fn validate(&self) -> Result<()> {
|
fn validate(&self) -> Result<()> {
|
||||||
// Version: accept TLS 1.0 header (ClientHello quirk) and TLS_VERSION (0x0303).
|
// Version: accept TLS 1.0 header (ClientHello quirk) and TLS_VERSION (0x0303).
|
||||||
|
|
@ -105,7 +105,7 @@ impl TlsRecordHeader {
|
||||||
let len = self.length as usize;
|
let len = self.length as usize;
|
||||||
|
|
||||||
// Length checks depend on record type.
|
// Length checks depend on record type.
|
||||||
// Telegram FakeTLS: ApplicationData length may be 16384 + 24.
|
// Telegram FakeTLS: ApplicationData length may be 16384 + 256.
|
||||||
match self.record_type {
|
match self.record_type {
|
||||||
TLS_RECORD_APPLICATION => {
|
TLS_RECORD_APPLICATION => {
|
||||||
if len > MAX_TLS_CHUNK_SIZE {
|
if len > MAX_TLS_CHUNK_SIZE {
|
||||||
|
|
@ -755,9 +755,6 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for FakeTlsWriter<W> {
|
||||||
payload_size: chunk_size,
|
payload_size: chunk_size,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Wake to retry flushing soon.
|
|
||||||
cx.waker().wake_by_ref();
|
|
||||||
|
|
||||||
Poll::Ready(Ok(chunk_size))
|
Poll::Ready(Ok(chunk_size))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -72,7 +72,27 @@ impl TlsFrontCache {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Ok(data) = tokio::fs::read(entry.path()).await {
|
if let Ok(data) = tokio::fs::read(entry.path()).await {
|
||||||
if let Ok(cached) = serde_json::from_slice::<CachedTlsData>(&data) {
|
if let Ok(mut cached) = serde_json::from_slice::<CachedTlsData>(&data) {
|
||||||
|
if cached.domain.is_empty()
|
||||||
|
|| cached.domain.len() > 255
|
||||||
|
|| !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
|
||||||
|
{
|
||||||
|
warn!(file = %name, "Skipping TLS cache entry with invalid domain");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// fetched_at is skipped during deserialization; approximate with file mtime if available.
|
||||||
|
if let Ok(meta) = entry.metadata().await {
|
||||||
|
if let Ok(modified) = meta.modified() {
|
||||||
|
cached.fetched_at = modified;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Drop entries older than 72h
|
||||||
|
if let Ok(age) = cached.fetched_at.elapsed() {
|
||||||
|
if age > Duration::from_secs(72 * 3600) {
|
||||||
|
warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
let domain = cached.domain.clone();
|
let domain = cached.domain.clone();
|
||||||
self.set(&domain, cached).await;
|
self.set(&domain, cached).await;
|
||||||
loaded += 1;
|
loaded += 1;
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,8 @@ pub fn build_emulated_server_hello(
|
||||||
session_id: &[u8],
|
session_id: &[u8],
|
||||||
cached: &CachedTlsData,
|
cached: &CachedTlsData,
|
||||||
rng: &SecureRandom,
|
rng: &SecureRandom,
|
||||||
|
alpn: Option<Vec<u8>>,
|
||||||
|
new_session_tickets: u8,
|
||||||
) -> Vec<u8> {
|
) -> Vec<u8> {
|
||||||
// --- ServerHello ---
|
// --- ServerHello ---
|
||||||
let mut extensions = Vec::new();
|
let mut extensions = Vec::new();
|
||||||
|
|
@ -48,6 +50,15 @@ pub fn build_emulated_server_hello(
|
||||||
extensions.extend_from_slice(&0x002bu16.to_be_bytes());
|
extensions.extend_from_slice(&0x002bu16.to_be_bytes());
|
||||||
extensions.extend_from_slice(&(2u16).to_be_bytes());
|
extensions.extend_from_slice(&(2u16).to_be_bytes());
|
||||||
extensions.extend_from_slice(&0x0304u16.to_be_bytes());
|
extensions.extend_from_slice(&0x0304u16.to_be_bytes());
|
||||||
|
if let Some(alpn_proto) = &alpn {
|
||||||
|
extensions.extend_from_slice(&0x0010u16.to_be_bytes());
|
||||||
|
let list_len: u16 = 1 + alpn_proto.len() as u16;
|
||||||
|
let ext_len: u16 = 2 + list_len;
|
||||||
|
extensions.extend_from_slice(&ext_len.to_be_bytes());
|
||||||
|
extensions.extend_from_slice(&list_len.to_be_bytes());
|
||||||
|
extensions.push(alpn_proto.len() as u8);
|
||||||
|
extensions.extend_from_slice(alpn_proto);
|
||||||
|
}
|
||||||
|
|
||||||
let extensions_len = extensions.len() as u16;
|
let extensions_len = extensions.len() as u16;
|
||||||
|
|
||||||
|
|
@ -118,10 +129,25 @@ pub fn build_emulated_server_hello(
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Combine ---
|
// --- Combine ---
|
||||||
let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len());
|
// Optional NewSessionTicket mimic records (opaque ApplicationData for fingerprint).
|
||||||
|
let mut tickets = Vec::new();
|
||||||
|
if new_session_tickets > 0 {
|
||||||
|
for _ in 0..new_session_tickets {
|
||||||
|
let ticket_len: usize = rng.range(48) + 48;
|
||||||
|
let mut rec = Vec::with_capacity(5 + ticket_len);
|
||||||
|
rec.push(TLS_RECORD_APPLICATION);
|
||||||
|
rec.extend_from_slice(&TLS_VERSION);
|
||||||
|
rec.extend_from_slice(&(ticket_len as u16).to_be_bytes());
|
||||||
|
rec.extend_from_slice(&rng.bytes(ticket_len));
|
||||||
|
tickets.extend_from_slice(&rec);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len() + tickets.len());
|
||||||
response.extend_from_slice(&server_hello);
|
response.extend_from_slice(&server_hello);
|
||||||
response.extend_from_slice(&change_cipher_spec);
|
response.extend_from_slice(&change_cipher_spec);
|
||||||
response.extend_from_slice(&app_data);
|
response.extend_from_slice(&app_data);
|
||||||
|
response.extend_from_slice(&tickets);
|
||||||
|
|
||||||
// --- HMAC ---
|
// --- HMAC ---
|
||||||
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());
|
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());
|
||||||
|
|
|
||||||
|
|
@ -567,12 +567,14 @@ impl MePool {
|
||||||
let cancel_keepalive = cancel_keepalive_token;
|
let cancel_keepalive = cancel_keepalive_token;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Per-writer jittered start to avoid phase sync.
|
// Per-writer jittered start to avoid phase sync.
|
||||||
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
|
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
|
||||||
|
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
|
||||||
|
let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64);
|
||||||
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
|
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel_keepalive.cancelled() => break,
|
_ = cancel_keepalive.cancelled() => break,
|
||||||
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
|
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {}
|
||||||
}
|
}
|
||||||
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
|
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
|
||||||
break;
|
break;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue