telemt/src/transport/middle_proxy/handshake.rs

800 lines
29 KiB
Rust

#[cfg(target_os = "linux")]
use libc;
use socket2::{SockRef, TcpKeepalive};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr};
#[cfg(target_os = "linux")]
use std::os::fd::{AsRawFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::raw::c_int;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::{TcpSocket, TcpStream};
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::config::MeSocksKdfPolicy;
use crate::crypto::{SecureRandom, build_middleproxy_prekey, derive_middleproxy_keys, sha256};
use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
use crate::network::probe::is_bogon;
use crate::protocol::constants::{
ME_CONNECT_TIMEOUT_SECS, ME_HANDSHAKE_TIMEOUT_SECS, RPC_CRYPTO_AES_U32,
RPC_HANDSHAKE_ERROR_U32, rpc_crypto_flags,
};
use crate::transport::{UpstreamEgressInfo, UpstreamRouteKind};
use super::MePool;
use super::codec::{
RpcChecksumMode, build_handshake_payload, build_nonce_payload, build_rpc_frame,
cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload,
read_rpc_frame_plaintext, rpc_crc,
};
use super::selftest::{
BndAddrStatus, BndPortStatus, record_bnd_status, record_upstream_bnd_status,
};
use super::wire::{IpMaterial, extract_ip_material};
/// Controls how `handshake_only` reacts when the KDF material fingerprint computed for an
/// endpoint differs from the one recorded by an earlier handshake: when `true` the handshake
/// is rejected with `ProxyError::InvalidHandshake`; when `false` the drift is only counted
/// and logged.
const ME_KDF_DRIFT_STRICT: bool = false;
/// Identifies where the client port used for ME key derivation was taken from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum KdfClientPortSource {
    /// No SOCKS-bound port was supplied; the local socket's port is the source.
    LocalSocket = 0,
    /// A SOCKS-bound port was supplied and is the source.
    SocksBound = 1,
}

