mirror of https://github.com/telemt/telemt.git
Drafting Fake-TLS V2
This commit is contained in:
parent
af35ad3923
commit
5e98b35fb7
|
|
@ -23,6 +23,10 @@ pub(crate) fn default_fake_cert_len() -> usize {
|
||||||
2048
|
2048
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_tls_front_dir() -> String {
|
||||||
|
"tlsfront".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_replay_check_len() -> usize {
|
pub(crate) fn default_replay_check_len() -> usize {
|
||||||
65_536
|
65_536
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -163,6 +163,21 @@ impl ProxyConfig {
|
||||||
config.censorship.mask_host = Some(config.censorship.tls_domain.clone());
|
config.censorship.mask_host = Some(config.censorship.tls_domain.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merge primary + extra TLS domains, deduplicate (primary always first).
|
||||||
|
if !config.censorship.tls_domains.is_empty() {
|
||||||
|
let mut all = Vec::with_capacity(1 + config.censorship.tls_domains.len());
|
||||||
|
all.push(config.censorship.tls_domain.clone());
|
||||||
|
for d in std::mem::take(&mut config.censorship.tls_domains) {
|
||||||
|
if !d.is_empty() && !all.contains(&d) {
|
||||||
|
all.push(d);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// keep primary as tls_domain; store remaining back to tls_domains
|
||||||
|
if all.len() > 1 {
|
||||||
|
config.censorship.tls_domains = all[1..].to_vec();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Migration: prefer_ipv6 -> network.prefer.
|
// Migration: prefer_ipv6 -> network.prefer.
|
||||||
if config.general.prefer_ipv6 {
|
if config.general.prefer_ipv6 {
|
||||||
if config.network.prefer == 4 {
|
if config.network.prefer == 4 {
|
||||||
|
|
@ -180,7 +195,7 @@ impl ProxyConfig {
|
||||||
validate_network_cfg(&mut config.network)?;
|
validate_network_cfg(&mut config.network)?;
|
||||||
|
|
||||||
// Random fake_cert_len only when default is in use.
|
// Random fake_cert_len only when default is in use.
|
||||||
if config.censorship.fake_cert_len == default_fake_cert_len() {
|
if !config.censorship.tls_emulation && config.censorship.fake_cert_len == default_fake_cert_len() {
|
||||||
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
|
config.censorship.fake_cert_len = rand::rng().gen_range(1024..4096);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -235,7 +250,7 @@ impl ProxyConfig {
|
||||||
// Migration: Populate upstreams if empty (Default Direct).
|
// Migration: Populate upstreams if empty (Default Direct).
|
||||||
if config.upstreams.is_empty() {
|
if config.upstreams.is_empty() {
|
||||||
config.upstreams.push(UpstreamConfig {
|
config.upstreams.push(UpstreamConfig {
|
||||||
upstream_type: UpstreamType::Direct { interface: None },
|
upstream_type: UpstreamType::Direct { interface: None, bind_addresses: None },
|
||||||
weight: 1,
|
weight: 1,
|
||||||
enabled: true,
|
enabled: true,
|
||||||
scopes: String::new(),
|
scopes: String::new(),
|
||||||
|
|
|
||||||
|
|
@ -295,6 +295,11 @@ pub struct ServerConfig {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub listen_tcp: Option<bool>,
|
pub listen_tcp: Option<bool>,
|
||||||
|
|
||||||
|
/// Accept HAProxy PROXY protocol headers on incoming connections.
|
||||||
|
/// When enabled, real client IPs are extracted from PROXY v1/v2 headers.
|
||||||
|
#[serde(default)]
|
||||||
|
pub proxy_protocol: bool,
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub metrics_port: Option<u16>,
|
pub metrics_port: Option<u16>,
|
||||||
|
|
||||||
|
|
@ -314,6 +319,7 @@ impl Default for ServerConfig {
|
||||||
listen_unix_sock: None,
|
listen_unix_sock: None,
|
||||||
listen_unix_sock_perm: None,
|
listen_unix_sock_perm: None,
|
||||||
listen_tcp: None,
|
listen_tcp: None,
|
||||||
|
proxy_protocol: false,
|
||||||
metrics_port: None,
|
metrics_port: None,
|
||||||
metrics_whitelist: default_metrics_whitelist(),
|
metrics_whitelist: default_metrics_whitelist(),
|
||||||
listeners: Vec::new(),
|
listeners: Vec::new(),
|
||||||
|
|
@ -362,6 +368,10 @@ pub struct AntiCensorshipConfig {
|
||||||
#[serde(default = "default_tls_domain")]
|
#[serde(default = "default_tls_domain")]
|
||||||
pub tls_domain: String,
|
pub tls_domain: String,
|
||||||
|
|
||||||
|
/// Additional TLS domains for generating multiple proxy links.
|
||||||
|
#[serde(default)]
|
||||||
|
pub tls_domains: Vec<String>,
|
||||||
|
|
||||||
#[serde(default = "default_true")]
|
#[serde(default = "default_true")]
|
||||||
pub mask: bool,
|
pub mask: bool,
|
||||||
|
|
||||||
|
|
@ -376,17 +386,28 @@ pub struct AntiCensorshipConfig {
|
||||||
|
|
||||||
#[serde(default = "default_fake_cert_len")]
|
#[serde(default = "default_fake_cert_len")]
|
||||||
pub fake_cert_len: usize,
|
pub fake_cert_len: usize,
|
||||||
|
|
||||||
|
/// Enable TLS certificate emulation using cached real certificates.
|
||||||
|
#[serde(default)]
|
||||||
|
pub tls_emulation: bool,
|
||||||
|
|
||||||
|
/// Directory to store TLS front cache (on disk).
|
||||||
|
#[serde(default = "default_tls_front_dir")]
|
||||||
|
pub tls_front_dir: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for AntiCensorshipConfig {
|
impl Default for AntiCensorshipConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
tls_domain: default_tls_domain(),
|
tls_domain: default_tls_domain(),
|
||||||
|
tls_domains: Vec::new(),
|
||||||
mask: true,
|
mask: true,
|
||||||
mask_host: None,
|
mask_host: None,
|
||||||
mask_port: default_mask_port(),
|
mask_port: default_mask_port(),
|
||||||
mask_unix_sock: None,
|
mask_unix_sock: None,
|
||||||
fake_cert_len: default_fake_cert_len(),
|
fake_cert_len: default_fake_cert_len(),
|
||||||
|
tls_emulation: false,
|
||||||
|
tls_front_dir: default_tls_front_dir(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -446,6 +467,8 @@ pub enum UpstreamType {
|
||||||
Direct {
|
Direct {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
interface: Option<String>,
|
interface: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
bind_addresses: Option<Vec<String>>,
|
||||||
},
|
},
|
||||||
Socks4 {
|
Socks4 {
|
||||||
address: String,
|
address: String,
|
||||||
|
|
|
||||||
12
src/main.rs
12
src/main.rs
|
|
@ -129,13 +129,23 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if config.general.modes.tls {
|
if config.general.modes.tls {
|
||||||
let domain_hex = hex::encode(&config.censorship.tls_domain);
|
let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
|
||||||
|
domains.push(config.censorship.tls_domain.clone());
|
||||||
|
for d in &config.censorship.tls_domains {
|
||||||
|
if !domains.contains(d) {
|
||||||
|
domains.push(d.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for domain in domains {
|
||||||
|
let domain_hex = hex::encode(&domain);
|
||||||
info!(
|
info!(
|
||||||
target: "telemt::links",
|
target: "telemt::links",
|
||||||
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
|
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
|
||||||
host, port, secret, domain_hex
|
host, port, secret, domain_hex
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
warn!(target: "telemt::links", "User '{}' in show_link not found", user_name);
|
warn!(target: "telemt::links", "User '{}' in show_link not found", user_name);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ use crate::protocol::tls;
|
||||||
use crate::stats::{ReplayChecker, Stats};
|
use crate::stats::{ReplayChecker, Stats};
|
||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
||||||
use crate::transport::middle_proxy::MePool;
|
use crate::transport::middle_proxy::MePool;
|
||||||
use crate::transport::{UpstreamManager, configure_client_socket};
|
use crate::transport::{UpstreamManager, configure_client_socket, parse_proxy_protocol};
|
||||||
|
|
||||||
use crate::proxy::direct_relay::handle_via_direct;
|
use crate::proxy::direct_relay::handle_via_direct;
|
||||||
use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle_tls_handshake};
|
use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle_tls_handshake};
|
||||||
|
|
@ -53,7 +53,28 @@ where
|
||||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
stats.increment_connects_all();
|
stats.increment_connects_all();
|
||||||
debug!(peer = %peer, "New connection (generic stream)");
|
let mut real_peer = peer;
|
||||||
|
|
||||||
|
if config.server.proxy_protocol {
|
||||||
|
match parse_proxy_protocol(&mut stream, peer).await {
|
||||||
|
Ok(info) => {
|
||||||
|
debug!(
|
||||||
|
peer = %peer,
|
||||||
|
client = %info.src_addr,
|
||||||
|
version = info.version,
|
||||||
|
"PROXY protocol header parsed"
|
||||||
|
);
|
||||||
|
real_peer = info.src_addr;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
stats.increment_connects_bad();
|
||||||
|
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(peer = %real_peer, "New connection (generic stream)");
|
||||||
|
|
||||||
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
|
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
|
||||||
let stats_for_timeout = stats.clone();
|
let stats_for_timeout = stats.clone();
|
||||||
|
|
@ -69,13 +90,13 @@ where
|
||||||
stream.read_exact(&mut first_bytes).await?;
|
stream.read_exact(&mut first_bytes).await?;
|
||||||
|
|
||||||
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
||||||
debug!(peer = %peer, is_tls = is_tls, "Handshake type detected");
|
debug!(peer = %real_peer, is_tls = is_tls, "Handshake type detected");
|
||||||
|
|
||||||
if is_tls {
|
if is_tls {
|
||||||
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
|
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
|
||||||
|
|
||||||
if tls_len < 512 {
|
if tls_len < 512 {
|
||||||
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
|
debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
handle_bad_client(reader, writer, &first_bytes, &config).await;
|
handle_bad_client(reader, writer, &first_bytes, &config).await;
|
||||||
|
|
@ -89,7 +110,7 @@ where
|
||||||
let (read_half, write_half) = tokio::io::split(stream);
|
let (read_half, write_half) = tokio::io::split(stream);
|
||||||
|
|
||||||
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
|
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
|
||||||
&handshake, read_half, write_half, peer,
|
&handshake, read_half, write_half, real_peer,
|
||||||
&config, &replay_checker, &rng,
|
&config, &replay_checker, &rng,
|
||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
|
|
@ -107,7 +128,7 @@ where
|
||||||
.map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?;
|
.map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?;
|
||||||
|
|
||||||
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
|
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
|
||||||
&mtproto_handshake, tls_reader, tls_writer, peer,
|
&mtproto_handshake, tls_reader, tls_writer, real_peer,
|
||||||
&config, &replay_checker, true,
|
&config, &replay_checker, true,
|
||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
|
|
@ -123,12 +144,12 @@ where
|
||||||
RunningClientHandler::handle_authenticated_static(
|
RunningClientHandler::handle_authenticated_static(
|
||||||
crypto_reader, crypto_writer, success,
|
crypto_reader, crypto_writer, success,
|
||||||
upstream_manager, stats, config, buffer_pool, rng, me_pool,
|
upstream_manager, stats, config, buffer_pool, rng, me_pool,
|
||||||
local_addr, peer, ip_tracker.clone(),
|
local_addr, real_peer, ip_tracker.clone(),
|
||||||
),
|
),
|
||||||
)))
|
)))
|
||||||
} else {
|
} else {
|
||||||
if !config.general.modes.classic && !config.general.modes.secure {
|
if !config.general.modes.classic && !config.general.modes.secure {
|
||||||
debug!(peer = %peer, "Non-TLS modes disabled");
|
debug!(peer = %real_peer, "Non-TLS modes disabled");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
handle_bad_client(reader, writer, &first_bytes, &config).await;
|
handle_bad_client(reader, writer, &first_bytes, &config).await;
|
||||||
|
|
@ -142,7 +163,7 @@ where
|
||||||
let (read_half, write_half) = tokio::io::split(stream);
|
let (read_half, write_half) = tokio::io::split(stream);
|
||||||
|
|
||||||
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
|
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
|
||||||
&handshake, read_half, write_half, peer,
|
&handshake, read_half, write_half, real_peer,
|
||||||
&config, &replay_checker, false,
|
&config, &replay_checker, false,
|
||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
|
|
@ -166,7 +187,7 @@ where
|
||||||
rng,
|
rng,
|
||||||
me_pool,
|
me_pool,
|
||||||
local_addr,
|
local_addr,
|
||||||
peer,
|
real_peer,
|
||||||
ip_tracker.clone(),
|
ip_tracker.clone(),
|
||||||
)
|
)
|
||||||
)))
|
)))
|
||||||
|
|
@ -275,6 +296,25 @@ impl RunningClientHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
|
async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
|
||||||
|
if self.config.server.proxy_protocol {
|
||||||
|
match parse_proxy_protocol(&mut self.stream, self.peer).await {
|
||||||
|
Ok(info) => {
|
||||||
|
debug!(
|
||||||
|
peer = %self.peer,
|
||||||
|
client = %info.src_addr,
|
||||||
|
version = info.version,
|
||||||
|
"PROXY protocol header parsed"
|
||||||
|
);
|
||||||
|
self.peer = info.src_addr;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
self.stats.increment_connects_bad();
|
||||||
|
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut first_bytes = [0u8; 5];
|
let mut first_bytes = [0u8; 5];
|
||||||
self.stream.read_exact(&mut first_bytes).await?;
|
self.stream.read_exact(&mut first_bytes).await?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -122,6 +122,38 @@ pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> {
|
||||||
stream.local_addr().ok()
|
stream.local_addr().ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Resolve primary IP address of a network interface by name.
|
||||||
|
/// Returns the first address matching the requested family (IPv4/IPv6).
|
||||||
|
#[cfg(unix)]
|
||||||
|
pub fn resolve_interface_ip(name: &str, want_ipv6: bool) -> Option<IpAddr> {
|
||||||
|
use nix::ifaddrs::getifaddrs;
|
||||||
|
|
||||||
|
if let Ok(addrs) = getifaddrs() {
|
||||||
|
for iface in addrs {
|
||||||
|
if iface.interface_name == name {
|
||||||
|
if let Some(address) = iface.address {
|
||||||
|
if let Some(v4) = address.as_sockaddr_in() {
|
||||||
|
if !want_ipv6 {
|
||||||
|
return Some(IpAddr::V4(v4.ip()));
|
||||||
|
}
|
||||||
|
} else if let Some(v6) = address.as_sockaddr_in6() {
|
||||||
|
if want_ipv6 {
|
||||||
|
return Some(IpAddr::V6(v6.ip().to_std()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stub for non-Unix platforms: interface name resolution unsupported.
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
pub fn resolve_interface_ip(_name: &str, _want_ipv6: bool) -> Option<IpAddr> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
/// Get peer address of a socket
|
/// Get peer address of a socket
|
||||||
pub fn get_peer_addr(stream: &TcpStream) -> Option<SocketAddr> {
|
pub fn get_peer_addr(stream: &TcpStream) -> Option<SocketAddr> {
|
||||||
stream.peer_addr().ok()
|
stream.peer_addr().ok()
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::{SocketAddr, IpAddr};
|
use std::net::{SocketAddr, IpAddr};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
@ -15,7 +16,7 @@ use tracing::{debug, warn, info, trace};
|
||||||
use crate::config::{UpstreamConfig, UpstreamType};
|
use crate::config::{UpstreamConfig, UpstreamType};
|
||||||
use crate::error::{Result, ProxyError};
|
use crate::error::{Result, ProxyError};
|
||||||
use crate::protocol::constants::{TG_DATACENTERS_V4, TG_DATACENTERS_V6, TG_DATACENTER_PORT};
|
use crate::protocol::constants::{TG_DATACENTERS_V4, TG_DATACENTERS_V6, TG_DATACENTER_PORT};
|
||||||
use crate::transport::socket::create_outgoing_socket_bound;
|
use crate::transport::socket::{create_outgoing_socket_bound, resolve_interface_ip};
|
||||||
use crate::transport::socks::{connect_socks4, connect_socks5};
|
use crate::transport::socks::{connect_socks4, connect_socks5};
|
||||||
|
|
||||||
/// Number of Telegram datacenters
|
/// Number of Telegram datacenters
|
||||||
|
|
@ -84,6 +85,8 @@ struct UpstreamState {
|
||||||
dc_latency: [LatencyEma; NUM_DCS],
|
dc_latency: [LatencyEma; NUM_DCS],
|
||||||
/// Per-DC IP version preference (learned from connectivity tests)
|
/// Per-DC IP version preference (learned from connectivity tests)
|
||||||
dc_ip_pref: [IpPreference; NUM_DCS],
|
dc_ip_pref: [IpPreference; NUM_DCS],
|
||||||
|
/// Round-robin counter for bind_addresses selection
|
||||||
|
bind_rr: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UpstreamState {
|
impl UpstreamState {
|
||||||
|
|
@ -95,6 +98,7 @@ impl UpstreamState {
|
||||||
last_check: std::time::Instant::now(),
|
last_check: std::time::Instant::now(),
|
||||||
dc_latency: [LatencyEma::new(0.3); NUM_DCS],
|
dc_latency: [LatencyEma::new(0.3); NUM_DCS],
|
||||||
dc_ip_pref: [IpPreference::Unknown; NUM_DCS],
|
dc_ip_pref: [IpPreference::Unknown; NUM_DCS],
|
||||||
|
bind_rr: Arc::new(AtomicUsize::new(0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -166,6 +170,46 @@ impl UpstreamManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn resolve_bind_address(
|
||||||
|
interface: &Option<String>,
|
||||||
|
bind_addresses: &Option<Vec<String>>,
|
||||||
|
target: SocketAddr,
|
||||||
|
rr: Option<&AtomicUsize>,
|
||||||
|
) -> Option<IpAddr> {
|
||||||
|
let want_ipv6 = target.is_ipv6();
|
||||||
|
|
||||||
|
if let Some(addrs) = bind_addresses {
|
||||||
|
let candidates: Vec<IpAddr> = addrs
|
||||||
|
.iter()
|
||||||
|
.filter_map(|s| s.parse::<IpAddr>().ok())
|
||||||
|
.filter(|ip| ip.is_ipv6() == want_ipv6)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !candidates.is_empty() {
|
||||||
|
if let Some(counter) = rr {
|
||||||
|
let idx = counter.fetch_add(1, Ordering::Relaxed) % candidates.len();
|
||||||
|
return Some(candidates[idx]);
|
||||||
|
}
|
||||||
|
return candidates.first().copied();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(iface) = interface {
|
||||||
|
if let Ok(ip) = iface.parse::<IpAddr>() {
|
||||||
|
if ip.is_ipv6() == want_ipv6 {
|
||||||
|
return Some(ip);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
#[cfg(unix)]
|
||||||
|
if let Some(ip) = resolve_interface_ip(iface, want_ipv6) {
|
||||||
|
return Some(ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
/// Select upstream using latency-weighted random selection.
|
/// Select upstream using latency-weighted random selection.
|
||||||
async fn select_upstream(&self, dc_idx: Option<i16>, scope: Option<&str>) -> Option<usize> {
|
async fn select_upstream(&self, dc_idx: Option<i16>, scope: Option<&str>) -> Option<usize> {
|
||||||
let upstreams = self.upstreams.read().await;
|
let upstreams = self.upstreams.read().await;
|
||||||
|
|
@ -262,7 +306,12 @@ impl UpstreamManager {
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
match self.connect_via_upstream(&upstream, target).await {
|
let bind_rr = {
|
||||||
|
let guard = self.upstreams.read().await;
|
||||||
|
guard.get(idx).map(|u| u.bind_rr.clone())
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.connect_via_upstream(&upstream, target, bind_rr).await {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
|
let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
|
||||||
let mut guard = self.upstreams.write().await;
|
let mut guard = self.upstreams.write().await;
|
||||||
|
|
@ -294,13 +343,27 @@ impl UpstreamManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect_via_upstream(&self, config: &UpstreamConfig, target: SocketAddr) -> Result<TcpStream> {
|
async fn connect_via_upstream(
|
||||||
|
&self,
|
||||||
|
config: &UpstreamConfig,
|
||||||
|
target: SocketAddr,
|
||||||
|
bind_rr: Option<Arc<AtomicUsize>>,
|
||||||
|
) -> Result<TcpStream> {
|
||||||
match &config.upstream_type {
|
match &config.upstream_type {
|
||||||
UpstreamType::Direct { interface } => {
|
UpstreamType::Direct { interface, bind_addresses } => {
|
||||||
let bind_ip = interface.as_ref()
|
let bind_ip = Self::resolve_bind_address(
|
||||||
.and_then(|s| s.parse::<IpAddr>().ok());
|
interface,
|
||||||
|
bind_addresses,
|
||||||
|
target,
|
||||||
|
bind_rr.as_deref(),
|
||||||
|
);
|
||||||
|
|
||||||
let socket = create_outgoing_socket_bound(target, bind_ip)?;
|
let socket = create_outgoing_socket_bound(target, bind_ip)?;
|
||||||
|
if let Some(ip) = bind_ip {
|
||||||
|
debug!(bind = %ip, target = %target, "Bound outgoing socket");
|
||||||
|
} else if interface.is_some() || bind_addresses.is_some() {
|
||||||
|
debug!(target = %target, "No matching bind address for target family");
|
||||||
|
}
|
||||||
|
|
||||||
socket.set_nonblocking(true)?;
|
socket.set_nonblocking(true)?;
|
||||||
match socket.connect(&target.into()) {
|
match socket.connect(&target.into()) {
|
||||||
|
|
@ -323,8 +386,12 @@ impl UpstreamManager {
|
||||||
let proxy_addr: SocketAddr = address.parse()
|
let proxy_addr: SocketAddr = address.parse()
|
||||||
.map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;
|
.map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;
|
||||||
|
|
||||||
let bind_ip = interface.as_ref()
|
let bind_ip = Self::resolve_bind_address(
|
||||||
.and_then(|s| s.parse::<IpAddr>().ok());
|
interface,
|
||||||
|
&None,
|
||||||
|
proxy_addr,
|
||||||
|
bind_rr.as_deref(),
|
||||||
|
);
|
||||||
|
|
||||||
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
|
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
|
||||||
|
|
||||||
|
|
@ -354,8 +421,12 @@ impl UpstreamManager {
|
||||||
let proxy_addr: SocketAddr = address.parse()
|
let proxy_addr: SocketAddr = address.parse()
|
||||||
.map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;
|
.map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;
|
||||||
|
|
||||||
let bind_ip = interface.as_ref()
|
let bind_ip = Self::resolve_bind_address(
|
||||||
.and_then(|s| s.parse::<IpAddr>().ok());
|
interface,
|
||||||
|
&None,
|
||||||
|
proxy_addr,
|
||||||
|
bind_rr.as_deref(),
|
||||||
|
);
|
||||||
|
|
||||||
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
|
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
|
||||||
|
|
||||||
|
|
@ -398,16 +469,16 @@ impl UpstreamManager {
|
||||||
ipv4_enabled: bool,
|
ipv4_enabled: bool,
|
||||||
ipv6_enabled: bool,
|
ipv6_enabled: bool,
|
||||||
) -> Vec<StartupPingResult> {
|
) -> Vec<StartupPingResult> {
|
||||||
let upstreams: Vec<(usize, UpstreamConfig)> = {
|
let upstreams: Vec<(usize, UpstreamConfig, Arc<AtomicUsize>)> = {
|
||||||
let guard = self.upstreams.read().await;
|
let guard = self.upstreams.read().await;
|
||||||
guard.iter().enumerate()
|
guard.iter().enumerate()
|
||||||
.map(|(i, u)| (i, u.config.clone()))
|
.map(|(i, u)| (i, u.config.clone(), u.bind_rr.clone()))
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut all_results = Vec::new();
|
let mut all_results = Vec::new();
|
||||||
|
|
||||||
for (upstream_idx, upstream_config) in &upstreams {
|
for (upstream_idx, upstream_config, bind_rr) in &upstreams {
|
||||||
let upstream_name = match &upstream_config.upstream_type {
|
let upstream_name = match &upstream_config.upstream_type {
|
||||||
UpstreamType::Direct { interface } => {
|
UpstreamType::Direct { interface } => {
|
||||||
format!("direct{}", interface.as_ref().map(|i| format!(" ({})", i)).unwrap_or_default())
|
format!("direct{}", interface.as_ref().map(|i| format!(" ({})", i)).unwrap_or_default())
|
||||||
|
|
@ -424,7 +495,7 @@ impl UpstreamManager {
|
||||||
|
|
||||||
let result = tokio::time::timeout(
|
let result = tokio::time::timeout(
|
||||||
Duration::from_secs(DC_PING_TIMEOUT_SECS),
|
Duration::from_secs(DC_PING_TIMEOUT_SECS),
|
||||||
self.ping_single_dc(&upstream_config, addr_v6)
|
self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr_v6)
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
let ping_result = match result {
|
let ping_result = match result {
|
||||||
|
|
@ -475,7 +546,7 @@ impl UpstreamManager {
|
||||||
|
|
||||||
let result = tokio::time::timeout(
|
let result = tokio::time::timeout(
|
||||||
Duration::from_secs(DC_PING_TIMEOUT_SECS),
|
Duration::from_secs(DC_PING_TIMEOUT_SECS),
|
||||||
self.ping_single_dc(&upstream_config, addr_v4)
|
self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr_v4)
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
let ping_result = match result {
|
let ping_result = match result {
|
||||||
|
|
@ -607,9 +678,14 @@ impl UpstreamManager {
|
||||||
all_results
|
all_results
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ping_single_dc(&self, config: &UpstreamConfig, target: SocketAddr) -> Result<f64> {
|
async fn ping_single_dc(
|
||||||
|
&self,
|
||||||
|
config: &UpstreamConfig,
|
||||||
|
bind_rr: Option<Arc<AtomicUsize>>,
|
||||||
|
target: SocketAddr,
|
||||||
|
) -> Result<f64> {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let _stream = self.connect_via_upstream(config, target).await?;
|
let _stream = self.connect_via_upstream(config, target, bind_rr).await?;
|
||||||
Ok(start.elapsed().as_secs_f64() * 1000.0)
|
Ok(start.elapsed().as_secs_f64() * 1000.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -649,15 +725,16 @@ impl UpstreamManager {
|
||||||
let count = self.upstreams.read().await.len();
|
let count = self.upstreams.read().await.len();
|
||||||
|
|
||||||
for i in 0..count {
|
for i in 0..count {
|
||||||
let config = {
|
let (config, bind_rr) = {
|
||||||
let guard = self.upstreams.read().await;
|
let guard = self.upstreams.read().await;
|
||||||
guard[i].config.clone()
|
let u = &guard[i];
|
||||||
|
(u.config.clone(), u.bind_rr.clone())
|
||||||
};
|
};
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let result = tokio::time::timeout(
|
let result = tokio::time::timeout(
|
||||||
Duration::from_secs(10),
|
Duration::from_secs(10),
|
||||||
self.connect_via_upstream(&config, dc_addr)
|
self.connect_via_upstream(&config, dc_addr, Some(bind_rr.clone()))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
|
|
@ -686,7 +763,7 @@ impl UpstreamManager {
|
||||||
let start2 = Instant::now();
|
let start2 = Instant::now();
|
||||||
let result2 = tokio::time::timeout(
|
let result2 = tokio::time::timeout(
|
||||||
Duration::from_secs(10),
|
Duration::from_secs(10),
|
||||||
self.connect_via_upstream(&config, fallback_addr)
|
self.connect_via_upstream(&config, fallback_addr, Some(bind_rr.clone()))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
let mut guard = self.upstreams.write().await;
|
let mut guard = self.upstreams.write().await;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue