Merge pull request #223 from vladon/fix/clippy-warnings

fix: resolve clippy warnings
This commit is contained in:
Alexey 2026-02-24 10:15:47 +03:00 committed by GitHub
commit 2356ae5584
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 336 additions and 358 deletions

View File

@ -171,15 +171,15 @@ pub(crate) fn default_cache_public_ip_path() -> String {
}
pub(crate) fn default_proxy_secret_reload_secs() -> u64 {
1 * 60 * 60
60 * 60
}
pub(crate) fn default_proxy_config_reload_secs() -> u64 {
1 * 60 * 60
60 * 60
}
pub(crate) fn default_update_every_secs() -> u64 {
1 * 30 * 60
30 * 60
}
pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {

View File

@ -278,23 +278,25 @@ impl ProxyConfig {
reuse_allow: false,
});
}
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
if let Ok(ipv6) = ipv6_str.parse::<IpAddr>() {
config.server.listeners.push(ListenerConfig {
ip: ipv6,
announce: None,
announce_ip: None,
proxy_protocol: None,
reuse_allow: false,
});
}
if let Some(ipv6_str) = &config.server.listen_addr_ipv6
&& let Ok(ipv6) = ipv6_str.parse::<IpAddr>()
{
config.server.listeners.push(ListenerConfig {
ip: ipv6,
announce: None,
announce_ip: None,
proxy_protocol: None,
reuse_allow: false,
});
}
}
// Migration: announce_ip → announce for each listener.
for listener in &mut config.server.listeners {
if listener.announce.is_none() && listener.announce_ip.is_some() {
listener.announce = Some(listener.announce_ip.unwrap().to_string());
if listener.announce.is_none()
&& let Some(ip) = listener.announce_ip.take()
{
listener.announce = Some(ip.to_string());
}
}

View File

@ -677,9 +677,10 @@ pub struct ListenerConfig {
/// - `show_link = "*"` — show links for all users
/// - `show_link = ["a", "b"]` — show links for specific users
/// - omitted — show no links (default)
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub enum ShowLink {
/// Don't show any links (default when omitted).
#[default]
None,
/// Show links for all configured users.
All,
@ -687,12 +688,6 @@ pub enum ShowLink {
Specific(Vec<String>),
}
impl Default for ShowLink {
fn default() -> Self {
ShowLink::None
}
}
impl ShowLink {
/// Returns true if no links should be shown.
pub fn is_empty(&self) -> bool {

View File

@ -23,13 +23,13 @@ type Aes256Ctr = Ctr128BE<Aes256>;
// ============= AES-256-CTR =============
/// AES-256-CTR encryptor/decryptor
///
///
/// CTR mode is symmetric — encryption and decryption are the same operation.
///
/// **Zeroize note:** The inner `Aes256Ctr` cipher state (expanded key schedule
/// + counter) is opaque and cannot be zeroized. If you need to protect key
/// material, zeroize the `[u8; 32]` key and `u128` IV at the call site
/// before dropping them.
/// + counter) is opaque and cannot be zeroized. If you need to protect key
/// material, zeroize the `[u8; 32]` key and `u128` IV at the call site
/// before dropping them.
pub struct AesCtr {
cipher: Aes256Ctr,
}
@ -149,7 +149,7 @@ impl AesCbc {
///
/// CBC Encryption: C[i] = AES_Encrypt(P[i] XOR C[i-1]), where C[-1] = IV
pub fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
if data.len() % Self::BLOCK_SIZE != 0 {
if !data.len().is_multiple_of(Self::BLOCK_SIZE) {
return Err(ProxyError::Crypto(
format!("CBC data must be aligned to 16 bytes, got {}", data.len())
));
@ -180,7 +180,7 @@ impl AesCbc {
///
/// CBC Decryption: P[i] = AES_Decrypt(C[i]) XOR C[i-1], where C[-1] = IV
pub fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
if data.len() % Self::BLOCK_SIZE != 0 {
if !data.len().is_multiple_of(Self::BLOCK_SIZE) {
return Err(ProxyError::Crypto(
format!("CBC data must be aligned to 16 bytes, got {}", data.len())
));
@ -209,7 +209,7 @@ impl AesCbc {
/// Encrypt data in-place
pub fn encrypt_in_place(&self, data: &mut [u8]) -> Result<()> {
if data.len() % Self::BLOCK_SIZE != 0 {
if !data.len().is_multiple_of(Self::BLOCK_SIZE) {
return Err(ProxyError::Crypto(
format!("CBC data must be aligned to 16 bytes, got {}", data.len())
));
@ -242,7 +242,7 @@ impl AesCbc {
/// Decrypt data in-place
pub fn decrypt_in_place(&self, data: &mut [u8]) -> Result<()> {
if data.len() % Self::BLOCK_SIZE != 0 {
if !data.len().is_multiple_of(Self::BLOCK_SIZE) {
return Err(ProxyError::Crypto(
format!("CBC data must be aligned to 16 bytes, got {}", data.len())
));

View File

@ -64,6 +64,7 @@ pub fn crc32c(data: &[u8]) -> u32 {
///
/// Returned buffer layout (IPv4):
/// nonce_srv | nonce_clt | clt_ts | srv_ip | clt_port | purpose | clt_ip | srv_port | secret | nonce_srv | [clt_v6 | srv_v6] | nonce_clt
#[allow(clippy::too_many_arguments)]
pub fn build_middleproxy_prekey(
nonce_srv: &[u8; 16],
nonce_clt: &[u8; 16],
@ -108,6 +109,7 @@ pub fn build_middleproxy_prekey(
/// Uses MD5 + SHA-1 as mandated by the Telegram Middle Proxy protocol.
/// These algorithms are NOT replaceable here — changing them would break
/// interoperability with Telegram's middle proxy infrastructure.
#[allow(clippy::too_many_arguments)]
pub fn derive_middleproxy_keys(
nonce_srv: &[u8; 16],
nonce_clt: &[u8; 16],

View File

@ -95,7 +95,7 @@ impl SecureRandom {
return 0;
}
let bytes_needed = (k + 7) / 8;
let bytes_needed = k.div_ceil(8);
let bytes = self.bytes(bytes_needed.min(8));
let mut result = 0u64;

View File

@ -91,7 +91,7 @@ impl From<StreamError> for std::io::Error {
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, err)
}
StreamError::Poisoned { .. } => {
std::io::Error::new(std::io::ErrorKind::Other, err)
std::io::Error::other(err)
}
StreamError::BufferOverflow { .. } => {
std::io::Error::new(std::io::ErrorKind::OutOfMemory, err)
@ -100,7 +100,7 @@ impl From<StreamError> for std::io::Error {
std::io::Error::new(std::io::ErrorKind::InvalidData, err)
}
StreamError::PartialRead { .. } | StreamError::PartialWrite { .. } => {
std::io::Error::new(std::io::ErrorKind::Other, err)
std::io::Error::other(err)
}
}
}
@ -135,12 +135,7 @@ impl Recoverable for StreamError {
}
fn can_continue(&self) -> bool {
match self {
Self::Poisoned { .. } => false,
Self::UnexpectedEof => false,
Self::BufferOverflow { .. } => false,
_ => true,
}
!matches!(self, Self::Poisoned { .. } | Self::UnexpectedEof | Self::BufferOverflow { .. })
}
}

View File

@ -301,7 +301,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await {
Ok(proxy_secret) => {
info!(
secret_len = proxy_secret.len() as usize, // ← ЯВНЫЙ ТИП usize
secret_len = proxy_secret.len(),
key_sig = format_args!(
"0x{:08x}",
if proxy_secret.len() >= 4 {
@ -597,14 +597,12 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
} else {
info!(" IPv4 in use / IPv6 is fallback");
}
} else {
if v6_works && !v4_works {
info!(" IPv6 only / IPv4 unavailable)");
} else if v4_works && !v6_works {
info!(" IPv4 only / IPv6 unavailable)");
} else if !v6_works && !v4_works {
info!(" No DC connectivity");
}
} else if v6_works && !v4_works {
info!(" IPv6 only / IPv4 unavailable)");
} else if v4_works && !v6_works {
info!(" IPv4 only / IPv6 unavailable)");
} else if !v6_works && !v4_works {
info!(" No DC connectivity");
}
info!(" via {}", upstream_result.upstream_name);

View File

@ -95,23 +95,21 @@ pub async fn run_probe(config: &NetworkConfig, stun_addr: Option<String>, nat_pr
}
pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe) -> NetworkDecision {
let mut decision = NetworkDecision::default();
let ipv4_dc = config.ipv4 && probe.detected_ipv4.is_some();
let ipv6_dc = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()) && probe.detected_ipv6.is_some();
decision.ipv4_dc = config.ipv4 && probe.detected_ipv4.is_some();
decision.ipv6_dc = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()) && probe.detected_ipv6.is_some();
decision.ipv4_me = config.ipv4
let ipv4_me = config.ipv4
&& probe.detected_ipv4.is_some()
&& (!probe.ipv4_is_bogon || probe.reflected_ipv4.is_some());
let ipv6_enabled = config.ipv6.unwrap_or(probe.detected_ipv6.is_some());
decision.ipv6_me = ipv6_enabled
let ipv6_me = ipv6_enabled
&& probe.detected_ipv6.is_some()
&& (!probe.ipv6_is_bogon || probe.reflected_ipv6.is_some());
decision.effective_prefer = match config.prefer {
6 if decision.ipv6_me || decision.ipv6_dc => 6,
4 if decision.ipv4_me || decision.ipv4_dc => 4,
let effective_prefer = match config.prefer {
6 if ipv6_me || ipv6_dc => 6,
4 if ipv4_me || ipv4_dc => 4,
6 => {
warn!("prefer=6 requested but IPv6 unavailable; falling back to IPv4");
4
@ -119,10 +117,17 @@ pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe)
_ => 4,
};
let me_families = decision.ipv4_me as u8 + decision.ipv6_me as u8;
decision.effective_multipath = config.multipath && me_families >= 2;
let me_families = ipv4_me as u8 + ipv6_me as u8;
let effective_multipath = config.multipath && me_families >= 2;
decision
NetworkDecision {
ipv4_dc,
ipv6_dc,
ipv4_me,
ipv6_me,
effective_prefer,
effective_multipath,
}
}
fn detect_local_ip_v4() -> Option<Ipv4Addr> {

View File

@ -198,16 +198,11 @@ async fn resolve_stun_addr(stun_addr: &str, family: IpFamily) -> Result<Option<S
});
}
let addrs = lookup_host(stun_addr)
let mut addrs = lookup_host(stun_addr)
.await
.map_err(|e| ProxyError::Proxy(format!("STUN resolve failed: {e}")))?;
let target = addrs
.filter(|a| match (a.is_ipv4(), family) {
(true, IpFamily::V4) => true,
(false, IpFamily::V6) => true,
_ => false,
})
.next();
.find(|a| matches!((a.is_ipv4(), family), (true, IpFamily::V4) | (false, IpFamily::V6)));
Ok(target)
}

View File

@ -160,7 +160,7 @@ pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256;
/// Secure Intermediate payload is expected to be 4-byte aligned.
pub fn is_valid_secure_payload_len(data_len: usize) -> bool {
data_len % 4 == 0
data_len.is_multiple_of(4)
}
/// Compute Secure Intermediate payload length from wire length.
@ -179,7 +179,7 @@ pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
is_valid_secure_payload_len(data_len),
"Secure payload must be 4-byte aligned, got {data_len}"
);
(rng.range(3) + 1) as usize
rng.range(3) + 1
}
// ============= Timeouts =============
@ -231,7 +231,6 @@ pub static RESERVED_NONCE_CONTINUES: &[[u8; 4]] = &[
// ============= RPC Constants (for Middle Proxy) =============
/// RPC Proxy Request
/// RPC Flags (from Erlang mtp_rpc.erl)
pub const RPC_FLAG_NOT_ENCRYPTED: u32 = 0x2;
pub const RPC_FLAG_HAS_AD_TAG: u32 = 0x8;

View File

@ -85,7 +85,7 @@ impl FrameMode {
pub fn validate_message_length(len: usize) -> bool {
use super::constants::{MIN_MSG_LEN, MAX_MSG_LEN, PADDING_FILLER};
len >= MIN_MSG_LEN && len <= MAX_MSG_LEN && len % PADDING_FILLER.len() == 0
(MIN_MSG_LEN..=MAX_MSG_LEN).contains(&len) && len.is_multiple_of(PADDING_FILLER.len())
}
#[cfg(test)]

View File

@ -335,7 +335,7 @@ pub fn validate_tls_handshake(
// This is a quirk in some clients that use uptime instead of real time
let is_boot_time = timestamp < 60 * 60 * 24 * 1000; // < ~2.7 years in seconds
if !is_boot_time && (time_diff < TIME_SKEW_MIN || time_diff > TIME_SKEW_MAX) {
if !is_boot_time && !(TIME_SKEW_MIN..=TIME_SKEW_MAX).contains(&time_diff) {
continue;
}
}
@ -393,7 +393,7 @@ pub fn build_server_hello(
) -> Vec<u8> {
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
let fake_cert_len = fake_cert_len.max(MIN_APP_DATA).min(MAX_APP_DATA);
let fake_cert_len = fake_cert_len.clamp(MIN_APP_DATA, MAX_APP_DATA);
let x25519_key = gen_fake_x25519_key(rng);
// Build ServerHello
@ -525,10 +525,10 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
if sn_pos + name_len > sn_end {
break;
}
if name_type == 0 && name_len > 0 {
if let Ok(host) = std::str::from_utf8(&handshake[sn_pos..sn_pos + name_len]) {
return Some(host.to_string());
}
if name_type == 0 && name_len > 0
&& let Ok(host) = std::str::from_utf8(&handshake[sn_pos..sn_pos + name_len])
{
return Some(host.to_string());
}
sn_pos += name_len;
}
@ -571,7 +571,7 @@ pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec<Vec<u8>> {
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
let mut lp = pos + 2;
let list_end = (pos + 2).saturating_add(list_len).min(pos + elen);
while lp + 1 <= list_end {
while lp < list_end {
let plen = handshake[lp] as usize;
lp += 1;
if lp + plen > list_end { break; }

View File

@ -594,18 +594,18 @@ impl RunningClientHandler {
peer_addr: SocketAddr,
ip_tracker: &UserIpTracker,
) -> Result<()> {
if let Some(expiration) = config.access.user_expirations.get(user) {
if chrono::Utc::now() > *expiration {
return Err(ProxyError::UserExpired {
user: user.to_string(),
});
}
if let Some(expiration) = config.access.user_expirations.get(user)
&& chrono::Utc::now() > *expiration
{
return Err(ProxyError::UserExpired {
user: user.to_string(),
});
}
// IP limit check
if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await {
warn!(
user = %user,
user = %user,
ip = %peer_addr.ip(),
reason = %reason,
"IP limit exceeded"
@ -615,20 +615,20 @@ impl RunningClientHandler {
});
}
if let Some(limit) = config.access.user_max_tcp_conns.get(user) {
if stats.get_user_curr_connects(user) >= *limit as u64 {
return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(),
});
}
if let Some(limit) = config.access.user_max_tcp_conns.get(user)
&& stats.get_user_curr_connects(user) >= *limit as u64
{
return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(),
});
}
if let Some(quota) = config.access.user_data_quota.get(user) {
if stats.get_user_total_octets(user) >= *quota {
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
if let Some(quota) = config.access.user_data_quota.get(user)
&& stats.get_user_total_octets(user) >= *quota
{
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
Ok(())

View File

@ -118,10 +118,10 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
// Unknown DC requested by client without override: log and fall back.
if !config.dc_overrides.contains_key(&dc_key) {
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
if let Some(path) = &config.general.unknown_dc_log_path {
if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
let _ = writeln!(file, "dc_idx={dc_idx}");
}
if let Some(path) = &config.general.unknown_dc_log_path
&& let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path)
{
let _ = writeln!(file, "dc_idx={dc_idx}");
}
}

View File

@ -19,12 +19,12 @@ const MASK_BUFFER_SIZE: usize = 8192;
/// Detect client type based on initial data
fn detect_client_type(data: &[u8]) -> &'static str {
// Check for HTTP request
if data.len() > 4 {
if data.starts_with(b"GET ") || data.starts_with(b"POST") ||
if data.len() > 4
&& (data.starts_with(b"GET ") || data.starts_with(b"POST") ||
data.starts_with(b"HEAD") || data.starts_with(b"PUT ") ||
data.starts_with(b"DELETE") || data.starts_with(b"OPTIONS") {
return "HTTP";
}
data.starts_with(b"DELETE") || data.starts_with(b"OPTIONS"))
{
return "HTTP";
}
// Check for TLS ClientHello (0x16 = handshake, 0x03 0x01-0x03 = TLS version)

View File

@ -393,13 +393,13 @@ where
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
// When client closes, but ME channel stopped as unregistered - it isnt error
if client_closed {
if matches!(
if client_closed
&& matches!(
writer_result,
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
) {
writer_result = Ok(());
}
)
{
writer_result = Ok(());
}
let result = match (main_result, c2me_result, writer_result) {
@ -549,7 +549,7 @@ where
match proto_tag {
ProtoTag::Abridged => {
if data.len() % 4 != 0 {
if !data.len().is_multiple_of(4) {
return Err(ProxyError::Proxy(format!(
"Abridged payload must be 4-byte aligned, got {}",
data.len()
@ -567,7 +567,7 @@ where
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(&frame_buf)
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
} else if len_words < (1 << 24) {
@ -581,7 +581,7 @@ where
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(&frame_buf)
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
} else {
@ -618,7 +618,7 @@ where
rng.fill(&mut frame_buf[start..]);
}
client_writer
.write_all(&frame_buf)
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
}

View File

@ -326,10 +326,10 @@ impl ReplayShard {
// Use key.as_ref() to get &[u8] — avoids Borrow<Q> ambiguity
// between Borrow<[u8]> and Borrow<Box<[u8]>>
if let Some(entry) = self.cache.peek(key.as_ref()) {
if entry.seq == queue_seq {
self.cache.pop(key.as_ref());
}
if let Some(entry) = self.cache.peek(key.as_ref())
&& entry.seq == queue_seq
{
self.cache.pop(key.as_ref());
}
}
}

View File

@ -47,7 +47,7 @@
//! - when upstream is Pending but pending still has room: accept `to_accept` bytes and
//! encrypt+append ciphertext directly into pending (in-place encryption of appended range)
//! Encrypted stream wrappers using AES-CTR
//! Encrypted stream wrappers using AES-CTR
//!
//! This module provides stateful async stream wrappers that handle
//! encryption/decryption with proper partial read/write handling.
@ -153,9 +153,9 @@ impl<R> CryptoReader<R> {
fn take_poison_error(&mut self) -> io::Error {
match &mut self.state {
CryptoReaderState::Poisoned { error } => error.take().unwrap_or_else(|| {
io::Error::new(ErrorKind::Other, "stream previously poisoned")
io::Error::other("stream previously poisoned")
}),
_ => io::Error::new(ErrorKind::Other, "stream not poisoned"),
_ => io::Error::other("stream not poisoned"),
}
}
}
@ -168,6 +168,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for CryptoReader<R> {
) -> Poll<Result<()>> {
let this = self.get_mut();
#[allow(clippy::never_loop)]
loop {
match &mut this.state {
CryptoReaderState::Poisoned { .. } => {
@ -485,14 +486,14 @@ impl<W> CryptoWriter<W> {
fn take_poison_error(&mut self) -> io::Error {
match &mut self.state {
CryptoWriterState::Poisoned { error } => error.take().unwrap_or_else(|| {
io::Error::new(ErrorKind::Other, "stream previously poisoned")
io::Error::other("stream previously poisoned")
}),
_ => io::Error::new(ErrorKind::Other, "stream not poisoned"),
_ => io::Error::other("stream not poisoned"),
}
}
/// Ensure we are in Flushing state and return mutable pending buffer.
fn ensure_pending<'a>(state: &'a mut CryptoWriterState, max_pending: usize) -> &'a mut PendingCiphertext {
fn ensure_pending(state: &mut CryptoWriterState, max_pending: usize) -> &mut PendingCiphertext {
if matches!(state, CryptoWriterState::Idle) {
*state = CryptoWriterState::Flushing {
pending: PendingCiphertext::new(max_pending),

View File

@ -139,7 +139,7 @@ fn encode_abridged(frame: &Frame, dst: &mut BytesMut) -> io::Result<()> {
let data = &frame.data;
// Validate alignment
if data.len() % 4 != 0 {
if !data.len().is_multiple_of(4) {
return Err(Error::new(
ErrorKind::InvalidInput,
format!("abridged frame must be 4-byte aligned, got {} bytes", data.len())

View File

@ -78,7 +78,7 @@ impl<W> AbridgedFrameWriter<W> {
impl<W: AsyncWrite + Unpin> AbridgedFrameWriter<W> {
/// Write a frame
pub async fn write_frame(&mut self, data: &[u8], meta: &FrameMeta) -> Result<()> {
if data.len() % 4 != 0 {
if !data.len().is_multiple_of(4) {
return Err(Error::new(
ErrorKind::InvalidInput,
format!("Abridged frame must be aligned to 4 bytes, got {}", data.len()),
@ -331,7 +331,7 @@ impl<R: AsyncRead + Unpin> MtprotoFrameReader<R> {
}
// Validate length
if len < MIN_MSG_LEN || len > MAX_MSG_LEN || len % PADDING_FILLER.len() != 0 {
if !(MIN_MSG_LEN..=MAX_MSG_LEN).contains(&len) || !len.is_multiple_of(PADDING_FILLER.len()) {
return Err(Error::new(
ErrorKind::InvalidData,
format!("Invalid message length: {}", len),

View File

@ -135,7 +135,7 @@ impl TlsRecordHeader {
}
/// Build header bytes
fn to_bytes(&self) -> [u8; 5] {
fn to_bytes(self) -> [u8; 5] {
[
self.record_type,
self.version[0],
@ -260,9 +260,9 @@ impl<R> FakeTlsReader<R> {
fn take_poison_error(&mut self) -> io::Error {
match &mut self.state {
TlsReaderState::Poisoned { error } => error.take().unwrap_or_else(|| {
io::Error::new(ErrorKind::Other, "stream previously poisoned")
io::Error::other("stream previously poisoned")
}),
_ => io::Error::new(ErrorKind::Other, "stream not poisoned"),
_ => io::Error::other("stream not poisoned"),
}
}
}
@ -297,7 +297,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for FakeTlsReader<R> {
TlsReaderState::Poisoned { error } => {
this.state = TlsReaderState::Poisoned { error: None };
let err = error.unwrap_or_else(|| {
io::Error::new(ErrorKind::Other, "stream previously poisoned")
io::Error::other("stream previously poisoned")
});
return Poll::Ready(Err(err));
}
@ -616,9 +616,9 @@ impl<W> FakeTlsWriter<W> {
fn take_poison_error(&mut self) -> io::Error {
match &mut self.state {
TlsWriterState::Poisoned { error } => error.take().unwrap_or_else(|| {
io::Error::new(ErrorKind::Other, "stream previously poisoned")
io::Error::other("stream previously poisoned")
}),
_ => io::Error::new(ErrorKind::Other, "stream not poisoned"),
_ => io::Error::other("stream not poisoned"),
}
}
@ -682,7 +682,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for FakeTlsWriter<W> {
TlsWriterState::Poisoned { error } => {
this.state = TlsWriterState::Poisoned { error: None };
let err = error.unwrap_or_else(|| {
Error::new(ErrorKind::Other, "stream previously poisoned")
Error::other("stream previously poisoned")
});
return Poll::Ready(Err(err));
}
@ -771,7 +771,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for FakeTlsWriter<W> {
TlsWriterState::Poisoned { error } => {
this.state = TlsWriterState::Poisoned { error: None };
let err = error.unwrap_or_else(|| {
Error::new(ErrorKind::Other, "stream previously poisoned")
Error::other("stream previously poisoned")
});
return Poll::Ready(Err(err));
}

View File

@ -115,32 +115,32 @@ impl TlsFrontCache {
if !name.ends_with(".json") {
continue;
}
if let Ok(data) = tokio::fs::read(entry.path()).await {
if let Ok(mut cached) = serde_json::from_slice::<CachedTlsData>(&data) {
if cached.domain.is_empty()
|| cached.domain.len() > 255
|| !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
{
warn!(file = %name, "Skipping TLS cache entry with invalid domain");
continue;
}
// fetched_at is skipped during deserialization; approximate with file mtime if available.
if let Ok(meta) = entry.metadata().await {
if let Ok(modified) = meta.modified() {
cached.fetched_at = modified;
}
}
// Drop entries older than 72h
if let Ok(age) = cached.fetched_at.elapsed() {
if age > Duration::from_secs(72 * 3600) {
warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)");
continue;
}
}
let domain = cached.domain.clone();
self.set(&domain, cached).await;
loaded += 1;
if let Ok(data) = tokio::fs::read(entry.path()).await
&& let Ok(mut cached) = serde_json::from_slice::<CachedTlsData>(&data)
{
if cached.domain.is_empty()
|| cached.domain.len() > 255
|| !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
{
warn!(file = %name, "Skipping TLS cache entry with invalid domain");
continue;
}
// fetched_at is skipped during deserialization; approximate with file mtime if available.
if let Ok(meta) = entry.metadata().await
&& let Ok(modified) = meta.modified()
{
cached.fetched_at = modified;
}
// Drop entries older than 72h
if let Ok(age) = cached.fetched_at.elapsed()
&& age > Duration::from_secs(72 * 3600)
{
warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)");
continue;
}
let domain = cached.domain.clone();
self.set(&domain, cached).await;
loaded += 1;
}
}
}

View File

@ -12,7 +12,7 @@ fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec<usize> {
sizes
.iter()
.map(|&size| {
let base = size.max(MIN_APP_DATA).min(MAX_APP_DATA);
let base = size.clamp(MIN_APP_DATA, MAX_APP_DATA);
let jitter_range = ((base as f64) * 0.03).round() as i64;
if jitter_range == 0 {
return base;
@ -50,7 +50,7 @@ fn ensure_payload_capacity(mut sizes: Vec<usize>, payload_len: usize) -> Vec<usi
while body_total < payload_len {
let remaining = payload_len - body_total;
let chunk = (remaining + 17).min(MAX_APP_DATA).max(MIN_APP_DATA);
let chunk = (remaining + 17).clamp(MIN_APP_DATA, MAX_APP_DATA);
sizes.push(chunk);
body_total += chunk.saturating_sub(17);
}
@ -189,7 +189,7 @@ pub fn build_emulated_server_hello(
.as_ref()
.map(|payload| payload.certificate_message.as_slice())
.filter(|payload| !payload.is_empty())
.or_else(|| compact_payload.as_deref())
.or(compact_payload.as_deref())
} else {
compact_payload.as_deref()
};
@ -223,15 +223,13 @@ pub fn build_emulated_server_hello(
} else {
rec.extend_from_slice(&rng.bytes(size));
}
} else if size > 17 {
let body_len = size - 17;
rec.extend_from_slice(&rng.bytes(body_len));
rec.push(0x16); // inner content type marker (handshake)
rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag
} else {
if size > 17 {
let body_len = size - 17;
rec.extend_from_slice(&rng.bytes(body_len));
rec.push(0x16); // inner content type marker (handshake)
rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag
} else {
rec.extend_from_slice(&rng.bytes(size));
}
rec.extend_from_slice(&rng.bytes(size));
}
app_data.extend_from_slice(&rec);
}

View File

@ -384,7 +384,7 @@ async fn fetch_via_raw_tls(
for _ in 0..4 {
match timeout(connect_timeout, read_tls_record(&mut stream)).await {
Ok(Ok(rec)) => records.push(rec),
Ok(Err(e)) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Err(_) => break,
}
if records.len() >= 3 && records.iter().any(|(t, _)| *t == TLS_RECORD_APPLICATION) {

View File

@ -165,11 +165,10 @@ fn process_pid16() -> u16 {
}
fn process_utime() -> u32 {
let utime = std::time::SystemTime::now()
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32;
utime
.as_secs() as u32
}
pub(crate) fn cbc_encrypt_padded(

View File

@ -40,14 +40,16 @@ pub struct ProxyConfigData {
}
fn parse_host_port(s: &str) -> Option<(IpAddr, u16)> {
if let Some(bracket_end) = s.rfind(']') {
if s.starts_with('[') && bracket_end + 1 < s.len() && s.as_bytes().get(bracket_end + 1) == Some(&b':') {
let host = &s[1..bracket_end];
let port_str = &s[bracket_end + 2..];
let ip = host.parse::<IpAddr>().ok()?;
let port = port_str.parse::<u16>().ok()?;
return Some((ip, port));
}
if let Some(bracket_end) = s.rfind(']')
&& s.starts_with('[')
&& bracket_end + 1 < s.len()
&& s.as_bytes().get(bracket_end + 1) == Some(&b':')
{
let host = &s[1..bracket_end];
let port_str = &s[bracket_end + 2..];
let ip = host.parse::<IpAddr>().ok()?;
let port = port_str.parse::<u16>().ok()?;
return Some((ip, port));
}
let idx = s.rfind(':')?;
@ -84,20 +86,18 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
;
if let Some(date) = resp.headers().get(reqwest::header::DATE) {
if let Ok(date_str) = date.to_str() {
if let Ok(server_time) = httpdate::parse_http_date(date_str) {
if let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
}) {
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
}
}
}
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
{
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
}
}

View File

@ -47,21 +47,21 @@ impl MePool {
pub(crate) async fn connect_tcp(&self, addr: SocketAddr) -> Result<(TcpStream, f64)> {
let start = Instant::now();
let connect_fut = async {
if addr.is_ipv6() {
if let Some(v6) = self.detected_ipv6 {
match TcpSocket::new_v6() {
Ok(sock) => {
if let Err(e) = sock.bind(SocketAddr::new(IpAddr::V6(v6), 0)) {
debug!(error = %e, bind_ip = %v6, "ME IPv6 bind failed, falling back to default bind");
} else {
match sock.connect(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect"),
}
if addr.is_ipv6()
&& let Some(v6) = self.detected_ipv6
{
match TcpSocket::new_v6() {
Ok(sock) => {
if let Err(e) = sock.bind(SocketAddr::new(IpAddr::V6(v6), 0)) {
debug!(error = %e, bind_ip = %v6, "ME IPv6 bind failed, falling back to default bind");
} else {
match sock.connect(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect"),
}
}
Err(e) => debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect"),
}
Err(e) => debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect"),
}
}
TcpStream::connect(addr).await

View File

@ -92,10 +92,10 @@ async fn check_family(
let key = (dc, family);
let now = Instant::now();
if let Some(ts) = next_attempt.get(&key) {
if now < *ts {
continue;
}
if let Some(ts) = next_attempt.get(&key)
&& now < *ts
{
continue;
}
let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize;

View File

@ -498,10 +498,10 @@ impl MePool {
let mut guard = self.proxy_map_v4.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k) {
if let Some(addrs) = guard.get(&k).cloned() {
guard.insert(-k, addrs);
}
if !guard.contains_key(&-k)
&& let Some(addrs) = guard.get(&k).cloned()
{
guard.insert(-k, addrs);
}
}
}
@ -509,10 +509,10 @@ impl MePool {
let mut guard = self.proxy_map_v6.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
if !guard.contains_key(&-k) {
if let Some(addrs) = guard.get(&k).cloned() {
guard.insert(-k, addrs);
}
if !guard.contains_key(&-k)
&& let Some(addrs) = guard.get(&k).cloned()
{
guard.insert(-k, addrs);
}
}
}
@ -760,13 +760,12 @@ impl MePool {
cancel_reader_token.clone(),
)
.await;
if let Some(pool) = pool.upgrade() {
if cleanup_for_reader
if let Some(pool) = pool.upgrade()
&& cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
{
pool.remove_writer_and_close_clients(writer_id).await;
}
if let Err(e) = res {
warn!(error = %e, "ME reader ended");
@ -834,13 +833,12 @@ impl MePool {
stats_ping.increment_me_keepalive_failed();
debug!("ME ping failed, removing dead writer");
cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade() {
if cleanup_for_ping
if let Some(pool) = pool_ping.upgrade()
&& cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
}
{
pool.remove_writer_and_close_clients(writer_id).await;
}
break;
}
@ -943,24 +941,20 @@ impl MePool {
let pool = Arc::downgrade(self);
tokio::spawn(async move {
let deadline = timeout.map(|t| Instant::now() + t);
loop {
if let Some(p) = pool.upgrade() {
if let Some(deadline_at) = deadline {
if Instant::now() >= deadline_at {
warn!(writer_id, "Drain timeout, force-closing");
p.stats.increment_pool_force_close_total();
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
} else {
while let Some(p) = pool.upgrade() {
if let Some(deadline_at) = deadline
&& Instant::now() >= deadline_at
{
warn!(writer_id, "Drain timeout, force-closing");
p.stats.increment_pool_force_close_total();
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

View File

@ -25,7 +25,7 @@ impl MePool {
pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr {
let nat_ip = self
.nat_ip_cfg
.or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| (*g).clone()));
.or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| *g));
let Some(nat_ip) = nat_ip else {
return ip;
@ -75,7 +75,7 @@ impl MePool {
return None;
}
if let Some(ip) = self.nat_ip_detected.read().await.clone() {
if let Some(ip) = *self.nat_ip_detected.read().await {
return Some(ip);
}
@ -102,17 +102,17 @@ impl MePool {
) -> Option<std::net::SocketAddr> {
const STUN_CACHE_TTL: Duration = Duration::from_secs(600);
// Backoff window
if let Some(until) = *self.stun_backoff_until.read().await {
if Instant::now() < until {
if let Ok(cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => cache.v4,
IpFamily::V6 => cache.v6,
};
return slot.map(|(_, addr)| addr);
}
return None;
if let Some(until) = *self.stun_backoff_until.read().await
&& Instant::now() < until
{
if let Ok(cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => cache.v4,
IpFamily::V6 => cache.v6,
};
return slot.map(|(_, addr)| addr);
}
return None;
}
if let Ok(mut cache) = self.nat_reflection_cache.try_lock() {
@ -120,10 +120,10 @@ impl MePool {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
if let Some((ts, addr)) = slot {
if ts.elapsed() < STUN_CACHE_TTL {
return Some(*addr);
}
if let Some((ts, addr)) = slot
&& ts.elapsed() < STUN_CACHE_TTL
{
return Some(*addr);
}
}

View File

@ -63,20 +63,18 @@ pub async fn download_proxy_secret() -> Result<Vec<u8>> {
)));
}
if let Some(date) = resp.headers().get(reqwest::header::DATE) {
if let Ok(date_str) = date.to_str() {
if let Ok(server_time) = httpdate::parse_http_date(date_str) {
if let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
}) {
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from proxy-secret Date header");
}
}
}
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
{
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from proxy-secret Date header");
}
}

View File

@ -242,10 +242,10 @@ impl MePool {
}
if preferred.is_empty() {
let def = self.default_dc.load(Ordering::Relaxed);
if def != 0 {
if let Some(v) = map_guard.get(&def) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
if def != 0
&& let Some(v) = map_guard.get(&def)
{
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
@ -267,7 +267,7 @@ impl MePool {
if !self.writer_accepts_new_binding(w) {
continue;
}
if preferred.iter().any(|p| *p == w.addr) {
if preferred.contains(&w.addr) {
out.push(idx);
}
}

View File

@ -136,17 +136,17 @@ pub fn resolve_interface_ip(name: &str, want_ipv6: bool) -> Option<IpAddr> {
if let Ok(addrs) = getifaddrs() {
for iface in addrs {
if iface.interface_name == name {
if let Some(address) = iface.address {
if let Some(v4) = address.as_sockaddr_in() {
if !want_ipv6 {
return Some(IpAddr::V4(v4.ip()));
}
} else if let Some(v6) = address.as_sockaddr_in6() {
if want_ipv6 {
return Some(IpAddr::V6(v6.ip().clone()));
}
if iface.interface_name == name
&& let Some(address) = iface.address
{
if let Some(v4) = address.as_sockaddr_in() {
if !want_ipv6 {
return Some(IpAddr::V4(v4.ip()));
}
} else if let Some(v6) = address.as_sockaddr_in6()
&& want_ipv6
{
return Some(IpAddr::V6(v6.ip()));
}
}
}

View File

@ -27,11 +27,11 @@ pub async fn connect_socks4(
buf.extend_from_slice(user);
buf.push(0); // NULL
stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?;
stream.write_all(&buf).await.map_err(ProxyError::Io)?;
// Response: VN (1) | CD (1) | DSTPORT (2) | DSTIP (4)
let mut resp = [0u8; 8];
stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut resp).await.map_err(ProxyError::Io)?;
if resp[1] != 90 {
return Err(ProxyError::Proxy(format!("SOCKS4 request rejected: code {}", resp[1])));
@ -56,10 +56,10 @@ pub async fn connect_socks5(
let mut buf = vec![5u8, methods.len() as u8];
buf.extend_from_slice(&methods);
stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?;
stream.write_all(&buf).await.map_err(ProxyError::Io)?;
let mut resp = [0u8; 2];
stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut resp).await.map_err(ProxyError::Io)?;
if resp[0] != 5 {
return Err(ProxyError::Proxy("Invalid SOCKS5 version".to_string()));
@ -80,10 +80,10 @@ pub async fn connect_socks5(
auth_buf.push(p_bytes.len() as u8);
auth_buf.extend_from_slice(p_bytes);
stream.write_all(&auth_buf).await.map_err(|e| ProxyError::Io(e))?;
stream.write_all(&auth_buf).await.map_err(ProxyError::Io)?;
let mut auth_resp = [0u8; 2];
stream.read_exact(&mut auth_resp).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut auth_resp).await.map_err(ProxyError::Io)?;
if auth_resp[1] != 0 {
return Err(ProxyError::Proxy("SOCKS5 authentication failed".to_string()));
@ -112,11 +112,11 @@ pub async fn connect_socks5(
req.extend_from_slice(&target.port().to_be_bytes());
stream.write_all(&req).await.map_err(|e| ProxyError::Io(e))?;
stream.write_all(&req).await.map_err(ProxyError::Io)?;
// Response
let mut head = [0u8; 4];
stream.read_exact(&mut head).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut head).await.map_err(ProxyError::Io)?;
if head[1] != 0 {
return Err(ProxyError::Proxy(format!("SOCKS5 request failed: code {}", head[1])));
@ -126,17 +126,17 @@ pub async fn connect_socks5(
match head[3] {
1 => { // IPv4
let mut addr = [0u8; 4 + 2];
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut addr).await.map_err(ProxyError::Io)?;
},
3 => { // Domain
let mut len = [0u8; 1];
stream.read_exact(&mut len).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut len).await.map_err(ProxyError::Io)?;
let mut addr = vec![0u8; len[0] as usize + 2];
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut addr).await.map_err(ProxyError::Io)?;
},
4 => { // IPv6
let mut addr = [0u8; 16 + 2];
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
stream.read_exact(&mut addr).await.map_err(ProxyError::Io)?;
},
_ => return Err(ProxyError::Proxy("Invalid address type in SOCKS5 response".to_string())),
}

View File

@ -57,9 +57,10 @@ impl LatencyEma {
// ============= Per-DC IP Preference Tracking =============
/// Tracks which IP version works for each DC
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IpPreference {
/// Not yet tested
#[default]
Unknown,
/// IPv6 works
PreferV6,
@ -71,12 +72,6 @@ pub enum IpPreference {
Unavailable,
}
impl Default for IpPreference {
fn default() -> Self {
Self::Unknown
}
}
// ============= Upstream State =============
#[derive(Debug)]
@ -112,7 +107,7 @@ impl UpstreamState {
if abs_dc == 0 {
return None;
}
if abs_dc >= 1 && abs_dc <= NUM_DCS {
if (1..=NUM_DCS).contains(&abs_dc) {
Some(abs_dc - 1)
} else {
// Unknown DC → default cluster (DC 2, index 1)
@ -122,10 +117,10 @@ impl UpstreamState {
/// Get latency for a specific DC, falling back to average across all known DCs
fn effective_latency(&self, dc_idx: Option<i16>) -> Option<f64> {
if let Some(di) = dc_idx.and_then(Self::dc_array_idx) {
if let Some(ms) = self.dc_latency[di].get() {
return Some(ms);
}
if let Some(di) = dc_idx.and_then(Self::dc_array_idx)
&& let Some(ms) = self.dc_latency[di].get()
{
return Some(ms);
}
let (sum, count) = self.dc_latency.iter()
@ -582,7 +577,7 @@ impl UpstreamManager {
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr_v6)
self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v6)
).await;
let ping_result = match result {
@ -633,7 +628,7 @@ impl UpstreamManager {
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr_v4)
self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v4)
).await;
let ping_result = match result {
@ -696,7 +691,7 @@ impl UpstreamManager {
}
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr)
self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr)
).await;
let ping_result = match result {

View File

@ -67,54 +67,56 @@ pub async fn detect_ip() -> IpInfo {
// Try to get local interface IP first (default gateway interface)
// We connect to Google DNS to find out which interface is used for routing
if let Some(ip) = get_local_ip("8.8.8.8:80") {
if ip.is_ipv4() && !ip.is_loopback() {
info.ipv4 = Some(ip);
debug!(ip = %ip, "Detected local IPv4 address via routing");
}
if let Some(ip) = get_local_ip("8.8.8.8:80")
&& ip.is_ipv4()
&& !ip.is_loopback()
{
info.ipv4 = Some(ip);
debug!(ip = %ip, "Detected local IPv4 address via routing");
}
if let Some(ip) = get_local_ipv6("[2001:4860:4860::8888]:80") {
if ip.is_ipv6() && !ip.is_loopback() {
info.ipv6 = Some(ip);
debug!(ip = %ip, "Detected local IPv6 address via routing");
}
if let Some(ip) = get_local_ipv6("[2001:4860:4860::8888]:80")
&& ip.is_ipv6()
&& !ip.is_loopback()
{
info.ipv6 = Some(ip);
debug!(ip = %ip, "Detected local IPv6 address via routing");
}
// If local detection failed or returned private IP (and we want public),
// If local detection failed or returned private IP (and we want public),
// or just as a fallback/verification, we might want to check external services.
// However, the requirement is: "if IP for listening is not set... it should be IP from interface...
// However, the requirement is: "if IP for listening is not set... it should be IP from interface...
// if impossible - request external resources".
// So if we found a local IP, we might be good. But often servers are behind NAT.
// If the local IP is private, we probably want the public IP for the tg:// link.
// Let's check if the detected IPs are private.
let need_external_v4 = info.ipv4.map_or(true, |ip| is_private_ip(ip));
let need_external_v6 = info.ipv6.map_or(true, |ip| is_private_ip(ip));
let need_external_v4 = info.ipv4.is_none_or(is_private_ip);
let need_external_v6 = info.ipv6.is_none_or(is_private_ip);
if need_external_v4 {
debug!("Local IPv4 is private or missing, checking external services...");
for url in IPV4_URLS {
if let Some(ip) = fetch_ip(url).await {
if ip.is_ipv4() {
info.ipv4 = Some(ip);
debug!(ip = %ip, "Detected public IPv4 address");
break;
}
if let Some(ip) = fetch_ip(url).await
&& ip.is_ipv4()
{
info.ipv4 = Some(ip);
debug!(ip = %ip, "Detected public IPv4 address");
break;
}
}
}
if need_external_v6 {
debug!("Local IPv6 is private or missing, checking external services...");
for url in IPV6_URLS {
if let Some(ip) = fetch_ip(url).await {
if ip.is_ipv6() {
info.ipv6 = Some(ip);
debug!(ip = %ip, "Detected public IPv6 address");
break;
}
if let Some(ip) = fetch_ip(url).await
&& ip.is_ipv6()
{
info.ipv6 = Some(ip);
debug!(ip = %ip, "Detected public IPv6 address");
break;
}
}
}

View File

@ -67,15 +67,15 @@ pub async fn check_time_sync() -> Option<TimeSyncResult> {
#[allow(dead_code)]
pub async fn time_sync_task(check_interval: Duration) -> ! {
loop {
if let Some(result) = check_time_sync().await {
if result.is_skewed {
error!(
"System clock is off by {} seconds. Please sync your clock.",
result.skew_secs
);
}
if let Some(result) = check_time_sync().await
&& result.is_skewed
{
error!(
"System clock is off by {} seconds. Please sync your clock.",
result.skew_secs
);
}
tokio::time::sleep(check_interval).await;
}
}