impl KdfClientPortSource {
    /// Classifies the port source: `Some(_)` maps to [`Self::SocksBound`],
    /// `None` maps to [`Self::LocalSocket`]. The port value itself is not inspected.
    fn from_socks_bound_port(socks_bound_port: Option<u16>) -> Self {
        match socks_bound_port {
            Some(_) => Self::SocksBound,
            None => Self::LocalSocket,
        }
    }
}
/// Result of a successful ME handshake with timings.
///
/// Produced by `MePool::handshake_only`; carries the split stream plus the cipher state
/// needed to continue the encrypted RPC session.
pub(crate) struct HandshakeOutput {
/// Read half of the TCP stream the handshake ran on.
pub rd: ReadHalf<TcpStream>,
/// Write half of the TCP stream the handshake ran on.
pub wr: WriteHalf<TcpStream>,
/// IP of the local address after NAT/reflection translation.
pub source_ip: IpAddr,
/// Key derived with the `SERVER` label; used to decrypt data read from the ME.
pub read_key: [u8; 32],
/// Read-side IV as left after decrypting the handshake response.
pub read_iv: [u8; 16],
/// Key derived with the `CLIENT` label; used to encrypt data written to the ME.
pub write_key: [u8; 32],
/// Write-side IV as returned after encrypting the handshake frame.
pub write_iv: [u8; 16],
/// Checksum mode selected from the flags in the ME's handshake response.
pub crc_mode: RpcChecksumMode,
/// Wall-clock duration of the handshake in milliseconds.
pub handshake_ms: f64,
}
impl MePool {
/// Folds the address material that feeds ME key derivation into one `u64`.
///
/// The inputs are hashed in declaration order with `DefaultHasher`; the result is only
/// meaningful for equality comparison against fingerprints produced the same way.
fn kdf_material_fingerprint(
    local_ip_nat: IpAddr,
    peer_addr_nat: SocketAddr,
    reflected_ip: Option<IpAddr>,
    socks_bound_ip: Option<IpAddr>,
    client_port_source: KdfClientPortSource,
) -> u64 {
    let mut state = DefaultHasher::new();
    // A tuple hashes its elements one after another, in order.
    (
        local_ip_nat,
        peer_addr_nat,
        reflected_ip,
        socks_bound_ip,
        client_port_source,
    )
        .hash(&mut state);
    state.finish()
}
/// Resolves the DC for `addr` via `resolve_dc_for_endpoint` and narrows it to `i16`.
///
/// Returns `None` when the resolved value does not fit into an `i16`.
async fn resolve_dc_idx_for_endpoint(&self, addr: SocketAddr) -> Option<i16> {
    let resolved = self.resolve_dc_for_endpoint(addr).await;
    i16::try_from(resolved).ok()
}
/// Picks the explicit bind IP of a direct upstream route, if it matches `family`.
///
/// Returns `None` when there is no upstream info, the route is not
/// `UpstreamRouteKind::Direct`, no bind IP is configured, or the bind IP belongs to
/// the other address family.
fn direct_bind_ip_for_stun(
    family: IpFamily,
    upstream_egress: Option<UpstreamEgressInfo>,
) -> Option<IpAddr> {
    let bind_ip = upstream_egress
        .filter(|info| info.route_kind == UpstreamRouteKind::Direct)
        .and_then(|info| info.direct_bind_ip)?;
    let same_family = matches!(
        (family, bind_ip),
        (IpFamily::V4, IpAddr::V4(_)) | (IpFamily::V6, IpAddr::V6(_))
    );
    same_family.then_some(bind_ip)
}
/// Returns the SOCKS-reported bound address when it is usable for the given `family`.
///
/// The address is accepted only if the route is SOCKS4/SOCKS5, a bound address was
/// reported, its IP family equals `family`, and the IP is neither a bogon (per
/// `is_bogon`) nor unspecified. Otherwise `None` is returned.
fn select_socks_bound_addr(
    family: IpFamily,
    upstream_egress: Option<UpstreamEgressInfo>,
) -> Option<SocketAddr> {
    let bound = upstream_egress
        .filter(|info| {
            matches!(
                info.route_kind,
                UpstreamRouteKind::Socks4 | UpstreamRouteKind::Socks5
            )
        })
        .and_then(|info| info.socks_bound_addr)?;
    let usable = match (family, bound.ip()) {
        (IpFamily::V4, IpAddr::V4(_)) | (IpFamily::V6, IpAddr::V6(_)) => {
            !is_bogon(bound.ip()) && !bound.ip().is_unspecified()
        }
        _ => false,
    };
    usable.then_some(bound)
}
/// Reports whether the upstream egress info describes a SOCKS4 or SOCKS5 route.
///
/// `None` (no upstream info) yields `false`.
fn is_socks_route(upstream_egress: Option<UpstreamEgressInfo>) -> bool {
    upstream_egress.is_some_and(|info| {
        matches!(
            info.route_kind,
            UpstreamRouteKind::Socks4 | UpstreamRouteKind::Socks5
        )
    })
}
/// Classifies the port of an optional bound address.
///
/// No address maps to `Error`, port `0` maps to `Zero`, any other port maps to `Ok`.
fn bnd_port_status(bound: Option<SocketAddr>) -> BndPortStatus {
    match bound.map(|addr| addr.port()) {
        None => BndPortStatus::Error,
        Some(0) => BndPortStatus::Zero,
        Some(_) => BndPortStatus::Ok,
    }
}
/// TCP connect with timeout + return RTT in milliseconds.
pub(crate) async fn connect_tcp(
&self,
addr: SocketAddr,
dc_idx_override: Option<i16>,
) -> Result<(TcpStream, f64, Option<UpstreamEgressInfo>)> {
let start = Instant::now();
let (stream, upstream_egress) = if let Some(upstream) = &self.upstream {
let dc_idx = if let Some(dc_idx) = dc_idx_override {
Some(dc_idx)
} else {
self.resolve_dc_idx_for_endpoint(addr).await
};
let (stream, egress) = upstream.connect_with_details(addr, dc_idx, None).await?;
(stream, Some(egress))
} else {
let connect_fut = async {
if addr.is_ipv6()
&& let Some(v6) = self.nat_runtime.detected_ipv6
{
match TcpSocket::new_v6() {
Ok(sock) => {
if let Err(e) = sock.bind(SocketAddr::new(IpAddr::V6(v6), 0)) {
debug!(error = %e, bind_ip = %v6, "ME IPv6 bind failed, falling back to default bind");
} else {
match sock.connect(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => {
debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect")
}
}
}
}
Err(e) => {
debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect")
}
}
}
TcpStream::connect(addr).await
};
let stream = timeout(Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), connect_fut)
.await
.map_err(|_| ProxyError::ConnectionTimeout {
addr: addr.to_string(),
})??;
(stream, None)
};
let connect_ms = start.elapsed().as_secs_f64() * 1000.0;
stream.set_nodelay(true).ok();
if let Err(e) = Self::configure_keepalive(&stream) {
warn!(error = %e, "ME keepalive setup failed");
}
#[cfg(target_os = "linux")]
if let Err(e) = Self::configure_user_timeout(stream.as_raw_fd()) {
warn!(error = %e, "ME TCP_USER_TIMEOUT setup failed");
}
Ok((stream, connect_ms, upstream_egress))
}
/// Enables TCP keepalive on `stream` with a 30 s idle time.
///
/// On targets where socket2 exposes the probe interval and retry count, those are set
/// to 10 s and 3 respectively. Returns the first socket-option error encountered.
fn configure_keepalive(stream: &TcpStream) -> std::io::Result<()> {
    let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
    // Mirror socket2 v0.5.10 target gate for with_retries(), the stricter method.
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "ios",
        target_os = "visionos",
        target_os = "linux",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "tvos",
        target_os = "watchos",
        target_os = "cygwin",
    ))]
    let keepalive = keepalive
        .with_interval(Duration::from_secs(10))
        .with_retries(3);
    let socket = SockRef::from(stream);
    socket.set_tcp_keepalive(&keepalive)?;
    socket.set_keepalive(true)
}
/// Sets the `TCP_USER_TIMEOUT` socket option on `fd` to 30 000 (milliseconds).
///
/// Returns the OS error reported by `setsockopt` when the call fails.
#[cfg(target_os = "linux")]
fn configure_user_timeout(fd: RawFd) -> std::io::Result<()> {
    let user_timeout_ms: c_int = 30_000;
    // SAFETY: the pointer refers to a live local `c_int` and the length passed is exactly
    // the size of that value, so `setsockopt` only reads valid memory during the call.
    let status = unsafe {
        libc::setsockopt(
            fd,
            libc::IPPROTO_TCP,
            libc::TCP_USER_TIMEOUT,
            (&user_timeout_ms as *const c_int).cast::<libc::c_void>(),
            std::mem::size_of::<c_int>() as libc::socklen_t,
        )
    };
    if status == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
/// Perform full ME RPC handshake on an established TCP stream.
/// Returns cipher keys/ivs and split halves; does not register writer.
///
/// Steps, in order:
/// 1. Determine the address material for key derivation: the local address (translated
///    for NAT / reflection), the target address `addr`, and the SOCKS bound address when
///    the route is SOCKS and the reported address is usable.
/// 2. Send a plaintext nonce frame (seq `-2`) and read the server's nonce frame; validate
///    its sequence number, crypto schema, key selector and timestamp skew (at most 30 s).
/// 3. Compare and record the per-endpoint KDF fingerprint (rejects only when
///    `ME_KDF_DRIFT_STRICT` is `true`).
/// 4. Derive the write (`CLIENT`) and read (`SERVER`) keys and send the CBC-encrypted
///    handshake frame (seq `-1`).
/// 5. Read and decrypt the response until one complete frame is validated or
///    `ME_HANDSHAKE_TIMEOUT_SECS` elapses.
///
/// # Errors
/// - `ProxyError::Io` for socket read/write failures or when the ME closes the connection.
/// - `ProxyError::TgHandshakeTimeout` when the nonce or handshake response does not arrive
///   in time.
/// - `ProxyError::InvalidHandshake` for any validation failure (strict SOCKS KDF policy,
///   bad sequence/schema/key selector/skew, mixed IP families, bad frame length or CRC,
///   explicit ME rejection, unsupported flags).
pub(crate) async fn handshake_only(
&self,
stream: TcpStream,
addr: SocketAddr,
upstream_egress: Option<UpstreamEgressInfo>,
rng: &SecureRandom,
) -> Result<HandshakeOutput> {
let hs_start = Instant::now();
let local_addr = stream.local_addr().map_err(ProxyError::Io)?;
let transport_peer_addr = stream.peer_addr().map_err(ProxyError::Io)?;
// Key derivation uses the caller-supplied target address; the transport-level peer
// address is only logged below.
let peer_addr = addr;
// The result is deliberately ignored here.
let _ = self.maybe_detect_nat_ip(local_addr.ip()).await;
let family = if local_addr.ip().is_ipv4() {
IpFamily::V4
} else {
IpFamily::V6
};
let is_socks_route = Self::is_socks_route(upstream_egress);
// Unfiltered bound address as reported for the SOCKS route (kept for diagnostics).
let raw_socks_bound_addr = if is_socks_route {
upstream_egress.and_then(|info| info.socks_bound_addr)
} else {
None
};
// Bound address after family / bogon / unspecified filtering.
let socks_bound_addr = Self::select_socks_bound_addr(family, upstream_egress);
// A reported-but-filtered address is classified as Bogon; a usable one as Ok;
// everything else (non-SOCKS route or nothing reported) as Error.
let bnd_addr_status = if !is_socks_route {
BndAddrStatus::Error
} else if raw_socks_bound_addr.is_some() && socks_bound_addr.is_none() {
BndAddrStatus::Bogon
} else if socks_bound_addr.is_some() {
BndAddrStatus::Ok
} else {
BndAddrStatus::Error
};
let bnd_port_status = if is_socks_route {
Self::bnd_port_status(raw_socks_bound_addr)
} else {
BndPortStatus::Error
};
record_bnd_status(bnd_addr_status, bnd_port_status, raw_socks_bound_addr);
// Address used to translate our local address: the usable SOCKS bound address if any;
// otherwise, on SOCKS routes, the configured policy decides between rejecting (Strict)
// and falling back to the reflected public address (Compat); on non-SOCKS routes the
// reflected public address is used when NAT probing is enabled.
let reflected = if let Some(bound) = socks_bound_addr {
Some(bound)
} else if is_socks_route {
match self.socks_kdf_policy() {
MeSocksKdfPolicy::Strict => {
self.stats.increment_me_socks_kdf_strict_reject();
return Err(ProxyError::InvalidHandshake(
"SOCKS route returned no valid BND.ADDR for ME KDF (strict policy)"
.to_string(),
));
}
MeSocksKdfPolicy::Compat => {
self.stats.increment_me_socks_kdf_compat_fallback();
if self.nat_runtime.nat_probe {
let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress);
self.maybe_reflect_public_addr(family, bind_ip).await
} else {
None
}
}
}
} else if self.nat_runtime.nat_probe {
let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress);
self.maybe_reflect_public_addr(family, bind_ip).await
} else {
None
};
let local_addr_nat = self.translate_our_addr_with_reflection(local_addr, reflected);
let peer_addr_nat =
SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port());
// Per-upstream diagnostics: record the BND status together with the client IP that
// this handshake associates with the upstream.
if let Some(upstream_info) = upstream_egress {
let client_ip_for_kdf = socks_bound_addr
.map(|value| value.ip())
.unwrap_or(local_addr_nat.ip());
record_upstream_bnd_status(
upstream_info.upstream_id,
bnd_addr_status,
bnd_port_status,
raw_socks_bound_addr,
Some(client_ip_for_kdf),
);
}
let (mut rd, mut wr) = tokio::io::split(stream);
// Exactly 16 bytes are requested, so the conversion to `[u8; 16]` is expected to succeed.
let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap();
// Unix time in seconds, truncated to 32 bits; falls back to 0 if the clock is before
// the epoch.
let crypto_ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32;
let secret_atomic_snapshot = self
.writer_selection_policy
.secret_atomic_snapshot
.load(Ordering::Relaxed);
// With the atomic-snapshot flag set, key selector and secret come from one snapshot.
let (ks, secret) = if secret_atomic_snapshot {
let snapshot = self.secret_snapshot().await;
(snapshot.key_selector, snapshot.secret)
} else {
// Backward-compatible mode: key selector and secret may come from different updates.
let key_selector = self.key_selector().await;
let secret = self.secret_snapshot().await.secret;
(key_selector, secret)
};
// --- Nonce exchange (plaintext, seq = -2, CRC32 framing) ---
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
let nonce_frame = build_rpc_frame(-2, &nonce_payload, RpcChecksumMode::Crc32);
// Only the first 44 bytes (at most) of the frame are dumped.
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
debug!(
key_selector = format_args!("0x{ks:08x}"),
crypto_ts,
frame_len = nonce_frame.len(),
nonce_frame_hex = %dump,
"Sending ME nonce frame"
);
wr.write_all(&nonce_frame).await.map_err(ProxyError::Io)?;
wr.flush().await.map_err(ProxyError::Io)?;
// Outer `?` maps the timeout, inner `?` propagates the frame-read error.
let (srv_seq, srv_nonce_payload) = timeout(
Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS),
read_rpc_frame_plaintext(&mut rd),
)
.await
.map_err(|_| ProxyError::TgHandshakeTimeout)??;
if srv_seq != -2 {
return Err(ProxyError::InvalidHandshake(format!(
"Expected seq=-2, got {srv_seq}"
)));
}
let (srv_key_select, schema, srv_ts, srv_nonce) = parse_nonce_payload(&srv_nonce_payload)?;
if schema != RPC_CRYPTO_AES_U32 {
warn!(
schema = format_args!("0x{schema:08x}"),
"Unsupported ME crypto schema"
);
return Err(ProxyError::InvalidHandshake(format!(
"Unsupported crypto schema: 0x{schema:x}"
)));
}
if srv_key_select != ks {
return Err(ProxyError::InvalidHandshake(format!(
"Server key_select 0x{srv_key_select:08x} != client 0x{ks:08x}"
)));
}
// Reject when our timestamp and the server's differ by more than 30 seconds.
let skew = crypto_ts.abs_diff(srv_ts);
if skew > 30 {
return Err(ProxyError::InvalidHandshake(format!(
"nonce crypto_ts skew too large: client={crypto_ts}, server={srv_ts}, skew={skew}s"
)));
}
info!(
%local_addr,
%local_addr_nat,
reflected_ip = reflected.map(|r| r.ip()).as_ref().map(ToString::to_string),
%peer_addr,
%transport_peer_addr,
%peer_addr_nat,
socks_bound_addr = socks_bound_addr.map(|v| v.to_string()),
key_selector = format_args!("0x{ks:08x}"),
crypto_schema = format_args!("0x{schema:08x}"),
skew_secs = skew,
"ME key derivation parameters"
);
// --- KDF inputs ---
// Timestamp and ports are serialized little-endian.
let ts_bytes = crypto_ts.to_le_bytes();
let server_port_bytes = peer_addr_nat.port().to_le_bytes();
// A SOCKS bound port of 0 is treated as absent; the translated local port is used then.
let socks_bound_port = socks_bound_addr
.map(|bound| bound.port())
.filter(|port| *port != 0);
let client_port_for_kdf = socks_bound_port.unwrap_or(local_addr_nat.port());
let client_port_source = KdfClientPortSource::from_socks_bound_port(socks_bound_port);
// --- KDF drift diagnostics ---
// The fingerprint deliberately excludes the client port value; a port change with an
// otherwise equal fingerprint is tracked separately below.
let kdf_fingerprint = Self::kdf_material_fingerprint(
local_addr_nat.ip(),
peer_addr_nat,
reflected.map(|value| value.ip()),
socks_bound_addr.map(|value| value.ip()),
client_port_source,
);
// Read under a short-lived read guard; the guard is dropped at the end of this block.
let previous_kdf_fingerprint = {
let kdf_fingerprint_guard = self.kdf_material_fingerprint.read().await;
kdf_fingerprint_guard.get(&peer_addr_nat).copied()
};
if let Some((prev_fingerprint, prev_client_port)) = previous_kdf_fingerprint {
if prev_fingerprint != kdf_fingerprint {
self.stats.increment_me_kdf_drift_total();
warn!(
%peer_addr_nat,
%local_addr_nat,
client_port_for_kdf,
client_port_source = ?client_port_source,
"ME KDF material drift detected for endpoint"
);
if ME_KDF_DRIFT_STRICT {
return Err(ProxyError::InvalidHandshake(
"ME KDF material drift detected (strict mode)".to_string(),
));
}
} else if prev_client_port != client_port_for_kdf {
self.stats.increment_me_kdf_port_only_drift_total();
debug!(
%peer_addr_nat,
previous_client_port_for_kdf = prev_client_port,
client_port_for_kdf,
client_port_source = ?client_port_source,
"ME KDF client port changed with stable material"
);
}
}
// Keep fingerprint updates eventually consistent for diagnostics while avoiding
// serializing all concurrent handshakes on a single async mutex.
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.write().await;
kdf_fingerprint_guard.insert(peer_addr_nat, (kdf_fingerprint, client_port_for_kdf));
drop(kdf_fingerprint_guard);
let client_port_bytes = client_port_for_kdf.to_le_bytes();
let server_ip = extract_ip_material(peer_addr_nat);
let client_ip = extract_ip_material(local_addr_nat);
// Both endpoints must be of the same IP family. For IPv4 the 4-byte material is
// reversed in place and also used in the handshake payload; for IPv6 the handshake
// payload carries zeroed 4-byte IP fields and the IPv6 material goes to the KDF only.
// NOTE(review): the reversal presumably converts byte order for the wire format —
// confirm against `extract_ip_material` / the codec.
let (srv_ip_opt, clt_ip_opt, clt_v6_opt, srv_v6_opt, hs_our_ip, hs_peer_ip) =
match (server_ip, client_ip) {
(IpMaterial::V4(mut srv), IpMaterial::V4(mut clt)) => {
srv.reverse();
clt.reverse();
(Some(srv), Some(clt), None, None, clt, srv)
}
(IpMaterial::V6(srv), IpMaterial::V6(clt)) => {
let zero = [0u8; 4];
(None, None, Some(clt), Some(srv), zero, zero)
}
_ => {
return Err(ProxyError::InvalidHandshake(
"mixed IPv4/IPv6 endpoints are not supported for ME key derivation"
.to_string(),
));
}
};
// Diagnostic verbosity from the `ME_DIAG` environment variable (unset or unparsable
// means 0). Level >= 1 logs derived keys and handshake bytes, level >= 2 additionally
// logs the full prekey buffers.
// NOTE(review): these diagnostics write key material to the log at info level.
let diag_level: u8 = std::env::var("ME_DIAG")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(0);
// The prekey buffers are built for diagnostics only (logged below when enabled).
let prekey_client = build_middleproxy_prekey(
&srv_nonce,
&my_nonce,
&ts_bytes,
srv_ip_opt.as_ref().map(|x| &x[..]),
&client_port_bytes,
b"CLIENT",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
let prekey_server = build_middleproxy_prekey(
&srv_nonce,
&my_nonce,
&ts_bytes,
srv_ip_opt.as_ref().map(|x| &x[..]),
&client_port_bytes,
b"SERVER",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
// Write-direction key/IV: same inputs, purpose label `CLIENT`.
let (wk, wi) = derive_middleproxy_keys(
&srv_nonce,
&my_nonce,
&ts_bytes,
srv_ip_opt.as_ref().map(|x| &x[..]),
&client_port_bytes,
b"CLIENT",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
// Read-direction key/IV: same inputs, purpose label `SERVER`.
let (rk, ri) = derive_middleproxy_keys(
&srv_nonce,
&my_nonce,
&ts_bytes,
srv_ip_opt.as_ref().map(|x| &x[..]),
&client_port_bytes,
b"SERVER",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
// --- Handshake frame (seq = -1) ---
// CRC32C is requested via the advertised flags; the handshake frame itself is still
// framed with CRC32. Note the payload carries the untranslated local/peer ports.
let requested_crc_mode = RpcChecksumMode::Crc32c;
let hs_payload = build_handshake_payload(
hs_our_ip,
local_addr.port(),
hs_peer_ip,
peer_addr.port(),
requested_crc_mode.advertised_flags(),
);
let hs_frame = build_rpc_frame(-1, &hs_payload, RpcChecksumMode::Crc32);
if diag_level >= 1 {
info!(
write_key = %hex_dump(&wk),
write_iv = %hex_dump(&wi),
read_key = %hex_dump(&rk),
read_iv = %hex_dump(&ri),
srv_ip = %srv_ip_opt.map(|ip| hex_dump(&ip)).unwrap_or_default(),
clt_ip = %clt_ip_opt.map(|ip| hex_dump(&ip)).unwrap_or_default(),
srv_port = %hex_dump(&server_port_bytes),
clt_port = %hex_dump(&client_port_bytes),
crypto_ts = %hex_dump(&ts_bytes),
nonce_srv = %hex_dump(&srv_nonce),
nonce_clt = %hex_dump(&my_nonce),
prekey_sha256_client = %hex_dump(&sha256(&prekey_client)),
prekey_sha256_server = %hex_dump(&sha256(&prekey_server)),
hs_plain = %hex_dump(&hs_frame),
proxy_secret_sha256 = %hex_dump(&sha256(&secret)),
"ME diag: derived keys and handshake plaintext"
);
}
if diag_level >= 2 {
info!(
prekey_client = %hex_dump(&prekey_client),
prekey_server = %hex_dump(&prekey_server),
"ME diag: full prekey buffers"
);
}
// The returned IV is the write-side IV to continue with after this frame.
let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
if diag_level >= 1 {
info!(
hs_cipher = %hex_dump(&encrypted_hs),
"ME diag: handshake ciphertext"
);
}
wr.write_all(&encrypted_hs).await.map_err(ProxyError::Io)?;
wr.flush().await.map_err(ProxyError::Io)?;
// --- Handshake response ---
// One overall deadline covers all reads of the response.
let deadline = Instant::now() + Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS);
// `enc_buf` holds ciphertext not yet decrypted; `dec_buf` holds decrypted bytes not
// yet parsed into a frame.
let mut enc_buf = BytesMut::with_capacity(256);
let mut dec_buf = BytesMut::with_capacity(256);
let mut read_iv = ri;
let mut negotiated_crc_mode = RpcChecksumMode::Crc32;
let mut handshake_ok = false;
while Instant::now() < deadline && !handshake_ok {
let remaining = deadline - Instant::now();
let mut tmp = [0u8; 256];
let n = match timeout(remaining, rd.read(&mut tmp)).await {
// A zero-length read means the peer closed the connection.
Ok(Ok(0)) => {
return Err(ProxyError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"ME closed during handshake",
)));
}
Ok(Ok(n)) => n,
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => return Err(ProxyError::TgHandshakeTimeout),
};
enc_buf.extend_from_slice(&tmp[..n]);
// Decrypt only whole 16-byte blocks; any remainder stays in `enc_buf` for the next
// read. `read_iv` is advanced to the value returned by the decryptor.
let blocks = enc_buf.len() / 16 * 16;
if blocks > 0 {
let mut chunk = vec![0u8; blocks];
chunk.copy_from_slice(&enc_buf[..blocks]);
read_iv = cbc_decrypt_inplace(&rk, &read_iv, &mut chunk)?;
dec_buf.extend_from_slice(&chunk);
let _ = enc_buf.split_to(blocks);
}
// Parse frames out of the decrypted bytes; the first 4 bytes are the little-endian
// total frame length.
while dec_buf.len() >= 4 {
let fl = u32::from_le_bytes(dec_buf[0..4].try_into().unwrap()) as usize;
// A length of exactly 4 is a length-only word and is skipped
// (presumably padding — confirm against the codec).
if fl == 4 {
let _ = dec_buf.split_to(4);
continue;
}
// Accept only frame lengths between 12 bytes and 16 MiB inclusive.
if !(12..=(1 << 24)).contains(&fl) {
return Err(ProxyError::InvalidHandshake(format!(
"Bad HS response frame len: {fl}"
)));
}
// Frame not complete yet: go back to reading more data.
if dec_buf.len() < fl {
break;
}
let frame = dec_buf.split_to(fl);
// The last 4 bytes of the frame are the CRC32 of everything before them.
let pe = fl - 4;
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
let ac = rpc_crc(RpcChecksumMode::Crc32, &frame[..pe]);
if ec != ac {
return Err(ProxyError::InvalidHandshake(format!(
"HS CRC mismatch: 0x{ec:08x} vs 0x{ac:08x}"
)));
}
// The payload sits between the 8-byte header and the trailing CRC.
let hs_payload = &frame[8..pe];
if hs_payload.len() < 4 {
return Err(ProxyError::InvalidHandshake(
"Handshake payload too short".to_string(),
));
}
let hs_type = u32::from_le_bytes(hs_payload[0..4].try_into().unwrap());
// Explicit rejection from the ME; the error code is -1 when the payload is too
// short to contain one.
if hs_type == RPC_HANDSHAKE_ERROR_U32 {
let err_code = if hs_payload.len() >= 8 {
i32::from_le_bytes(hs_payload[4..8].try_into().unwrap())
} else {
-1
};
self.stats.increment_me_handshake_reject_total();
self.stats.increment_me_handshake_error_code(err_code);
return Err(ProxyError::InvalidHandshake(format!(
"ME rejected handshake (error={err_code})"
)));
}
let hs_flags = parse_handshake_flags(hs_payload)?;
// Any bit set in the low byte of the flags is treated as unsupported.
if hs_flags & 0xff != 0 {
return Err(ProxyError::InvalidHandshake(format!(
"Unsupported handshake flags: 0x{hs_flags:08x}"
)));
}
// CRC negotiation: if the peer echoed any of the flags we advertised, take the mode
// from its flags; if it set USE_CRC32C without echoing what we advertised, reject;
// otherwise stay on CRC32.
negotiated_crc_mode = if (hs_flags & requested_crc_mode.advertised_flags()) != 0 {
RpcChecksumMode::from_handshake_flags(hs_flags)
} else if (hs_flags & rpc_crypto_flags::USE_CRC32C) != 0 {
return Err(ProxyError::InvalidHandshake(format!(
"Peer negotiated unsupported CRC flags: 0x{hs_flags:08x}"
)));
} else {
RpcChecksumMode::Crc32
};
handshake_ok = true;
break;
}
}
// The loop can also end because the deadline passed without a complete frame.
if !handshake_ok {
return Err(ProxyError::TgHandshakeTimeout);
}
let handshake_ms = hs_start.elapsed().as_secs_f64() * 1000.0;
info!(%addr, "RPC handshake OK");
Ok(HandshakeOutput {
rd,
wr,
source_ip: local_addr_nat.ip(),
read_key: rk,
read_iv,
write_key: wk,
write_iv,
crc_mode: negotiated_crc_mode,
handshake_ms,
})
}
}
/// Renders up to the first 64 bytes of `data` as lowercase two-digit hex values
/// separated by single spaces (e.g. `00 ab ff`).
///
/// When `data` is longer than 64 bytes the output ends with an ellipsis (`…`) so a
/// truncated dump can be told apart from a complete one. An empty slice yields an
/// empty string.
fn hex_dump(data: &[u8]) -> String {
    const MAX: usize = 64;
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    // U+2026 HORIZONTAL ELLIPSIS, written as an escape so it survives re-encoding.
    const TRUNCATION_MARK: char = '\u{2026}';

    let shown = &data[..data.len().min(MAX)];
    // Two digits plus one separator per byte, and room for the 3-byte UTF-8 ellipsis.
    let mut out = String::with_capacity(shown.len() * 3 + TRUNCATION_MARK.len_utf8());
    for (i, &byte) in shown.iter().enumerate() {
        if i > 0 {
            out.push(' ');
        }
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    if data.len() > MAX {
        out.push(TRUNCATION_MARK);
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::ErrorKind;
    use tokio::net::{TcpListener, TcpStream};

    /// Connects to a loopback listener and applies the keepalive configuration.
    /// Environments that deny socket operations (`PermissionDenied`) are tolerated.
    #[tokio::test]
    async fn test_configure_keepalive_loopback() {
        let listener = match TcpListener::bind("127.0.0.1:0").await {
            Err(error) if error.kind() == ErrorKind::PermissionDenied => return,
            Err(error) => panic!("bind failed: {error}"),
            Ok(listener) => listener,
        };
        let addr = listener
            .local_addr()
            .unwrap_or_else(|error| panic!("local_addr failed: {error}"));
        let stream = match TcpStream::connect(addr).await {
            Err(error) if error.kind() == ErrorKind::PermissionDenied => return,
            Err(error) => panic!("connect failed: {error}"),
            Ok(stream) => stream,
        };
        match MePool::configure_keepalive(&stream) {
            Ok(()) => {}
            Err(error) if error.kind() == ErrorKind::PermissionDenied => {}
            Err(error) => panic!("configure_keepalive failed: {error}"),
        }
    }

    /// Compile-only check of the keepalive builder path used on OpenBSD.
    #[test]
    #[cfg(target_os = "openbsd")]
    fn test_openbsd_keepalive_cfg_path_compiles() {
        let idle = Duration::from_secs(30);
        let _ka = TcpKeepalive::new().with_time(idle);
    }

    /// Compile-only check of the keepalive builder path with interval and retries.
    #[test]
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "ios",
        target_os = "visionos",
        target_os = "linux",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "tvos",
        target_os = "watchos",
        target_os = "cygwin",
    ))]
    fn test_retry_keepalive_cfg_path_compiles() {
        let idle = Duration::from_secs(30);
        let probe_interval = Duration::from_secs(10);
        let _ka = TcpKeepalive::new()
            .with_time(idle)
            .with_interval(probe_interval)
            .with_retries(3);
    }
}