mirror of
https://github.com/telemt/telemt.git
synced 2026-06-15 07:21:43 +03:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d81d7dba62 | ||
|
|
04b8d8365c | ||
|
|
2e26bfb86e | ||
|
|
d414c73c9b | ||
|
|
b153782597 |
@@ -208,6 +208,8 @@ pub(crate) async fn initialize_me_pool(
|
|||||||
me_nat_probe,
|
me_nat_probe,
|
||||||
None,
|
None,
|
||||||
config.network.stun_servers.clone(),
|
config.network.stun_servers.clone(),
|
||||||
|
config.network.stun_tcp_fallback,
|
||||||
|
config.network.http_ip_detect_urls.clone(),
|
||||||
config.general.stun_nat_probe_concurrency,
|
config.general.stun_nat_probe_concurrency,
|
||||||
probe.detected_ipv6,
|
probe.detected_ipv6,
|
||||||
config.timeouts.me_one_retry,
|
config.timeouts.me_one_retry,
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ use tracing::{debug, info, warn};
|
|||||||
use crate::config::{NetworkConfig, UpstreamConfig, UpstreamType};
|
use crate::config::{NetworkConfig, UpstreamConfig, UpstreamType};
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
use crate::network::stun::{
|
use crate::network::stun::{
|
||||||
DualStunResult, IpFamily, StunProbeResult, stun_probe_family_with_bind,
|
DualStunResult, IpFamily, StunProbeResult, stun_probe_family_with_bind_and_tcp_fallback,
|
||||||
};
|
};
|
||||||
use crate::transport::UpstreamManager;
|
use crate::transport::UpstreamManager;
|
||||||
|
|
||||||
@@ -58,6 +58,7 @@ impl NetworkDecision {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
|
const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
const STUN_BATCH_TCP_FALLBACK_TIMEOUT: Duration = Duration::from_secs(12);
|
||||||
|
|
||||||
pub async fn run_probe(
|
pub async fn run_probe(
|
||||||
config: &NetworkConfig,
|
config: &NetworkConfig,
|
||||||
@@ -81,7 +82,13 @@ pub async fn run_probe(
|
|||||||
warn!("STUN probe is enabled but network.stun_servers is empty");
|
warn!("STUN probe is enabled but network.stun_servers is empty");
|
||||||
DualStunResult::default()
|
DualStunResult::default()
|
||||||
} else {
|
} else {
|
||||||
probe_stun_servers_parallel(&servers, stun_nat_probe_concurrency.max(1), None, None)
|
probe_stun_servers_parallel(
|
||||||
|
&servers,
|
||||||
|
stun_nat_probe_concurrency.max(1),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
config.stun_tcp_fallback,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
} else if nat_probe {
|
} else if nat_probe {
|
||||||
@@ -163,6 +170,7 @@ pub async fn run_probe(
|
|||||||
stun_nat_probe_concurrency.max(1),
|
stun_nat_probe_concurrency.max(1),
|
||||||
bind_v4,
|
bind_v4,
|
||||||
bind_v6,
|
bind_v6,
|
||||||
|
config.stun_tcp_fallback,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
if let Some(reflected) = direct_stun_res.v4.map(|r| r.reflected_addr) {
|
if let Some(reflected) = direct_stun_res.v4.map(|r| r.reflected_addr) {
|
||||||
@@ -234,7 +242,7 @@ pub async fn run_probe(
|
|||||||
Ok(probe)
|
Ok(probe)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn detect_public_ipv4_http(urls: &[String]) -> Option<Ipv4Addr> {
|
pub(crate) async fn detect_public_ipv4_http(urls: &[String]) -> Option<Ipv4Addr> {
|
||||||
let client = reqwest::Client::builder()
|
let client = reqwest::Client::builder()
|
||||||
.timeout(Duration::from_secs(3))
|
.timeout(Duration::from_secs(3))
|
||||||
.build()
|
.build()
|
||||||
@@ -277,6 +285,7 @@ async fn probe_stun_servers_parallel(
|
|||||||
concurrency: usize,
|
concurrency: usize,
|
||||||
bind_v4: Option<IpAddr>,
|
bind_v4: Option<IpAddr>,
|
||||||
bind_v6: Option<IpAddr>,
|
bind_v6: Option<IpAddr>,
|
||||||
|
tcp_fallback: bool,
|
||||||
) -> DualStunResult {
|
) -> DualStunResult {
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
let mut next_idx = 0usize;
|
let mut next_idx = 0usize;
|
||||||
@@ -288,9 +297,26 @@ async fn probe_stun_servers_parallel(
|
|||||||
let stun_addr = servers[next_idx].clone();
|
let stun_addr = servers[next_idx].clone();
|
||||||
next_idx += 1;
|
next_idx += 1;
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let res = timeout(STUN_BATCH_TIMEOUT, async {
|
let batch_timeout = if tcp_fallback {
|
||||||
let v4 = stun_probe_family_with_bind(&stun_addr, IpFamily::V4, bind_v4).await?;
|
STUN_BATCH_TCP_FALLBACK_TIMEOUT
|
||||||
let v6 = stun_probe_family_with_bind(&stun_addr, IpFamily::V6, bind_v6).await?;
|
} else {
|
||||||
|
STUN_BATCH_TIMEOUT
|
||||||
|
};
|
||||||
|
let res = timeout(batch_timeout, async {
|
||||||
|
let v4 = stun_probe_family_with_bind_and_tcp_fallback(
|
||||||
|
&stun_addr,
|
||||||
|
IpFamily::V4,
|
||||||
|
bind_v4,
|
||||||
|
tcp_fallback,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let v6 = stun_probe_family_with_bind_and_tcp_fallback(
|
||||||
|
&stun_addr,
|
||||||
|
IpFamily::V6,
|
||||||
|
bind_v6,
|
||||||
|
tcp_fallback,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
Ok::<DualStunResult, crate::error::ProxyError>(DualStunResult { v4, v6 })
|
Ok::<DualStunResult, crate::error::ProxyError>(DualStunResult { v4, v6 })
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -4,7 +4,8 @@
|
|||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||||
use std::sync::OnceLock;
|
use std::sync::OnceLock;
|
||||||
|
|
||||||
use tokio::net::{UdpSocket, lookup_host};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::net::{TcpSocket, UdpSocket, lookup_host};
|
||||||
use tokio::time::{Duration, sleep, timeout};
|
use tokio::time::{Duration, sleep, timeout};
|
||||||
|
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
@@ -36,9 +37,16 @@ pub struct DualStunResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn stun_probe_dual(stun_addr: &str) -> Result<DualStunResult> {
|
pub async fn stun_probe_dual(stun_addr: &str) -> Result<DualStunResult> {
|
||||||
|
stun_probe_dual_with_tcp_fallback(stun_addr, false).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stun_probe_dual_with_tcp_fallback(
|
||||||
|
stun_addr: &str,
|
||||||
|
tcp_fallback: bool,
|
||||||
|
) -> Result<DualStunResult> {
|
||||||
let (v4, v6) = tokio::join!(
|
let (v4, v6) = tokio::join!(
|
||||||
stun_probe_family(stun_addr, IpFamily::V4),
|
stun_probe_family_with_tcp_fallback(stun_addr, IpFamily::V4, tcp_fallback),
|
||||||
stun_probe_family(stun_addr, IpFamily::V6),
|
stun_probe_family_with_tcp_fallback(stun_addr, IpFamily::V6, tcp_fallback),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(DualStunResult { v4: v4?, v6: v6? })
|
Ok(DualStunResult { v4: v4?, v6: v6? })
|
||||||
@@ -48,13 +56,44 @@ pub async fn stun_probe_family(
|
|||||||
stun_addr: &str,
|
stun_addr: &str,
|
||||||
family: IpFamily,
|
family: IpFamily,
|
||||||
) -> Result<Option<StunProbeResult>> {
|
) -> Result<Option<StunProbeResult>> {
|
||||||
stun_probe_family_with_bind(stun_addr, family, None).await
|
stun_probe_family_with_tcp_fallback(stun_addr, family, false).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stun_probe_family_with_tcp_fallback(
|
||||||
|
stun_addr: &str,
|
||||||
|
family: IpFamily,
|
||||||
|
tcp_fallback: bool,
|
||||||
|
) -> Result<Option<StunProbeResult>> {
|
||||||
|
stun_probe_family_with_bind_and_tcp_fallback(stun_addr, family, None, tcp_fallback).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn stun_probe_family_with_bind(
|
pub async fn stun_probe_family_with_bind(
|
||||||
stun_addr: &str,
|
stun_addr: &str,
|
||||||
family: IpFamily,
|
family: IpFamily,
|
||||||
bind_ip: Option<IpAddr>,
|
bind_ip: Option<IpAddr>,
|
||||||
|
) -> Result<Option<StunProbeResult>> {
|
||||||
|
stun_probe_family_with_bind_and_tcp_fallback(stun_addr, family, bind_ip, false).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stun_probe_family_with_bind_and_tcp_fallback(
|
||||||
|
stun_addr: &str,
|
||||||
|
family: IpFamily,
|
||||||
|
bind_ip: Option<IpAddr>,
|
||||||
|
tcp_fallback: bool,
|
||||||
|
) -> Result<Option<StunProbeResult>> {
|
||||||
|
let udp_attempts = if tcp_fallback { 1 } else { 3 };
|
||||||
|
let udp_result = stun_probe_family_udp(stun_addr, family, bind_ip, udp_attempts).await?;
|
||||||
|
if udp_result.is_some() || !tcp_fallback {
|
||||||
|
return Ok(udp_result);
|
||||||
|
}
|
||||||
|
stun_probe_family_tcp(stun_addr, family, bind_ip).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stun_probe_family_udp(
|
||||||
|
stun_addr: &str,
|
||||||
|
family: IpFamily,
|
||||||
|
bind_ip: Option<IpAddr>,
|
||||||
|
max_attempts: u8,
|
||||||
) -> Result<Option<StunProbeResult>> {
|
) -> Result<Option<StunProbeResult>> {
|
||||||
let bind_addr = match (family, bind_ip) {
|
let bind_addr = match (family, bind_ip) {
|
||||||
(IpFamily::V4, Some(IpAddr::V4(ip))) => SocketAddr::new(IpAddr::V4(ip), 0),
|
(IpFamily::V4, Some(IpAddr::V4(ip))) => SocketAddr::new(IpAddr::V4(ip), 0),
|
||||||
@@ -94,12 +133,7 @@ pub async fn stun_probe_family_with_bind(
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut req = [0u8; 20];
|
let req = build_binding_request();
|
||||||
req[0..2].copy_from_slice(&0x0001u16.to_be_bytes()); // Binding Request
|
|
||||||
req[2..4].copy_from_slice(&0u16.to_be_bytes()); // length
|
|
||||||
req[4..8].copy_from_slice(&0x2112A442u32.to_be_bytes()); // magic cookie
|
|
||||||
stun_rng().fill(&mut req[8..20]); // transaction ID
|
|
||||||
|
|
||||||
let mut buf = [0u8; 256];
|
let mut buf = [0u8; 256];
|
||||||
let mut attempt = 0;
|
let mut attempt = 0;
|
||||||
let mut backoff = Duration::from_secs(1);
|
let mut backoff = Duration::from_secs(1);
|
||||||
@@ -115,7 +149,7 @@ pub async fn stun_probe_family_with_bind(
|
|||||||
Ok(Err(e)) => return Err(ProxyError::Proxy(format!("STUN recv failed: {e}"))),
|
Ok(Err(e)) => return Err(ProxyError::Proxy(format!("STUN recv failed: {e}"))),
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
attempt += 1;
|
attempt += 1;
|
||||||
if attempt >= 3 {
|
if attempt >= max_attempts {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
sleep(backoff).await;
|
sleep(backoff).await;
|
||||||
@@ -128,19 +162,139 @@ pub async fn stun_probe_family_with_bind(
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let magic = 0x2112A442u32.to_be_bytes();
|
|
||||||
let txid = &req[8..20];
|
let txid = &req[8..20];
|
||||||
|
if let Some(reflected_addr) = parse_reflected_addr(&buf[..n], txid) {
|
||||||
|
let local_addr = socket
|
||||||
|
.local_addr()
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("STUN local_addr failed: {e}")))?;
|
||||||
|
return Ok(Some(StunProbeResult {
|
||||||
|
local_addr,
|
||||||
|
reflected_addr,
|
||||||
|
family,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stun_probe_family_tcp(
|
||||||
|
stun_addr: &str,
|
||||||
|
family: IpFamily,
|
||||||
|
bind_ip: Option<IpAddr>,
|
||||||
|
) -> Result<Option<StunProbeResult>> {
|
||||||
|
let target_addr = match resolve_stun_addr(stun_addr, family).await? {
|
||||||
|
Some(addr) => addr,
|
||||||
|
None => return Ok(None),
|
||||||
|
};
|
||||||
|
let socket = match family {
|
||||||
|
IpFamily::V4 => TcpSocket::new_v4(),
|
||||||
|
IpFamily::V6 => TcpSocket::new_v6(),
|
||||||
|
}
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("STUN TCP socket failed: {e}")))?;
|
||||||
|
match (family, bind_ip) {
|
||||||
|
(IpFamily::V4, Some(IpAddr::V4(ip))) => {
|
||||||
|
if socket.bind(SocketAddr::new(IpAddr::V4(ip), 0)).is_err() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(IpFamily::V6, Some(IpAddr::V6(ip))) => {
|
||||||
|
if socket.bind(SocketAddr::new(IpAddr::V6(ip), 0)).is_err() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(IpFamily::V4, Some(IpAddr::V6(_))) | (IpFamily::V6, Some(IpAddr::V4(_))) => {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
(_, None) => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
let connect_res = timeout(Duration::from_secs(3), socket.connect(target_addr)).await;
|
||||||
|
let mut stream = match connect_res {
|
||||||
|
Ok(Ok(stream)) => stream,
|
||||||
|
Ok(Err(e))
|
||||||
|
if family == IpFamily::V6
|
||||||
|
&& matches!(
|
||||||
|
e.kind(),
|
||||||
|
std::io::ErrorKind::NetworkUnreachable
|
||||||
|
| std::io::ErrorKind::HostUnreachable
|
||||||
|
| std::io::ErrorKind::Unsupported
|
||||||
|
| std::io::ErrorKind::NetworkDown
|
||||||
|
) =>
|
||||||
|
{
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => return Err(ProxyError::Proxy(format!("STUN TCP connect failed: {e}"))),
|
||||||
|
Err(_) => return Ok(None),
|
||||||
|
};
|
||||||
|
|
||||||
|
let req = build_binding_request();
|
||||||
|
timeout(Duration::from_secs(3), stream.write_all(&req))
|
||||||
|
.await
|
||||||
|
.map_err(|_| ProxyError::Proxy("STUN TCP send timeout".to_string()))?
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("STUN TCP send failed: {e}")))?;
|
||||||
|
|
||||||
|
let mut header = [0u8; 20];
|
||||||
|
timeout(Duration::from_secs(3), stream.read_exact(&mut header))
|
||||||
|
.await
|
||||||
|
.map_err(|_| ProxyError::Proxy("STUN TCP header timeout".to_string()))?
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("STUN TCP header read failed: {e}")))?;
|
||||||
|
let body_len = u16::from_be_bytes([header[2], header[3]]) as usize;
|
||||||
|
if body_len > 236 {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
let mut buf = [0u8; 256];
|
||||||
|
buf[..20].copy_from_slice(&header);
|
||||||
|
if body_len > 0 {
|
||||||
|
timeout(
|
||||||
|
Duration::from_secs(3),
|
||||||
|
stream.read_exact(&mut buf[20..20 + body_len]),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| ProxyError::Proxy("STUN TCP body timeout".to_string()))?
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("STUN TCP body read failed: {e}")))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let txid = &req[8..20];
|
||||||
|
let Some(reflected_addr) = parse_reflected_addr(&buf[..20 + body_len], txid) else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
let local_addr = stream
|
||||||
|
.local_addr()
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("STUN TCP local_addr failed: {e}")))?;
|
||||||
|
Ok(Some(StunProbeResult {
|
||||||
|
local_addr,
|
||||||
|
reflected_addr,
|
||||||
|
family,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_binding_request() -> [u8; 20] {
|
||||||
|
let mut req = [0u8; 20];
|
||||||
|
req[0..2].copy_from_slice(&0x0001u16.to_be_bytes());
|
||||||
|
req[2..4].copy_from_slice(&0u16.to_be_bytes());
|
||||||
|
req[4..8].copy_from_slice(&0x2112A442u32.to_be_bytes());
|
||||||
|
stun_rng().fill(&mut req[8..20]);
|
||||||
|
req
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_reflected_addr(buf: &[u8], txid: &[u8]) -> Option<SocketAddr> {
|
||||||
|
if buf.len() < 20 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let magic = 0x2112A442u32.to_be_bytes();
|
||||||
let mut idx = 20;
|
let mut idx = 20;
|
||||||
while idx + 4 <= n {
|
while idx + 4 <= buf.len() {
|
||||||
let atype = u16::from_be_bytes(buf[idx..idx + 2].try_into().unwrap());
|
let atype = u16::from_be_bytes(buf[idx..idx + 2].try_into().ok()?);
|
||||||
let alen = u16::from_be_bytes(buf[idx + 2..idx + 4].try_into().unwrap()) as usize;
|
let alen = u16::from_be_bytes(buf[idx + 2..idx + 4].try_into().ok()?) as usize;
|
||||||
idx += 4;
|
idx += 4;
|
||||||
if idx + alen > n {
|
if idx + alen > buf.len() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
match atype {
|
match atype {
|
||||||
0x0020 /* XOR-MAPPED-ADDRESS */ | 0x0001 /* MAPPED-ADDRESS */ => {
|
0x0020 | 0x0001 => {
|
||||||
if alen < 8 {
|
if alen < 8 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -157,7 +311,6 @@ pub async fn stun_probe_family_with_bind(
|
|||||||
|
|
||||||
let raw_ip = &buf[idx + 4..idx + 4 + len_check];
|
let raw_ip = &buf[idx + 4..idx + 4 + len_check];
|
||||||
let mut port = u16::from_be_bytes(port_bytes);
|
let mut port = u16::from_be_bytes(port_bytes);
|
||||||
|
|
||||||
let reflected_ip = if atype == 0x0020 {
|
let reflected_ip = if atype == 0x0020 {
|
||||||
port ^= ((magic[0] as u16) << 8) | magic[1] as u16;
|
port ^= ((magic[0] as u16) << 8) | magic[1] as u16;
|
||||||
match family_byte {
|
match family_byte {
|
||||||
@@ -172,7 +325,9 @@ pub async fn stun_probe_family_with_bind(
|
|||||||
}
|
}
|
||||||
0x02 => {
|
0x02 => {
|
||||||
let mut ip = [0u8; 16];
|
let mut ip = [0u8; 16];
|
||||||
let xor_key = [magic.as_slice(), txid].concat();
|
let mut xor_key = [0u8; 16];
|
||||||
|
xor_key[..4].copy_from_slice(&magic);
|
||||||
|
xor_key[4..].copy_from_slice(txid.get(..12)?);
|
||||||
for (i, b) in raw_ip.iter().enumerate().take(16) {
|
for (i, b) in raw_ip.iter().enumerate().take(16) {
|
||||||
ip[i] = *b ^ xor_key[i];
|
ip[i] = *b ^ xor_key[i];
|
||||||
}
|
}
|
||||||
@@ -185,34 +340,24 @@ pub async fn stun_probe_family_with_bind(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
match family_byte {
|
match family_byte {
|
||||||
0x01 => IpAddr::V4(Ipv4Addr::new(raw_ip[0], raw_ip[1], raw_ip[2], raw_ip[3])),
|
0x01 => {
|
||||||
0x02 => IpAddr::V6(Ipv6Addr::from(<[u8; 16]>::try_from(raw_ip).unwrap())),
|
IpAddr::V4(Ipv4Addr::new(raw_ip[0], raw_ip[1], raw_ip[2], raw_ip[3]))
|
||||||
|
}
|
||||||
|
0x02 => IpAddr::V6(Ipv6Addr::from(<[u8; 16]>::try_from(raw_ip).ok()?)),
|
||||||
_ => {
|
_ => {
|
||||||
idx += (alen + 3) & !3;
|
idx += (alen + 3) & !3;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
return Some(SocketAddr::new(reflected_ip, port));
|
||||||
let reflected_addr = SocketAddr::new(reflected_ip, port);
|
|
||||||
let local_addr = socket
|
|
||||||
.local_addr()
|
|
||||||
.map_err(|e| ProxyError::Proxy(format!("STUN local_addr failed: {e}")))?;
|
|
||||||
|
|
||||||
return Ok(Some(StunProbeResult {
|
|
||||||
local_addr,
|
|
||||||
reflected_addr,
|
|
||||||
family,
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
idx += (alen + 3) & !3;
|
idx += (alen + 3) & !3;
|
||||||
}
|
}
|
||||||
}
|
None
|
||||||
|
|
||||||
Ok(None)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_stun_addr(stun_addr: &str, family: IpFamily) -> Result<Option<SocketAddr>> {
|
async fn resolve_stun_addr(stun_addr: &str, family: IpFamily) -> Result<Option<SocketAddr>> {
|
||||||
@@ -245,3 +390,58 @@ async fn resolve_stun_addr(stun_addr: &str, family: IpFamily) -> Result<Option<S
|
|||||||
});
|
});
|
||||||
Ok(target)
|
Ok(target)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_reflected_addr_reads_mapped_ipv4() {
|
||||||
|
let txid = [0u8; 12];
|
||||||
|
let mut response = [0u8; 32];
|
||||||
|
response[0..2].copy_from_slice(&0x0101u16.to_be_bytes());
|
||||||
|
response[2..4].copy_from_slice(&12u16.to_be_bytes());
|
||||||
|
response[4..8].copy_from_slice(&0x2112A442u32.to_be_bytes());
|
||||||
|
response[20..22].copy_from_slice(&0x0001u16.to_be_bytes());
|
||||||
|
response[22..24].copy_from_slice(&8u16.to_be_bytes());
|
||||||
|
response[25] = 0x01;
|
||||||
|
response[26..28].copy_from_slice(&443u16.to_be_bytes());
|
||||||
|
response[28..32].copy_from_slice(&[203, 0, 113, 9]);
|
||||||
|
|
||||||
|
let reflected = parse_reflected_addr(&response, &txid).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
reflected,
|
||||||
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 9)), 443)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_reflected_addr_reads_xor_mapped_ipv4() {
|
||||||
|
let txid = [0u8; 12];
|
||||||
|
let magic = 0x2112A442u32.to_be_bytes();
|
||||||
|
let port = 443u16;
|
||||||
|
let ip = [203u8, 0, 113, 9];
|
||||||
|
let xport = port ^ (((magic[0] as u16) << 8) | magic[1] as u16);
|
||||||
|
let xip = [
|
||||||
|
ip[0] ^ magic[0],
|
||||||
|
ip[1] ^ magic[1],
|
||||||
|
ip[2] ^ magic[2],
|
||||||
|
ip[3] ^ magic[3],
|
||||||
|
];
|
||||||
|
let mut response = [0u8; 32];
|
||||||
|
response[0..2].copy_from_slice(&0x0101u16.to_be_bytes());
|
||||||
|
response[2..4].copy_from_slice(&12u16.to_be_bytes());
|
||||||
|
response[4..8].copy_from_slice(&0x2112A442u32.to_be_bytes());
|
||||||
|
response[20..22].copy_from_slice(&0x0020u16.to_be_bytes());
|
||||||
|
response[22..24].copy_from_slice(&8u16.to_be_bytes());
|
||||||
|
response[25] = 0x01;
|
||||||
|
response[26..28].copy_from_slice(&xport.to_be_bytes());
|
||||||
|
response[28..32].copy_from_slice(&xip);
|
||||||
|
|
||||||
|
let reflected = parse_reflected_addr(&response, &txid).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
reflected,
|
||||||
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 9)), 443)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -236,7 +236,8 @@ pub fn is_valid_secure_payload_len(data_len: usize) -> bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Compute Secure Intermediate payload length from wire length.
|
/// Compute Secure Intermediate payload length from wire length.
|
||||||
/// Secure mode strips up to 3 random tail bytes by truncating to 4-byte boundary.
|
/// Secure mode cannot distinguish full-word padding from payload, so only the
|
||||||
|
/// non-aligned tail bytes are stripped.
|
||||||
pub fn secure_payload_len_from_wire_len(wire_len: usize) -> Option<usize> {
|
pub fn secure_payload_len_from_wire_len(wire_len: usize) -> Option<usize> {
|
||||||
if wire_len < 4 {
|
if wire_len < 4 {
|
||||||
return None;
|
return None;
|
||||||
@@ -245,13 +246,13 @@ pub fn secure_payload_len_from_wire_len(wire_len: usize) -> Option<usize> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Generate padding length for Secure Intermediate protocol.
|
/// Generate padding length for Secure Intermediate protocol.
|
||||||
/// Data must be 4-byte aligned; padding is 1..=3 so total is never divisible by 4.
|
/// Telegram Desktop uses a 4-bit random padding length for VersionD packets.
|
||||||
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
|
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
|
||||||
debug_assert!(
|
debug_assert!(
|
||||||
is_valid_secure_payload_len(data_len),
|
is_valid_secure_payload_len(data_len),
|
||||||
"Secure payload must be 4-byte aligned, got {data_len}"
|
"Secure payload must be 4-byte aligned, got {data_len}"
|
||||||
);
|
);
|
||||||
rng.range(3) + 1
|
rng.range(16)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============= Timeouts =============
|
// ============= Timeouts =============
|
||||||
@@ -424,21 +425,15 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn secure_padding_never_produces_aligned_total() {
|
fn secure_padding_matches_tdesktop_range() {
|
||||||
let rng = SecureRandom::new();
|
let rng = SecureRandom::new();
|
||||||
for data_len in (0..1000).step_by(4) {
|
for data_len in (0..1000).step_by(4) {
|
||||||
for _ in 0..100 {
|
for _ in 0..100 {
|
||||||
let padding = secure_padding_len(data_len, &rng);
|
let padding = secure_padding_len(data_len, &rng);
|
||||||
assert!(
|
assert!(
|
||||||
padding <= 3,
|
padding <= 15,
|
||||||
"padding out of range: data_len={data_len}, padding={padding}"
|
"padding out of range: data_len={data_len}, padding={padding}"
|
||||||
);
|
);
|
||||||
assert_ne!(
|
|
||||||
(data_len + padding) % 4,
|
|
||||||
0,
|
|
||||||
"invariant violated: data_len={data_len}, padding={padding}, total={}",
|
|
||||||
data_len + padding
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -454,6 +449,16 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn secure_wire_len_preserves_full_word_tail() {
|
||||||
|
let payload_len = 64;
|
||||||
|
for padding in [4usize, 8, 12] {
|
||||||
|
let wire_len = payload_len + padding;
|
||||||
|
let recovered = secure_payload_len_from_wire_len(wire_len);
|
||||||
|
assert_eq!(recovered, Some(wire_len));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn secure_wire_len_rejects_too_short_frames() {
|
fn secure_wire_len_rejects_too_short_frames() {
|
||||||
assert_eq!(secure_payload_len_from_wire_len(0), None);
|
assert_eq!(secure_payload_len_from_wire_len(0), None);
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use crate::network::dns_overrides::resolve_socket_addr;
|
|||||||
use crate::protocol::tls;
|
use crate::protocol::tls;
|
||||||
use crate::stats::beobachten::BeobachtenStore;
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
use crate::transport::proxy_protocol::{ProxyProtocolV1Builder, ProxyProtocolV2Builder};
|
use crate::transport::proxy_protocol::{ProxyProtocolV1Builder, ProxyProtocolV2Builder};
|
||||||
|
use crate::transport::socket::configure_tcp_socket;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use nix::ifaddrs::getifaddrs;
|
use nix::ifaddrs::getifaddrs;
|
||||||
use rand::rngs::StdRng;
|
use rand::rngs::StdRng;
|
||||||
@@ -36,6 +37,8 @@ const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
const MASK_BUFFER_SIZE: usize = 8192;
|
const MASK_BUFFER_SIZE: usize = 8192;
|
||||||
|
const MASK_BUFFER_GROW_AFTER_BYTES: usize = 256 * 1024;
|
||||||
|
const MASK_BUFFER_MAX_SIZE: usize = 64 * 1024;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
#[cfg(not(test))]
|
#[cfg(not(test))]
|
||||||
const LOCAL_INTERFACE_CACHE_TTL: Duration = Duration::from_secs(300);
|
const LOCAL_INTERFACE_CACHE_TTL: Duration = Duration::from_secs(300);
|
||||||
@@ -53,6 +56,27 @@ struct MaskTcpTarget<'a> {
|
|||||||
port: u16,
|
port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mask_copy_read_len(total: usize, byte_cap: usize) -> usize {
|
||||||
|
// Keep short scanner probes on the small baseline buffer and grow only
|
||||||
|
// after the session has proven to be sustained masking relay traffic.
|
||||||
|
let active_buffer_size = if total >= MASK_BUFFER_GROW_AFTER_BYTES {
|
||||||
|
MASK_BUFFER_MAX_SIZE
|
||||||
|
} else {
|
||||||
|
MASK_BUFFER_SIZE
|
||||||
|
};
|
||||||
|
|
||||||
|
if byte_cap == 0 {
|
||||||
|
return active_buffer_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
let remaining_budget = byte_cap.saturating_sub(total);
|
||||||
|
if remaining_budget == 0 {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
remaining_budget.min(active_buffer_size)
|
||||||
|
}
|
||||||
|
|
||||||
async fn copy_with_idle_timeout<R, W>(
|
async fn copy_with_idle_timeout<R, W>(
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
writer: &mut W,
|
writer: &mut W,
|
||||||
@@ -64,21 +88,18 @@ where
|
|||||||
R: AsyncRead + Unpin,
|
R: AsyncRead + Unpin,
|
||||||
W: AsyncWrite + Unpin,
|
W: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
let mut buf = Box::new([0u8; MASK_BUFFER_SIZE]);
|
let mut buf = vec![0u8; MASK_BUFFER_SIZE];
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
let mut ended_by_eof = false;
|
let mut ended_by_eof = false;
|
||||||
let unlimited = byte_cap == 0;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let read_len = if unlimited {
|
let read_len = mask_copy_read_len(total, byte_cap);
|
||||||
MASK_BUFFER_SIZE
|
if read_len == 0 {
|
||||||
} else {
|
|
||||||
let remaining_budget = byte_cap.saturating_sub(total);
|
|
||||||
if remaining_budget == 0 {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
remaining_budget.min(MASK_BUFFER_SIZE)
|
if buf.len() < read_len {
|
||||||
};
|
buf.resize(read_len, 0);
|
||||||
|
}
|
||||||
let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await;
|
let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await;
|
||||||
let n = match read_res {
|
let n = match read_res {
|
||||||
Ok(Ok(n)) => n,
|
Ok(Ok(n)) => n,
|
||||||
@@ -877,6 +898,12 @@ fn build_mask_proxy_header(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn configure_mask_backend_socket(stream: &TcpStream) {
|
||||||
|
if let Err(e) = configure_tcp_socket(stream, false, Duration::from_secs(0)) {
|
||||||
|
debug!(error = %e, "Failed to configure mask backend socket");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a bad client by forwarding to mask host
|
/// Handle a bad client by forwarding to mask host
|
||||||
pub async fn handle_bad_client<R, W>(
|
pub async fn handle_bad_client<R, W>(
|
||||||
reader: R,
|
reader: R,
|
||||||
@@ -1047,6 +1074,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
let connect_result = timeout(MASK_TIMEOUT, TcpStream::connect(&mask_addr)).await;
|
let connect_result = timeout(MASK_TIMEOUT, TcpStream::connect(&mask_addr)).await;
|
||||||
match connect_result {
|
match connect_result {
|
||||||
Ok(Ok(stream)) => {
|
Ok(Ok(stream)) => {
|
||||||
|
configure_mask_backend_socket(&stream);
|
||||||
let proxy_header =
|
let proxy_header =
|
||||||
build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr);
|
build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr);
|
||||||
|
|
||||||
@@ -1190,20 +1218,17 @@ async fn consume_client_data<R: AsyncRead + Unpin>(
|
|||||||
idle_timeout: Duration,
|
idle_timeout: Duration,
|
||||||
) {
|
) {
|
||||||
// Keep drain path fail-closed under slow-loris stalls.
|
// Keep drain path fail-closed under slow-loris stalls.
|
||||||
let mut buf = Box::new([0u8; MASK_BUFFER_SIZE]);
|
let mut buf = vec![0u8; MASK_BUFFER_SIZE];
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
let unlimited = byte_cap == 0;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let read_len = if unlimited {
|
let read_len = mask_copy_read_len(total, byte_cap);
|
||||||
MASK_BUFFER_SIZE
|
if read_len == 0 {
|
||||||
} else {
|
|
||||||
let remaining_budget = byte_cap.saturating_sub(total);
|
|
||||||
if remaining_budget == 0 {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
remaining_budget.min(MASK_BUFFER_SIZE)
|
if buf.len() < read_len {
|
||||||
};
|
buf.resize(read_len, 0);
|
||||||
|
}
|
||||||
let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await {
|
let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await {
|
||||||
Ok(Ok(n)) => n,
|
Ok(Ok(n)) => n,
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
@@ -1214,7 +1239,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
total = total.saturating_add(n);
|
total = total.saturating_add(n);
|
||||||
if !unlimited && total >= byte_cap {
|
if byte_cap != 0 && total >= byte_cap {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1332,6 +1357,10 @@ mod masking_interface_cache_concurrency_security_tests;
|
|||||||
#[path = "tests/masking_production_cap_regression_security_tests.rs"]
|
#[path = "tests/masking_production_cap_regression_security_tests.rs"]
|
||||||
mod masking_production_cap_regression_security_tests;
|
mod masking_production_cap_regression_security_tests;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[path = "tests/masking_relay_manual_perf_tests.rs"]
|
||||||
|
mod masking_relay_manual_perf_tests;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
#[path = "tests/masking_extended_attack_surface_security_tests.rs"]
|
#[path = "tests/masking_extended_attack_surface_security_tests.rs"]
|
||||||
mod masking_extended_attack_surface_security_tests;
|
mod masking_extended_attack_surface_security_tests;
|
||||||
|
|||||||
@@ -331,7 +331,8 @@ where
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Secure Intermediate: strip validated trailing padding bytes.
|
// Secure Intermediate strips only non-aligned tail padding; full-word
|
||||||
|
// padding is indistinguishable from payload in VersionD framing.
|
||||||
if proto_tag == ProtoTag::Secure {
|
if proto_tag == ProtoTag::Secure {
|
||||||
payload.truncate(secure_payload_len);
|
payload.truncate(secure_payload_len);
|
||||||
}
|
}
|
||||||
|
|||||||
111
src/proxy/tests/masking_relay_manual_perf_tests.rs
Normal file
111
src/proxy/tests/masking_relay_manual_perf_tests.rs
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
use super::*;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use tokio::time::{Duration, Instant};
|
||||||
|
|
||||||
|
const PERF_TOTAL_BYTES: usize = 64 * 1024 * 1024;
|
||||||
|
|
||||||
|
struct PatternReader {
|
||||||
|
remaining: usize,
|
||||||
|
chunk: usize,
|
||||||
|
read_calls: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PatternReader {
|
||||||
|
fn new(total: usize, chunk: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
remaining: total,
|
||||||
|
chunk,
|
||||||
|
read_calls: AtomicUsize::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_calls(&self) -> usize {
|
||||||
|
self.read_calls.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for PatternReader {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<std::io::Result<()>> {
|
||||||
|
self.read_calls.fetch_add(1, Ordering::Relaxed);
|
||||||
|
if self.remaining == 0 {
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let take = self.remaining.min(self.chunk).min(buf.remaining());
|
||||||
|
if take == 0 {
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
|
||||||
|
static PATTERN: [u8; MASK_BUFFER_MAX_SIZE] = [0xA5; MASK_BUFFER_MAX_SIZE];
|
||||||
|
buf.put_slice(&PATTERN[..take]);
|
||||||
|
self.remaining -= take;
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct CountingWriter {
|
||||||
|
written: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncWrite for CountingWriter {
|
||||||
|
fn poll_write(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<std::io::Result<usize>> {
|
||||||
|
self.written = self.written.saturating_add(buf.len());
|
||||||
|
Poll::Ready(Ok(buf.len()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore = "manual benchmark: throughput-sensitive and host-dependent"]
|
||||||
|
async fn masking_copy_with_idle_timeout_manual_throughput() {
|
||||||
|
let mut reader = PatternReader::new(PERF_TOTAL_BYTES, MASK_BUFFER_MAX_SIZE);
|
||||||
|
let mut writer = CountingWriter::default();
|
||||||
|
let started = Instant::now();
|
||||||
|
|
||||||
|
let outcome = copy_with_idle_timeout(
|
||||||
|
&mut reader,
|
||||||
|
&mut writer,
|
||||||
|
PERF_TOTAL_BYTES,
|
||||||
|
true,
|
||||||
|
Duration::from_secs(30),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let elapsed = started.elapsed();
|
||||||
|
let mb = PERF_TOTAL_BYTES as f64 / (1024.0 * 1024.0);
|
||||||
|
let mbps = mb / elapsed.as_secs_f64();
|
||||||
|
|
||||||
|
assert_eq!(outcome.total, PERF_TOTAL_BYTES);
|
||||||
|
assert_eq!(writer.written, PERF_TOTAL_BYTES);
|
||||||
|
assert!(
|
||||||
|
!outcome.ended_by_eof,
|
||||||
|
"manual throughput run should terminate at byte cap"
|
||||||
|
);
|
||||||
|
|
||||||
|
eprintln!(
|
||||||
|
"masking manual throughput: bytes={} elapsed_ms={} mib_per_sec={:.2} read_calls={}",
|
||||||
|
PERF_TOTAL_BYTES,
|
||||||
|
elapsed.as_millis(),
|
||||||
|
mbps,
|
||||||
|
reader.read_calls()
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -317,7 +317,7 @@ fn encode_secure(frame: &Frame, dst: &mut BytesMut, rng: &SecureRandom) -> io::R
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate padding that keeps total length non-divisible by 4.
|
// Telegram Desktop VersionD uses a 4-bit random padding length.
|
||||||
let padding_len = secure_padding_len(data.len(), rng);
|
let padding_len = secure_padding_len(data.len(), rng);
|
||||||
|
|
||||||
let total_len = data.len() + padding_len;
|
let total_len = data.len() + padding_len;
|
||||||
@@ -523,6 +523,16 @@ mod tests {
|
|||||||
use tokio::io::duplex;
|
use tokio::io::duplex;
|
||||||
use tokio_util::codec::{FramedRead, FramedWrite};
|
use tokio_util::codec::{FramedRead, FramedWrite};
|
||||||
|
|
||||||
|
fn assert_secure_decoded_payload(decoded: &[u8], original: &[u8]) {
|
||||||
|
assert!(decoded.starts_with(original));
|
||||||
|
assert!(
|
||||||
|
(original.len()..=original.len() + 12).contains(&decoded.len()),
|
||||||
|
"Secure decoded payload may retain up to 12 bytes of full-word padding, got {}",
|
||||||
|
decoded.len()
|
||||||
|
);
|
||||||
|
assert_eq!(decoded.len() % 4, 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_framed_abridged() {
|
async fn test_framed_abridged() {
|
||||||
let (client, server) = duplex(4096);
|
let (client, server) = duplex(4096);
|
||||||
@@ -565,7 +575,7 @@ mod tests {
|
|||||||
writer.send(frame).await.unwrap();
|
writer.send(frame).await.unwrap();
|
||||||
|
|
||||||
let received = reader.next().await.unwrap().unwrap();
|
let received = reader.next().await.unwrap().unwrap();
|
||||||
assert_eq!(&received.data[..], &original[..]);
|
assert_secure_decoded_payload(&received.data, &original);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -588,7 +598,11 @@ mod tests {
|
|||||||
writer.send(frame).await.unwrap();
|
writer.send(frame).await.unwrap();
|
||||||
|
|
||||||
let received = reader.next().await.unwrap().unwrap();
|
let received = reader.next().await.unwrap().unwrap();
|
||||||
assert_eq!(received.data.len(), 8);
|
if proto_tag == ProtoTag::Secure {
|
||||||
|
assert_secure_decoded_payload(&received.data, &original);
|
||||||
|
} else {
|
||||||
|
assert_eq!(received.data.len(), original.len());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -642,7 +656,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn secure_codec_always_adds_padding_and_jitters_wire_length() {
|
fn secure_codec_uses_tdesktop_padding_range_and_jitters_wire_length() {
|
||||||
let codec = SecureCodec::new(Arc::new(SecureRandom::new()));
|
let codec = SecureCodec::new(Arc::new(SecureRandom::new()));
|
||||||
let payload = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8]);
|
let payload = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8]);
|
||||||
let mut wire_lens = HashSet::new();
|
let mut wire_lens = HashSet::new();
|
||||||
@@ -652,13 +666,12 @@ mod tests {
|
|||||||
let mut out = BytesMut::new();
|
let mut out = BytesMut::new();
|
||||||
codec.encode(&frame, &mut out).unwrap();
|
codec.encode(&frame, &mut out).unwrap();
|
||||||
|
|
||||||
assert!(out.len() > 4 + payload.len());
|
|
||||||
let wire_len = u32::from_le_bytes([out[0], out[1], out[2], out[3]]) as usize;
|
let wire_len = u32::from_le_bytes([out[0], out[1], out[2], out[3]]) as usize;
|
||||||
|
assert_eq!(out.len(), 4 + wire_len);
|
||||||
assert!(
|
assert!(
|
||||||
(payload.len() + 1..=payload.len() + 3).contains(&wire_len),
|
(payload.len()..=payload.len() + 15).contains(&wire_len),
|
||||||
"Secure wire length must be payload+1..3, got {wire_len}"
|
"Secure wire length must be payload+0..15, got {wire_len}"
|
||||||
);
|
);
|
||||||
assert_ne!(wire_len % 4, 0, "Secure wire length must be non-4-aligned");
|
|
||||||
wire_lens.insert(wire_len);
|
wire_lens.insert(wire_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -311,7 +311,7 @@ impl<W: AsyncWrite + Unpin> SecureIntermediateFrameWriter<W> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add padding so total length is never divisible by 4 (MTProto Secure)
|
// Telegram Desktop VersionD uses a 4-bit random padding length.
|
||||||
let padding_len = secure_padding_len(data.len(), &self.rng);
|
let padding_len = secure_padding_len(data.len(), &self.rng);
|
||||||
let padding = self.rng.bytes(padding_len);
|
let padding = self.rng.bytes(padding_len);
|
||||||
|
|
||||||
@@ -559,6 +559,16 @@ mod tests {
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::duplex;
|
use tokio::io::duplex;
|
||||||
|
|
||||||
|
fn assert_secure_decoded_payload(decoded: &[u8], original: &[u8]) {
|
||||||
|
assert!(decoded.starts_with(original));
|
||||||
|
assert!(
|
||||||
|
(original.len()..=original.len() + 12).contains(&decoded.len()),
|
||||||
|
"Secure decoded payload may retain up to 12 bytes of full-word padding, got {}",
|
||||||
|
decoded.len()
|
||||||
|
);
|
||||||
|
assert_eq!(decoded.len() % 4, 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_abridged_roundtrip() {
|
async fn test_abridged_roundtrip() {
|
||||||
let (client, server) = duplex(1024);
|
let (client, server) = duplex(1024);
|
||||||
@@ -625,7 +635,7 @@ mod tests {
|
|||||||
writer.flush().await.unwrap();
|
writer.flush().await.unwrap();
|
||||||
|
|
||||||
let (received, _meta) = reader.read_frame().await.unwrap();
|
let (received, _meta) = reader.read_frame().await.unwrap();
|
||||||
assert_eq!(received.len(), data.len());
|
assert_secure_decoded_payload(&received, &data);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use tokio::time::timeout;
|
|||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::config::MeSocksKdfPolicy;
|
use crate::config::MeSocksKdfPolicy;
|
||||||
use crate::crypto::{SecureRandom, build_middleproxy_prekey, derive_middleproxy_keys, sha256};
|
use crate::crypto::{SecureRandom, derive_middleproxy_keys};
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
use crate::network::probe::is_bogon;
|
use crate::network::probe::is_bogon;
|
||||||
@@ -292,14 +292,15 @@ impl MePool {
|
|||||||
BndPortStatus::Error
|
BndPortStatus::Error
|
||||||
};
|
};
|
||||||
record_bnd_status(bnd_addr_status, bnd_port_status, raw_socks_bound_addr);
|
record_bnd_status(bnd_addr_status, bnd_port_status, raw_socks_bound_addr);
|
||||||
let reflected = if let Some(bound) = socks_bound_addr {
|
let socks_bound_kdf_addr = socks_bound_addr.filter(|bound| bound.port() != 0);
|
||||||
|
let reflected = if let Some(bound) = socks_bound_kdf_addr {
|
||||||
Some(bound)
|
Some(bound)
|
||||||
} else if is_socks_route {
|
} else if is_socks_route {
|
||||||
match self.socks_kdf_policy() {
|
match self.socks_kdf_policy() {
|
||||||
MeSocksKdfPolicy::Strict => {
|
MeSocksKdfPolicy::Strict => {
|
||||||
self.stats.increment_me_socks_kdf_strict_reject();
|
self.stats.increment_me_socks_kdf_strict_reject();
|
||||||
return Err(ProxyError::InvalidHandshake(
|
return Err(ProxyError::InvalidHandshake(
|
||||||
"SOCKS route returned no valid BND.ADDR for ME KDF (strict policy)"
|
"SOCKS route returned no valid BND tuple for ME KDF (strict policy)"
|
||||||
.to_string(),
|
.to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
@@ -323,16 +324,14 @@ impl MePool {
|
|||||||
let local_addr_nat = self.translate_our_addr_with_reflection(local_addr, reflected);
|
let local_addr_nat = self.translate_our_addr_with_reflection(local_addr, reflected);
|
||||||
let peer_addr_nat =
|
let peer_addr_nat =
|
||||||
SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port());
|
SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port());
|
||||||
|
let client_addr_for_kdf = socks_bound_kdf_addr.unwrap_or(local_addr_nat);
|
||||||
if let Some(upstream_info) = upstream_egress {
|
if let Some(upstream_info) = upstream_egress {
|
||||||
let client_ip_for_kdf = socks_bound_addr
|
|
||||||
.map(|value| value.ip())
|
|
||||||
.unwrap_or(local_addr_nat.ip());
|
|
||||||
record_upstream_bnd_status(
|
record_upstream_bnd_status(
|
||||||
upstream_info.upstream_id,
|
upstream_info.upstream_id,
|
||||||
bnd_addr_status,
|
bnd_addr_status,
|
||||||
bnd_port_status,
|
bnd_port_status,
|
||||||
raw_socks_bound_addr,
|
raw_socks_bound_addr,
|
||||||
Some(client_ip_for_kdf),
|
Some(client_addr_for_kdf.ip()),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
let (mut rd, mut wr) = tokio::io::split(stream);
|
let (mut rd, mut wr) = tokio::io::split(stream);
|
||||||
@@ -409,6 +408,7 @@ impl MePool {
|
|||||||
info!(
|
info!(
|
||||||
%local_addr,
|
%local_addr,
|
||||||
%local_addr_nat,
|
%local_addr_nat,
|
||||||
|
%client_addr_for_kdf,
|
||||||
reflected_ip = reflected.map(|r| r.ip()).as_ref().map(ToString::to_string),
|
reflected_ip = reflected.map(|r| r.ip()).as_ref().map(ToString::to_string),
|
||||||
%peer_addr,
|
%peer_addr,
|
||||||
%transport_peer_addr,
|
%transport_peer_addr,
|
||||||
@@ -422,16 +422,14 @@ impl MePool {
|
|||||||
|
|
||||||
let ts_bytes = crypto_ts.to_le_bytes();
|
let ts_bytes = crypto_ts.to_le_bytes();
|
||||||
let server_port_bytes = peer_addr_nat.port().to_le_bytes();
|
let server_port_bytes = peer_addr_nat.port().to_le_bytes();
|
||||||
let socks_bound_port = socks_bound_addr
|
let socks_bound_port = socks_bound_kdf_addr.map(|bound| bound.port());
|
||||||
.map(|bound| bound.port())
|
let client_port_for_kdf = client_addr_for_kdf.port();
|
||||||
.filter(|port| *port != 0);
|
|
||||||
let client_port_for_kdf = socks_bound_port.unwrap_or(local_addr_nat.port());
|
|
||||||
let client_port_source = KdfClientPortSource::from_socks_bound_port(socks_bound_port);
|
let client_port_source = KdfClientPortSource::from_socks_bound_port(socks_bound_port);
|
||||||
let kdf_fingerprint = Self::kdf_material_fingerprint(
|
let kdf_fingerprint = Self::kdf_material_fingerprint(
|
||||||
local_addr_nat.ip(),
|
client_addr_for_kdf.ip(),
|
||||||
peer_addr_nat,
|
peer_addr_nat,
|
||||||
reflected.map(|value| value.ip()),
|
reflected.map(|value| value.ip()),
|
||||||
socks_bound_addr.map(|value| value.ip()),
|
socks_bound_kdf_addr.map(|value| value.ip()),
|
||||||
client_port_source,
|
client_port_source,
|
||||||
);
|
);
|
||||||
let previous_kdf_fingerprint = {
|
let previous_kdf_fingerprint = {
|
||||||
@@ -473,7 +471,7 @@ impl MePool {
|
|||||||
let client_port_bytes = client_port_for_kdf.to_le_bytes();
|
let client_port_bytes = client_port_for_kdf.to_le_bytes();
|
||||||
|
|
||||||
let server_ip = extract_ip_material(peer_addr_nat);
|
let server_ip = extract_ip_material(peer_addr_nat);
|
||||||
let client_ip = extract_ip_material(local_addr_nat);
|
let client_ip = extract_ip_material(client_addr_for_kdf);
|
||||||
|
|
||||||
let (srv_ip_opt, clt_ip_opt, clt_v6_opt, srv_v6_opt, hs_our_ip, hs_peer_ip) =
|
let (srv_ip_opt, clt_ip_opt, clt_v6_opt, srv_v6_opt, hs_our_ip, hs_peer_ip) =
|
||||||
match (server_ip, client_ip) {
|
match (server_ip, client_ip) {
|
||||||
@@ -494,38 +492,6 @@ impl MePool {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let diag_level: u8 = std::env::var("ME_DIAG")
|
|
||||||
.ok()
|
|
||||||
.and_then(|v| v.parse().ok())
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
let prekey_client = build_middleproxy_prekey(
|
|
||||||
&srv_nonce,
|
|
||||||
&my_nonce,
|
|
||||||
&ts_bytes,
|
|
||||||
srv_ip_opt.as_ref().map(|x| &x[..]),
|
|
||||||
&client_port_bytes,
|
|
||||||
b"CLIENT",
|
|
||||||
clt_ip_opt.as_ref().map(|x| &x[..]),
|
|
||||||
&server_port_bytes,
|
|
||||||
&secret,
|
|
||||||
clt_v6_opt.as_ref(),
|
|
||||||
srv_v6_opt.as_ref(),
|
|
||||||
);
|
|
||||||
let prekey_server = build_middleproxy_prekey(
|
|
||||||
&srv_nonce,
|
|
||||||
&my_nonce,
|
|
||||||
&ts_bytes,
|
|
||||||
srv_ip_opt.as_ref().map(|x| &x[..]),
|
|
||||||
&client_port_bytes,
|
|
||||||
b"SERVER",
|
|
||||||
clt_ip_opt.as_ref().map(|x| &x[..]),
|
|
||||||
&server_port_bytes,
|
|
||||||
&secret,
|
|
||||||
clt_v6_opt.as_ref(),
|
|
||||||
srv_v6_opt.as_ref(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (wk, wi) = derive_middleproxy_keys(
|
let (wk, wi) = derive_middleproxy_keys(
|
||||||
&srv_nonce,
|
&srv_nonce,
|
||||||
&my_nonce,
|
&my_nonce,
|
||||||
@@ -556,47 +522,14 @@ impl MePool {
|
|||||||
let requested_crc_mode = RpcChecksumMode::Crc32c;
|
let requested_crc_mode = RpcChecksumMode::Crc32c;
|
||||||
let hs_payload = build_handshake_payload(
|
let hs_payload = build_handshake_payload(
|
||||||
hs_our_ip,
|
hs_our_ip,
|
||||||
local_addr.port(),
|
client_port_for_kdf,
|
||||||
hs_peer_ip,
|
hs_peer_ip,
|
||||||
peer_addr.port(),
|
peer_addr_nat.port(),
|
||||||
requested_crc_mode.advertised_flags(),
|
requested_crc_mode.advertised_flags(),
|
||||||
);
|
);
|
||||||
let hs_frame = build_rpc_frame(-1, &hs_payload, RpcChecksumMode::Crc32);
|
let hs_frame = build_rpc_frame(-1, &hs_payload, RpcChecksumMode::Crc32);
|
||||||
if diag_level >= 1 {
|
|
||||||
info!(
|
|
||||||
write_key = %hex_dump(&wk),
|
|
||||||
write_iv = %hex_dump(&wi),
|
|
||||||
read_key = %hex_dump(&rk),
|
|
||||||
read_iv = %hex_dump(&ri),
|
|
||||||
srv_ip = %srv_ip_opt.map(|ip| hex_dump(&ip)).unwrap_or_default(),
|
|
||||||
clt_ip = %clt_ip_opt.map(|ip| hex_dump(&ip)).unwrap_or_default(),
|
|
||||||
srv_port = %hex_dump(&server_port_bytes),
|
|
||||||
clt_port = %hex_dump(&client_port_bytes),
|
|
||||||
crypto_ts = %hex_dump(&ts_bytes),
|
|
||||||
nonce_srv = %hex_dump(&srv_nonce),
|
|
||||||
nonce_clt = %hex_dump(&my_nonce),
|
|
||||||
prekey_sha256_client = %hex_dump(&sha256(&prekey_client)),
|
|
||||||
prekey_sha256_server = %hex_dump(&sha256(&prekey_server)),
|
|
||||||
hs_plain = %hex_dump(&hs_frame),
|
|
||||||
proxy_secret_sha256 = %hex_dump(&sha256(&secret)),
|
|
||||||
"ME diag: derived keys and handshake plaintext"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if diag_level >= 2 {
|
|
||||||
info!(
|
|
||||||
prekey_client = %hex_dump(&prekey_client),
|
|
||||||
prekey_server = %hex_dump(&prekey_server),
|
|
||||||
"ME diag: full prekey buffers"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
|
let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
|
||||||
if diag_level >= 1 {
|
|
||||||
info!(
|
|
||||||
hs_cipher = %hex_dump(&encrypted_hs),
|
|
||||||
"ME diag: handshake ciphertext"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
wr.write_all(&encrypted_hs).await.map_err(ProxyError::Io)?;
|
wr.write_all(&encrypted_hs).await.map_err(ProxyError::Io)?;
|
||||||
wr.flush().await.map_err(ProxyError::Io)?;
|
wr.flush().await.map_err(ProxyError::Io)?;
|
||||||
|
|
||||||
|
|||||||
@@ -1728,6 +1728,8 @@ mod tests {
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
@@ -336,6 +336,8 @@ pub(super) struct NatRuntimeCore {
|
|||||||
pub(super) nat_probe: bool,
|
pub(super) nat_probe: bool,
|
||||||
pub(super) nat_stun: Option<String>,
|
pub(super) nat_stun: Option<String>,
|
||||||
pub(super) nat_stun_servers: Vec<String>,
|
pub(super) nat_stun_servers: Vec<String>,
|
||||||
|
pub(super) stun_tcp_fallback: bool,
|
||||||
|
pub(super) http_ip_detect_urls: Vec<String>,
|
||||||
pub(super) nat_stun_live_servers: Arc<RwLock<Vec<String>>>,
|
pub(super) nat_stun_live_servers: Arc<RwLock<Vec<String>>>,
|
||||||
pub(super) nat_probe_concurrency: usize,
|
pub(super) nat_probe_concurrency: usize,
|
||||||
pub(super) detected_ipv6: Option<Ipv6Addr>,
|
pub(super) detected_ipv6: Option<Ipv6Addr>,
|
||||||
@@ -484,6 +486,8 @@ impl MePool {
|
|||||||
nat_probe: bool,
|
nat_probe: bool,
|
||||||
nat_stun: Option<String>,
|
nat_stun: Option<String>,
|
||||||
nat_stun_servers: Vec<String>,
|
nat_stun_servers: Vec<String>,
|
||||||
|
stun_tcp_fallback: bool,
|
||||||
|
http_ip_detect_urls: Vec<String>,
|
||||||
nat_probe_concurrency: usize,
|
nat_probe_concurrency: usize,
|
||||||
detected_ipv6: Option<Ipv6Addr>,
|
detected_ipv6: Option<Ipv6Addr>,
|
||||||
me_one_retry: u8,
|
me_one_retry: u8,
|
||||||
@@ -706,6 +710,8 @@ impl MePool {
|
|||||||
nat_probe,
|
nat_probe,
|
||||||
nat_stun,
|
nat_stun,
|
||||||
nat_stun_servers,
|
nat_stun_servers,
|
||||||
|
stun_tcp_fallback,
|
||||||
|
http_ip_detect_urls,
|
||||||
nat_stun_live_servers: Arc::new(RwLock::new(Vec::new())),
|
nat_stun_live_servers: Arc::new(RwLock::new(Vec::new())),
|
||||||
nat_probe_concurrency: nat_probe_concurrency.max(1),
|
nat_probe_concurrency: nat_probe_concurrency.max(1),
|
||||||
detected_ipv6,
|
detected_ipv6,
|
||||||
|
|||||||
@@ -1,19 +1,22 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::{IpAddr, Ipv4Addr};
|
use std::net::IpAddr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::network::probe::is_bogon;
|
use crate::network::probe::{detect_public_ipv4_http, is_bogon};
|
||||||
use crate::network::stun::{IpFamily, stun_probe_dual, stun_probe_family_with_bind};
|
use crate::network::stun::{
|
||||||
|
IpFamily, stun_probe_dual_with_tcp_fallback, stun_probe_family_with_bind_and_tcp_fallback,
|
||||||
|
};
|
||||||
|
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
|
const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
const STUN_BATCH_TCP_FALLBACK_TIMEOUT: Duration = Duration::from_secs(12);
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> {
|
pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> {
|
||||||
@@ -28,16 +31,13 @@ pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stu
|
|||||||
"STUN server is not configured".to_string(),
|
"STUN server is not configured".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
stun_probe_dual(&stun_addr).await
|
stun_probe_dual_with_tcp_fallback(&stun_addr, false).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub async fn detect_public_ip() -> Option<IpAddr> {
|
pub async fn detect_public_ip() -> Option<IpAddr> {
|
||||||
fetch_public_ipv4_with_retry()
|
let urls = crate::config::defaults::default_http_ip_detect_urls();
|
||||||
.await
|
detect_public_ipv4_http(&urls).await.map(IpAddr::V4)
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.map(IpAddr::V4)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MePool {
|
impl MePool {
|
||||||
@@ -65,15 +65,26 @@ impl MePool {
|
|||||||
let mut live_servers = Vec::new();
|
let mut live_servers = Vec::new();
|
||||||
let mut best_by_ip: HashMap<IpAddr, (usize, std::net::SocketAddr)> = HashMap::new();
|
let mut best_by_ip: HashMap<IpAddr, (usize, std::net::SocketAddr)> = HashMap::new();
|
||||||
let concurrency = self.nat_runtime.nat_probe_concurrency.max(1);
|
let concurrency = self.nat_runtime.nat_probe_concurrency.max(1);
|
||||||
|
let tcp_fallback = self.nat_runtime.stun_tcp_fallback;
|
||||||
|
|
||||||
while next_idx < servers.len() || !join_set.is_empty() {
|
while next_idx < servers.len() || !join_set.is_empty() {
|
||||||
while next_idx < servers.len() && join_set.len() < concurrency {
|
while next_idx < servers.len() && join_set.len() < concurrency {
|
||||||
let stun_addr = servers[next_idx].clone();
|
let stun_addr = servers[next_idx].clone();
|
||||||
next_idx += 1;
|
next_idx += 1;
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
|
let batch_timeout = if tcp_fallback {
|
||||||
|
STUN_BATCH_TCP_FALLBACK_TIMEOUT
|
||||||
|
} else {
|
||||||
|
STUN_BATCH_TIMEOUT
|
||||||
|
};
|
||||||
let res = timeout(
|
let res = timeout(
|
||||||
STUN_BATCH_TIMEOUT,
|
batch_timeout,
|
||||||
stun_probe_family_with_bind(&stun_addr, family, bind_ip),
|
stun_probe_family_with_bind_and_tcp_fallback(
|
||||||
|
&stun_addr,
|
||||||
|
family,
|
||||||
|
bind_ip,
|
||||||
|
tcp_fallback,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
(stun_addr, res)
|
(stun_addr, res)
|
||||||
@@ -193,6 +204,10 @@ impl MePool {
|
|||||||
return self.nat_runtime.nat_ip_cfg;
|
return self.nat_runtime.nat_ip_cfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !self.nat_runtime.nat_probe {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
if !(is_bogon(local_ip) || local_ip.is_loopback() || local_ip.is_unspecified()) {
|
if !(is_bogon(local_ip) || local_ip.is_loopback() || local_ip.is_unspecified()) {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -201,8 +216,9 @@ impl MePool {
|
|||||||
return Some(ip);
|
return Some(ip);
|
||||||
}
|
}
|
||||||
|
|
||||||
match fetch_public_ipv4_with_retry().await {
|
let Some(ip) = detect_public_ipv4_http(&self.nat_runtime.http_ip_detect_urls).await else {
|
||||||
Ok(Some(ip)) => {
|
return None;
|
||||||
|
};
|
||||||
{
|
{
|
||||||
let mut guard = self.nat_runtime.nat_ip_detected.write().await;
|
let mut guard = self.nat_runtime.nat_ip_detected.write().await;
|
||||||
*guard = Some(IpAddr::V4(ip));
|
*guard = Some(IpAddr::V4(ip));
|
||||||
@@ -210,13 +226,6 @@ impl MePool {
|
|||||||
info!(public_ip = %ip, "Auto-detected public IP for NAT translation");
|
info!(public_ip = %ip, "Auto-detected public IP for NAT translation");
|
||||||
Some(IpAddr::V4(ip))
|
Some(IpAddr::V4(ip))
|
||||||
}
|
}
|
||||||
Ok(None) => None,
|
|
||||||
Err(e) => {
|
|
||||||
warn!(error = %e, "Failed to auto-detect public IP");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) async fn maybe_reflect_public_addr(
|
pub(super) async fn maybe_reflect_public_addr(
|
||||||
&self,
|
&self,
|
||||||
@@ -365,31 +374,3 @@ impl MePool {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_public_ipv4_with_retry() -> Result<Option<Ipv4Addr>> {
|
|
||||||
let providers = [
|
|
||||||
"https://checkip.amazonaws.com",
|
|
||||||
"http://v4.ident.me",
|
|
||||||
"http://ipv4.icanhazip.com",
|
|
||||||
];
|
|
||||||
for url in providers {
|
|
||||||
if let Ok(Some(ip)) = fetch_public_ipv4_once(url).await {
|
|
||||||
return Ok(Some(ip));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch_public_ipv4_once(url: &str) -> Result<Option<Ipv4Addr>> {
|
|
||||||
let res = reqwest::get(url)
|
|
||||||
.await
|
|
||||||
.map_err(|e| ProxyError::Proxy(format!("public IP detection request failed: {e}")))?;
|
|
||||||
|
|
||||||
let text = res
|
|
||||||
.text()
|
|
||||||
.await
|
|
||||||
.map_err(|e| ProxyError::Proxy(format!("public IP detection read failed: {e}")))?;
|
|
||||||
|
|
||||||
let ip = text.trim().parse().ok();
|
|
||||||
Ok(ip)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -38,6 +38,8 @@ async fn make_pool(
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
@@ -36,6 +36,8 @@ async fn make_pool(
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ async fn make_pool() -> Arc<MePool> {
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ async fn make_pool() -> Arc<MePool> {
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ async fn make_pool() -> (Arc<MePool>, Arc<SecureRandom>) {
|
|||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
1,
|
1,
|
||||||
None,
|
None,
|
||||||
12,
|
12,
|
||||||
|
|||||||
Reference in New Issue
Block a user