diff --git a/src/config/defaults.rs b/src/config/defaults.rs index a0443fc..b9b0da1 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -171,15 +171,15 @@ pub(crate) fn default_cache_public_ip_path() -> String { } pub(crate) fn default_proxy_secret_reload_secs() -> u64 { - 1 * 60 * 60 + 60 * 60 } pub(crate) fn default_proxy_config_reload_secs() -> u64 { - 1 * 60 * 60 + 60 * 60 } pub(crate) fn default_update_every_secs() -> u64 { - 1 * 30 * 60 + 30 * 60 } pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 { diff --git a/src/config/load.rs b/src/config/load.rs index dce5fbc..750e0dc 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -278,23 +278,25 @@ impl ProxyConfig { reuse_allow: false, }); } - if let Some(ipv6_str) = &config.server.listen_addr_ipv6 { - if let Ok(ipv6) = ipv6_str.parse::() { - config.server.listeners.push(ListenerConfig { - ip: ipv6, - announce: None, - announce_ip: None, - proxy_protocol: None, - reuse_allow: false, - }); - } + if let Some(ipv6_str) = &config.server.listen_addr_ipv6 + && let Ok(ipv6) = ipv6_str.parse::() + { + config.server.listeners.push(ListenerConfig { + ip: ipv6, + announce: None, + announce_ip: None, + proxy_protocol: None, + reuse_allow: false, + }); } } // Migration: announce_ip → announce for each listener. for listener in &mut config.server.listeners { - if listener.announce.is_none() && listener.announce_ip.is_some() { - listener.announce = Some(listener.announce_ip.unwrap().to_string()); + if listener.announce.is_none() + && let Some(ip) = listener.announce_ip.take() + { + listener.announce = Some(ip.to_string()); } } diff --git a/src/config/types.rs b/src/config/types.rs index 8d573df..c9ceea4 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -677,9 +677,10 @@ pub struct ListenerConfig { /// - `show_link = "*"` — show links for all users /// - `show_link = ["a", "b"]` — show links for specific users /// - omitted — show no links (default) -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub enum ShowLink { /// Don't show any links (default when omitted). + #[default] None, /// Show links for all configured users. All, @@ -687,12 +688,6 @@ pub enum ShowLink { Specific(Vec), } -impl Default for ShowLink { - fn default() -> Self { - ShowLink::None - } -} - impl ShowLink { /// Returns true if no links should be shown. pub fn is_empty(&self) -> bool { diff --git a/src/crypto/aes.rs b/src/crypto/aes.rs index 674e4cb..deda730 100644 --- a/src/crypto/aes.rs +++ b/src/crypto/aes.rs @@ -23,13 +23,13 @@ type Aes256Ctr = Ctr128BE; // ============= AES-256-CTR ============= /// AES-256-CTR encryptor/decryptor -/// +/// /// CTR mode is symmetric — encryption and decryption are the same operation. /// /// **Zeroize note:** The inner `Aes256Ctr` cipher state (expanded key schedule -/// + counter) is opaque and cannot be zeroized. If you need to protect key -/// material, zeroize the `[u8; 32]` key and `u128` IV at the call site -/// before dropping them. +/// + counter) is opaque and cannot be zeroized. If you need to protect key +/// material, zeroize the `[u8; 32]` key and `u128` IV at the call site +/// before dropping them. pub struct AesCtr { cipher: Aes256Ctr, } @@ -149,7 +149,7 @@ impl AesCbc { /// /// CBC Encryption: C[i] = AES_Encrypt(P[i] XOR C[i-1]), where C[-1] = IV pub fn encrypt(&self, data: &[u8]) -> Result> { - if data.len() % Self::BLOCK_SIZE != 0 { + if !data.len().is_multiple_of(Self::BLOCK_SIZE) { return Err(ProxyError::Crypto( format!("CBC data must be aligned to 16 bytes, got {}", data.len()) )); @@ -180,7 +180,7 @@ impl AesCbc { /// /// CBC Decryption: P[i] = AES_Decrypt(C[i]) XOR C[i-1], where C[-1] = IV pub fn decrypt(&self, data: &[u8]) -> Result> { - if data.len() % Self::BLOCK_SIZE != 0 { + if !data.len().is_multiple_of(Self::BLOCK_SIZE) { return Err(ProxyError::Crypto( format!("CBC data must be aligned to 16 bytes, got {}", data.len()) )); @@ -209,7 +209,7 @@ impl AesCbc { /// Encrypt data in-place pub fn encrypt_in_place(&self, data: &mut [u8]) -> Result<()> { - if data.len() % Self::BLOCK_SIZE != 0 { + if !data.len().is_multiple_of(Self::BLOCK_SIZE) { return Err(ProxyError::Crypto( format!("CBC data must be aligned to 16 bytes, got {}", data.len()) )); @@ -242,7 +242,7 @@ impl AesCbc { /// Decrypt data in-place pub fn decrypt_in_place(&self, data: &mut [u8]) -> Result<()> { - if data.len() % Self::BLOCK_SIZE != 0 { + if !data.len().is_multiple_of(Self::BLOCK_SIZE) { return Err(ProxyError::Crypto( format!("CBC data must be aligned to 16 bytes, got {}", data.len()) )); diff --git a/src/crypto/hash.rs b/src/crypto/hash.rs index d3f6f55..fa3e441 100644 --- a/src/crypto/hash.rs +++ b/src/crypto/hash.rs @@ -64,6 +64,7 @@ pub fn crc32c(data: &[u8]) -> u32 { /// /// Returned buffer layout (IPv4): /// nonce_srv | nonce_clt | clt_ts | srv_ip | clt_port | purpose | clt_ip | srv_port | secret | nonce_srv | [clt_v6 | srv_v6] | nonce_clt +#[allow(clippy::too_many_arguments)] pub fn build_middleproxy_prekey( nonce_srv: &[u8; 16], nonce_clt: &[u8; 16], @@ -108,6 +109,7 @@ pub fn build_middleproxy_prekey( /// Uses MD5 + SHA-1 as mandated by the Telegram Middle Proxy protocol. /// These algorithms are NOT replaceable here — changing them would break /// interoperability with Telegram's middle proxy infrastructure. +#[allow(clippy::too_many_arguments)] pub fn derive_middleproxy_keys( nonce_srv: &[u8; 16], nonce_clt: &[u8; 16], diff --git a/src/crypto/random.rs b/src/crypto/random.rs index 0dd5f1a..6313610 100644 --- a/src/crypto/random.rs +++ b/src/crypto/random.rs @@ -95,7 +95,7 @@ impl SecureRandom { return 0; } - let bytes_needed = (k + 7) / 8; + let bytes_needed = k.div_ceil(8); let bytes = self.bytes(bytes_needed.min(8)); let mut result = 0u64; diff --git a/src/error.rs b/src/error.rs index eaebd88..e4d66b9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -91,7 +91,7 @@ impl From for std::io::Error { std::io::Error::new(std::io::ErrorKind::UnexpectedEof, err) } StreamError::Poisoned { .. } => { - std::io::Error::new(std::io::ErrorKind::Other, err) + std::io::Error::other(err) } StreamError::BufferOverflow { .. } => { std::io::Error::new(std::io::ErrorKind::OutOfMemory, err) @@ -100,7 +100,7 @@ impl From for std::io::Error { std::io::Error::new(std::io::ErrorKind::InvalidData, err) } StreamError::PartialRead { .. } | StreamError::PartialWrite { .. } => { - std::io::Error::new(std::io::ErrorKind::Other, err) + std::io::Error::other(err) } } } @@ -135,12 +135,7 @@ impl Recoverable for StreamError { } fn can_continue(&self) -> bool { - match self { - Self::Poisoned { .. } => false, - Self::UnexpectedEof => false, - Self::BufferOverflow { .. } => false, - _ => true, - } + !matches!(self, Self::Poisoned { .. } | Self::UnexpectedEof | Self::BufferOverflow { .. }) } } diff --git a/src/main.rs b/src/main.rs index 0d1eccc..7264239 100644 --- a/src/main.rs +++ b/src/main.rs @@ -301,7 +301,7 @@ async fn main() -> std::result::Result<(), Box> { match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await { Ok(proxy_secret) => { info!( - secret_len = proxy_secret.len() as usize, // ← ЯВНЫЙ ТИП usize + secret_len = proxy_secret.len(), key_sig = format_args!( "0x{:08x}", if proxy_secret.len() >= 4 { @@ -597,14 +597,12 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai } else { info!(" IPv4 in use / IPv6 is fallback"); } - } else { - if v6_works && !v4_works { - info!(" IPv6 only / IPv4 unavailable)"); - } else if v4_works && !v6_works { - info!(" IPv4 only / IPv6 unavailable)"); - } else if !v6_works && !v4_works { - info!(" No DC connectivity"); - } + } else if v6_works && !v4_works { + info!(" IPv6 only / IPv4 unavailable)"); + } else if v4_works && !v6_works { + info!(" IPv4 only / IPv6 unavailable)"); + } else if !v6_works && !v4_works { + info!(" No DC connectivity"); } info!(" via {}", upstream_result.upstream_name); diff --git a/src/network/probe.rs b/src/network/probe.rs index eda69b8..c52b340 100644 --- a/src/network/probe.rs +++ b/src/network/probe.rs @@ -95,23 +95,21 @@ pub async fn run_probe(config: &NetworkConfig, stun_addr: Option, nat_pr } pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe) -> NetworkDecision { - let mut decision = NetworkDecision::default(); + let ipv4_dc = config.ipv4 && probe.detected_ipv4.is_some(); + let ipv6_dc = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()) && probe.detected_ipv6.is_some(); - decision.ipv4_dc = config.ipv4 && probe.detected_ipv4.is_some(); - decision.ipv6_dc = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()) && probe.detected_ipv6.is_some(); - - decision.ipv4_me = config.ipv4 + let ipv4_me = config.ipv4 && probe.detected_ipv4.is_some() && (!probe.ipv4_is_bogon || probe.reflected_ipv4.is_some()); let ipv6_enabled = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()); - decision.ipv6_me = ipv6_enabled + let ipv6_me = ipv6_enabled && probe.detected_ipv6.is_some() && (!probe.ipv6_is_bogon || probe.reflected_ipv6.is_some()); - decision.effective_prefer = match config.prefer { - 6 if decision.ipv6_me || decision.ipv6_dc => 6, - 4 if decision.ipv4_me || decision.ipv4_dc => 4, + let effective_prefer = match config.prefer { + 6 if ipv6_me || ipv6_dc => 6, + 4 if ipv4_me || ipv4_dc => 4, 6 => { warn!("prefer=6 requested but IPv6 unavailable; falling back to IPv4"); 4 @@ -119,10 +117,17 @@ pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe) _ => 4, }; - let me_families = decision.ipv4_me as u8 + decision.ipv6_me as u8; - decision.effective_multipath = config.multipath && me_families >= 2; + let me_families = ipv4_me as u8 + ipv6_me as u8; + let effective_multipath = config.multipath && me_families >= 2; - decision + NetworkDecision { + ipv4_dc, + ipv6_dc, + ipv4_me, + ipv6_me, + effective_prefer, + effective_multipath, + } } fn detect_local_ip_v4() -> Option { diff --git a/src/network/stun.rs b/src/network/stun.rs index c47aa49..5bda495 100644 --- a/src/network/stun.rs +++ b/src/network/stun.rs @@ -198,16 +198,11 @@ async fn resolve_stun_addr(stun_addr: &str, family: IpFamily) -> Result true, - (false, IpFamily::V6) => true, - _ => false, - }) - .next(); + .find(|a| matches!((a.is_ipv4(), family), (true, IpFamily::V4) | (false, IpFamily::V6))); Ok(target) } diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index e6ddbaf..9e79206 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -160,7 +160,7 @@ pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256; /// Secure Intermediate payload is expected to be 4-byte aligned. pub fn is_valid_secure_payload_len(data_len: usize) -> bool { - data_len % 4 == 0 + data_len.is_multiple_of(4) } /// Compute Secure Intermediate payload length from wire length. @@ -179,7 +179,7 @@ pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize { is_valid_secure_payload_len(data_len), "Secure payload must be 4-byte aligned, got {data_len}" ); - (rng.range(3) + 1) as usize + rng.range(3) + 1 } // ============= Timeouts ============= @@ -231,7 +231,6 @@ pub static RESERVED_NONCE_CONTINUES: &[[u8; 4]] = &[ // ============= RPC Constants (for Middle Proxy) ============= /// RPC Proxy Request - /// RPC Flags (from Erlang mtp_rpc.erl) pub const RPC_FLAG_NOT_ENCRYPTED: u32 = 0x2; pub const RPC_FLAG_HAS_AD_TAG: u32 = 0x8; diff --git a/src/protocol/frame.rs b/src/protocol/frame.rs index a332be0..dd59ba9 100644 --- a/src/protocol/frame.rs +++ b/src/protocol/frame.rs @@ -85,7 +85,7 @@ impl FrameMode { pub fn validate_message_length(len: usize) -> bool { use super::constants::{MIN_MSG_LEN, MAX_MSG_LEN, PADDING_FILLER}; - len >= MIN_MSG_LEN && len <= MAX_MSG_LEN && len % PADDING_FILLER.len() == 0 + (MIN_MSG_LEN..=MAX_MSG_LEN).contains(&len) && len.is_multiple_of(PADDING_FILLER.len()) } #[cfg(test)] diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index 091092a..fbe7ad5 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -335,7 +335,7 @@ pub fn validate_tls_handshake( // This is a quirk in some clients that use uptime instead of real time let is_boot_time = timestamp < 60 * 60 * 24 * 1000; // < ~2.7 years in seconds - if !is_boot_time && (time_diff < TIME_SKEW_MIN || time_diff > TIME_SKEW_MAX) { + if !is_boot_time && !(TIME_SKEW_MIN..=TIME_SKEW_MAX).contains(&time_diff) { continue; } } @@ -393,7 +393,7 @@ pub fn build_server_hello( ) -> Vec { const MIN_APP_DATA: usize = 64; const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound - let fake_cert_len = fake_cert_len.max(MIN_APP_DATA).min(MAX_APP_DATA); + let fake_cert_len = fake_cert_len.clamp(MIN_APP_DATA, MAX_APP_DATA); let x25519_key = gen_fake_x25519_key(rng); // Build ServerHello @@ -525,10 +525,10 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option { if sn_pos + name_len > sn_end { break; } - if name_type == 0 && name_len > 0 { - if let Ok(host) = std::str::from_utf8(&handshake[sn_pos..sn_pos + name_len]) { - return Some(host.to_string()); - } + if name_type == 0 && name_len > 0 + && let Ok(host) = std::str::from_utf8(&handshake[sn_pos..sn_pos + name_len]) + { + return Some(host.to_string()); } sn_pos += name_len; } @@ -571,7 +571,7 @@ pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec> { let list_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize; let mut lp = pos + 2; let list_end = (pos + 2).saturating_add(list_len).min(pos + elen); - while lp + 1 <= list_end { + while lp < list_end { let plen = handshake[lp] as usize; lp += 1; if lp + plen > list_end { break; } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 051ce9e..483f6e0 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -594,18 +594,18 @@ impl RunningClientHandler { peer_addr: SocketAddr, ip_tracker: &UserIpTracker, ) -> Result<()> { - if let Some(expiration) = config.access.user_expirations.get(user) { - if chrono::Utc::now() > *expiration { - return Err(ProxyError::UserExpired { - user: user.to_string(), - }); - } + if let Some(expiration) = config.access.user_expirations.get(user) + && chrono::Utc::now() > *expiration + { + return Err(ProxyError::UserExpired { + user: user.to_string(), + }); } // IP limit check if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await { warn!( - user = %user, + user = %user, ip = %peer_addr.ip(), reason = %reason, "IP limit exceeded" @@ -615,20 +615,20 @@ impl RunningClientHandler { }); } - if let Some(limit) = config.access.user_max_tcp_conns.get(user) { - if stats.get_user_curr_connects(user) >= *limit as u64 { - return Err(ProxyError::ConnectionLimitExceeded { - user: user.to_string(), - }); - } + if let Some(limit) = config.access.user_max_tcp_conns.get(user) + && stats.get_user_curr_connects(user) >= *limit as u64 + { + return Err(ProxyError::ConnectionLimitExceeded { + user: user.to_string(), + }); } - if let Some(quota) = config.access.user_data_quota.get(user) { - if stats.get_user_total_octets(user) >= *quota { - return Err(ProxyError::DataQuotaExceeded { - user: user.to_string(), - }); - } + if let Some(quota) = config.access.user_data_quota.get(user) + && stats.get_user_total_octets(user) >= *quota + { + return Err(ProxyError::DataQuotaExceeded { + user: user.to_string(), + }); } Ok(()) diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 630937b..e50623d 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -118,10 +118,10 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { // Unknown DC requested by client without override: log and fall back. if !config.dc_overrides.contains_key(&dc_key) { warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster"); - if let Some(path) = &config.general.unknown_dc_log_path { - if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) { - let _ = writeln!(file, "dc_idx={dc_idx}"); - } + if let Some(path) = &config.general.unknown_dc_log_path + && let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) + { + let _ = writeln!(file, "dc_idx={dc_idx}"); } } diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index 78ef806..72175fe 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -19,12 +19,12 @@ const MASK_BUFFER_SIZE: usize = 8192; /// Detect client type based on initial data fn detect_client_type(data: &[u8]) -> &'static str { // Check for HTTP request - if data.len() > 4 { - if data.starts_with(b"GET ") || data.starts_with(b"POST") || + if data.len() > 4 + && (data.starts_with(b"GET ") || data.starts_with(b"POST") || data.starts_with(b"HEAD") || data.starts_with(b"PUT ") || - data.starts_with(b"DELETE") || data.starts_with(b"OPTIONS") { - return "HTTP"; - } + data.starts_with(b"DELETE") || data.starts_with(b"OPTIONS")) + { + return "HTTP"; } // Check for TLS ClientHello (0x16 = handshake, 0x03 0x01-0x03 = TLS version) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index a6a11e1..f089442 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -393,13 +393,13 @@ where .unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}")))); // When client closes, but ME channel stopped as unregistered - it isnt error - if client_closed { - if matches!( + if client_closed + && matches!( writer_result, Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost" - ) { - writer_result = Ok(()); - } + ) + { + writer_result = Ok(()); } let result = match (main_result, c2me_result, writer_result) { @@ -549,7 +549,7 @@ where match proto_tag { ProtoTag::Abridged => { - if data.len() % 4 != 0 { + if !data.len().is_multiple_of(4) { return Err(ProxyError::Proxy(format!( "Abridged payload must be 4-byte aligned, got {}", data.len() @@ -567,7 +567,7 @@ where frame_buf.push(first); frame_buf.extend_from_slice(data); client_writer - .write_all(&frame_buf) + .write_all(frame_buf) .await .map_err(ProxyError::Io)?; } else if len_words < (1 << 24) { @@ -581,7 +581,7 @@ where frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]); frame_buf.extend_from_slice(data); client_writer - .write_all(&frame_buf) + .write_all(frame_buf) .await .map_err(ProxyError::Io)?; } else { @@ -618,7 +618,7 @@ where rng.fill(&mut frame_buf[start..]); } client_writer - .write_all(&frame_buf) + .write_all(frame_buf) .await .map_err(ProxyError::Io)?; } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 31e9d4f..a58996d 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -326,10 +326,10 @@ impl ReplayShard { // Use key.as_ref() to get &[u8] — avoids Borrow ambiguity // between Borrow<[u8]> and Borrow> - if let Some(entry) = self.cache.peek(key.as_ref()) { - if entry.seq == queue_seq { - self.cache.pop(key.as_ref()); - } + if let Some(entry) = self.cache.peek(key.as_ref()) + && entry.seq == queue_seq + { + self.cache.pop(key.as_ref()); } } } diff --git a/src/stream/crypto_stream.rs b/src/stream/crypto_stream.rs index 67d8c95..5303fe5 100644 --- a/src/stream/crypto_stream.rs +++ b/src/stream/crypto_stream.rs @@ -47,7 +47,7 @@ //! - when upstream is Pending but pending still has room: accept `to_accept` bytes and //! encrypt+append ciphertext directly into pending (in-place encryption of appended range) -//! Encrypted stream wrappers using AES-CTR +//! Encrypted stream wrappers using AES-CTR //! //! This module provides stateful async stream wrappers that handle //! encryption/decryption with proper partial read/write handling. @@ -153,9 +153,9 @@ impl CryptoReader { fn take_poison_error(&mut self) -> io::Error { match &mut self.state { CryptoReaderState::Poisoned { error } => error.take().unwrap_or_else(|| { - io::Error::new(ErrorKind::Other, "stream previously poisoned") + io::Error::other("stream previously poisoned") }), - _ => io::Error::new(ErrorKind::Other, "stream not poisoned"), + _ => io::Error::other("stream not poisoned"), } } } @@ -168,6 +168,7 @@ impl AsyncRead for CryptoReader { ) -> Poll> { let this = self.get_mut(); + #[allow(clippy::never_loop)] loop { match &mut this.state { CryptoReaderState::Poisoned { .. } => { @@ -485,14 +486,14 @@ impl CryptoWriter { fn take_poison_error(&mut self) -> io::Error { match &mut self.state { CryptoWriterState::Poisoned { error } => error.take().unwrap_or_else(|| { - io::Error::new(ErrorKind::Other, "stream previously poisoned") + io::Error::other("stream previously poisoned") }), - _ => io::Error::new(ErrorKind::Other, "stream not poisoned"), + _ => io::Error::other("stream not poisoned"), } } /// Ensure we are in Flushing state and return mutable pending buffer. - fn ensure_pending<'a>(state: &'a mut CryptoWriterState, max_pending: usize) -> &'a mut PendingCiphertext { + fn ensure_pending(state: &mut CryptoWriterState, max_pending: usize) -> &mut PendingCiphertext { if matches!(state, CryptoWriterState::Idle) { *state = CryptoWriterState::Flushing { pending: PendingCiphertext::new(max_pending), diff --git a/src/stream/frame_codec.rs b/src/stream/frame_codec.rs index 3de8257..2ff7de7 100644 --- a/src/stream/frame_codec.rs +++ b/src/stream/frame_codec.rs @@ -139,7 +139,7 @@ fn encode_abridged(frame: &Frame, dst: &mut BytesMut) -> io::Result<()> { let data = &frame.data; // Validate alignment - if data.len() % 4 != 0 { + if !data.len().is_multiple_of(4) { return Err(Error::new( ErrorKind::InvalidInput, format!("abridged frame must be 4-byte aligned, got {} bytes", data.len()) diff --git a/src/stream/frame_stream.rs b/src/stream/frame_stream.rs index b66c2cd..c729162 100644 --- a/src/stream/frame_stream.rs +++ b/src/stream/frame_stream.rs @@ -78,7 +78,7 @@ impl AbridgedFrameWriter { impl AbridgedFrameWriter { /// Write a frame pub async fn write_frame(&mut self, data: &[u8], meta: &FrameMeta) -> Result<()> { - if data.len() % 4 != 0 { + if !data.len().is_multiple_of(4) { return Err(Error::new( ErrorKind::InvalidInput, format!("Abridged frame must be aligned to 4 bytes, got {}", data.len()), @@ -331,7 +331,7 @@ impl MtprotoFrameReader { } // Validate length - if len < MIN_MSG_LEN || len > MAX_MSG_LEN || len % PADDING_FILLER.len() != 0 { + if !(MIN_MSG_LEN..=MAX_MSG_LEN).contains(&len) || !len.is_multiple_of(PADDING_FILLER.len()) { return Err(Error::new( ErrorKind::InvalidData, format!("Invalid message length: {}", len), diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs index fa165db..fe28542 100644 --- a/src/stream/tls_stream.rs +++ b/src/stream/tls_stream.rs @@ -135,7 +135,7 @@ impl TlsRecordHeader { } /// Build header bytes - fn to_bytes(&self) -> [u8; 5] { + fn to_bytes(self) -> [u8; 5] { [ self.record_type, self.version[0], @@ -260,9 +260,9 @@ impl FakeTlsReader { fn take_poison_error(&mut self) -> io::Error { match &mut self.state { TlsReaderState::Poisoned { error } => error.take().unwrap_or_else(|| { - io::Error::new(ErrorKind::Other, "stream previously poisoned") + io::Error::other("stream previously poisoned") }), - _ => io::Error::new(ErrorKind::Other, "stream not poisoned"), + _ => io::Error::other("stream not poisoned"), } } } @@ -297,7 +297,7 @@ impl AsyncRead for FakeTlsReader { TlsReaderState::Poisoned { error } => { this.state = TlsReaderState::Poisoned { error: None }; let err = error.unwrap_or_else(|| { - io::Error::new(ErrorKind::Other, "stream previously poisoned") + io::Error::other("stream previously poisoned") }); return Poll::Ready(Err(err)); } @@ -616,9 +616,9 @@ impl FakeTlsWriter { fn take_poison_error(&mut self) -> io::Error { match &mut self.state { TlsWriterState::Poisoned { error } => error.take().unwrap_or_else(|| { - io::Error::new(ErrorKind::Other, "stream previously poisoned") + io::Error::other("stream previously poisoned") }), - _ => io::Error::new(ErrorKind::Other, "stream not poisoned"), + _ => io::Error::other("stream not poisoned"), } } @@ -682,7 +682,7 @@ impl AsyncWrite for FakeTlsWriter { TlsWriterState::Poisoned { error } => { this.state = TlsWriterState::Poisoned { error: None }; let err = error.unwrap_or_else(|| { - Error::new(ErrorKind::Other, "stream previously poisoned") + Error::other("stream previously poisoned") }); return Poll::Ready(Err(err)); } @@ -771,7 +771,7 @@ impl AsyncWrite for FakeTlsWriter { TlsWriterState::Poisoned { error } => { this.state = TlsWriterState::Poisoned { error: None }; let err = error.unwrap_or_else(|| { - Error::new(ErrorKind::Other, "stream previously poisoned") + Error::other("stream previously poisoned") }); return Poll::Ready(Err(err)); } diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs index a425a35..23e60db 100644 --- a/src/tls_front/cache.rs +++ b/src/tls_front/cache.rs @@ -115,32 +115,32 @@ impl TlsFrontCache { if !name.ends_with(".json") { continue; } - if let Ok(data) = tokio::fs::read(entry.path()).await { - if let Ok(mut cached) = serde_json::from_slice::(&data) { - if cached.domain.is_empty() - || cached.domain.len() > 255 - || !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-') - { - warn!(file = %name, "Skipping TLS cache entry with invalid domain"); - continue; - } - // fetched_at is skipped during deserialization; approximate with file mtime if available. - if let Ok(meta) = entry.metadata().await { - if let Ok(modified) = meta.modified() { - cached.fetched_at = modified; - } - } - // Drop entries older than 72h - if let Ok(age) = cached.fetched_at.elapsed() { - if age > Duration::from_secs(72 * 3600) { - warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)"); - continue; - } - } - let domain = cached.domain.clone(); - self.set(&domain, cached).await; - loaded += 1; + if let Ok(data) = tokio::fs::read(entry.path()).await + && let Ok(mut cached) = serde_json::from_slice::(&data) + { + if cached.domain.is_empty() + || cached.domain.len() > 255 + || !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-') + { + warn!(file = %name, "Skipping TLS cache entry with invalid domain"); + continue; } + // fetched_at is skipped during deserialization; approximate with file mtime if available. + if let Ok(meta) = entry.metadata().await + && let Ok(modified) = meta.modified() + { + cached.fetched_at = modified; + } + // Drop entries older than 72h + if let Ok(age) = cached.fetched_at.elapsed() + && age > Duration::from_secs(72 * 3600) + { + warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)"); + continue; + } + let domain = cached.domain.clone(); + self.set(&domain, cached).await; + loaded += 1; } } } diff --git a/src/tls_front/emulator.rs b/src/tls_front/emulator.rs index 25d2a8c..c8c18ac 100644 --- a/src/tls_front/emulator.rs +++ b/src/tls_front/emulator.rs @@ -12,7 +12,7 @@ fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec { sizes .iter() .map(|&size| { - let base = size.max(MIN_APP_DATA).min(MAX_APP_DATA); + let base = size.clamp(MIN_APP_DATA, MAX_APP_DATA); let jitter_range = ((base as f64) * 0.03).round() as i64; if jitter_range == 0 { return base; @@ -50,7 +50,7 @@ fn ensure_payload_capacity(mut sizes: Vec, payload_len: usize) -> Vec 17 { + let body_len = size - 17; + rec.extend_from_slice(&rng.bytes(body_len)); + rec.push(0x16); // inner content type marker (handshake) + rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag } else { - if size > 17 { - let body_len = size - 17; - rec.extend_from_slice(&rng.bytes(body_len)); - rec.push(0x16); // inner content type marker (handshake) - rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag - } else { - rec.extend_from_slice(&rng.bytes(size)); - } + rec.extend_from_slice(&rng.bytes(size)); } app_data.extend_from_slice(&rec); } diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs index 4678ea3..7ac4b42 100644 --- a/src/tls_front/fetcher.rs +++ b/src/tls_front/fetcher.rs @@ -384,7 +384,7 @@ async fn fetch_via_raw_tls( for _ in 0..4 { match timeout(connect_timeout, read_tls_record(&mut stream)).await { Ok(Ok(rec)) => records.push(rec), - Ok(Err(e)) => return Err(e.into()), + Ok(Err(e)) => return Err(e), Err(_) => break, } if records.len() >= 3 && records.iter().any(|(t, _)| *t == TLS_RECORD_APPLICATION) { diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs index 6d83761..6df0466 100644 --- a/src/transport/middle_proxy/codec.rs +++ b/src/transport/middle_proxy/codec.rs @@ -165,11 +165,10 @@ fn process_pid16() -> u16 { } fn process_utime() -> u32 { - let utime = std::time::SystemTime::now() + std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() - .as_secs() as u32; - utime + .as_secs() as u32 } pub(crate) fn cbc_encrypt_padded( diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 96d5f91..56d5b81 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -40,14 +40,16 @@ pub struct ProxyConfigData { } fn parse_host_port(s: &str) -> Option<(IpAddr, u16)> { - if let Some(bracket_end) = s.rfind(']') { - if s.starts_with('[') && bracket_end + 1 < s.len() && s.as_bytes().get(bracket_end + 1) == Some(&b':') { - let host = &s[1..bracket_end]; - let port_str = &s[bracket_end + 2..]; - let ip = host.parse::().ok()?; - let port = port_str.parse::().ok()?; - return Some((ip, port)); - } + if let Some(bracket_end) = s.rfind(']') + && s.starts_with('[') + && bracket_end + 1 < s.len() + && s.as_bytes().get(bracket_end + 1) == Some(&b':') + { + let host = &s[1..bracket_end]; + let port_str = &s[bracket_end + 2..]; + let ip = host.parse::().ok()?; + let port = port_str.parse::().ok()?; + return Some((ip, port)); } let idx = s.rfind(':')?; @@ -84,20 +86,18 @@ pub async fn fetch_proxy_config(url: &str) -> Result { .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? ; - if let Some(date) = resp.headers().get(reqwest::header::DATE) { - if let Ok(date_str) = date.to_str() { - if let Ok(server_time) = httpdate::parse_http_date(date_str) { - if let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| { - server_time.duration_since(SystemTime::now()).map_err(|_| e) - }) { - let skew_secs = skew.as_secs(); - if skew_secs > 60 { - warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header"); - } else if skew_secs > 30 { - warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header"); - } - } - } + if let Some(date) = resp.headers().get(reqwest::header::DATE) + && let Ok(date_str) = date.to_str() + && let Ok(server_time) = httpdate::parse_http_date(date_str) + && let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| { + server_time.duration_since(SystemTime::now()).map_err(|_| e) + }) + { + let skew_secs = skew.as_secs(); + if skew_secs > 60 { + warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header"); + } else if skew_secs > 30 { + warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header"); } } diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index 95a9d6e..d9bcdde 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -47,21 +47,21 @@ impl MePool { pub(crate) async fn connect_tcp(&self, addr: SocketAddr) -> Result<(TcpStream, f64)> { let start = Instant::now(); let connect_fut = async { - if addr.is_ipv6() { - if let Some(v6) = self.detected_ipv6 { - match TcpSocket::new_v6() { - Ok(sock) => { - if let Err(e) = sock.bind(SocketAddr::new(IpAddr::V6(v6), 0)) { - debug!(error = %e, bind_ip = %v6, "ME IPv6 bind failed, falling back to default bind"); - } else { - match sock.connect(addr).await { - Ok(stream) => return Ok(stream), - Err(e) => debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect"), - } + if addr.is_ipv6() + && let Some(v6) = self.detected_ipv6 + { + match TcpSocket::new_v6() { + Ok(sock) => { + if let Err(e) = sock.bind(SocketAddr::new(IpAddr::V6(v6), 0)) { + debug!(error = %e, bind_ip = %v6, "ME IPv6 bind failed, falling back to default bind"); + } else { + match sock.connect(addr).await { + Ok(stream) => return Ok(stream), + Err(e) => debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect"), } } - Err(e) => debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect"), } + Err(e) => debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect"), } } TcpStream::connect(addr).await diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index e73e5f1..4bb7e64 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -92,10 +92,10 @@ async fn check_family( let key = (dc, family); let now = Instant::now(); - if let Some(ts) = next_attempt.get(&key) { - if now < *ts { - continue; - } + if let Some(ts) = next_attempt.get(&key) + && now < *ts + { + continue; } let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize; diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 2047e80..06fdc96 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -498,10 +498,10 @@ impl MePool { let mut guard = self.proxy_map_v4.write().await; let keys: Vec = guard.keys().cloned().collect(); for k in keys.iter().cloned().filter(|k| *k > 0) { - if !guard.contains_key(&-k) { - if let Some(addrs) = guard.get(&k).cloned() { - guard.insert(-k, addrs); - } + if !guard.contains_key(&-k) + && let Some(addrs) = guard.get(&k).cloned() + { + guard.insert(-k, addrs); } } } @@ -509,10 +509,10 @@ impl MePool { let mut guard = self.proxy_map_v6.write().await; let keys: Vec = guard.keys().cloned().collect(); for k in keys.iter().cloned().filter(|k| *k > 0) { - if !guard.contains_key(&-k) { - if let Some(addrs) = guard.get(&k).cloned() { - guard.insert(-k, addrs); - } + if !guard.contains_key(&-k) + && let Some(addrs) = guard.get(&k).cloned() + { + guard.insert(-k, addrs); } } } @@ -760,13 +760,12 @@ impl MePool { cancel_reader_token.clone(), ) .await; - if let Some(pool) = pool.upgrade() { - if cleanup_for_reader + if let Some(pool) = pool.upgrade() + && cleanup_for_reader .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() - { - pool.remove_writer_and_close_clients(writer_id).await; - } + { + pool.remove_writer_and_close_clients(writer_id).await; } if let Err(e) = res { warn!(error = %e, "ME reader ended"); @@ -834,13 +833,12 @@ impl MePool { stats_ping.increment_me_keepalive_failed(); debug!("ME ping failed, removing dead writer"); cancel_ping.cancel(); - if let Some(pool) = pool_ping.upgrade() { - if cleanup_for_ping + if let Some(pool) = pool_ping.upgrade() + && cleanup_for_ping .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() - { - pool.remove_writer_and_close_clients(writer_id).await; - } + { + pool.remove_writer_and_close_clients(writer_id).await; } break; } @@ -943,24 +941,20 @@ impl MePool { let pool = Arc::downgrade(self); tokio::spawn(async move { let deadline = timeout.map(|t| Instant::now() + t); - loop { - if let Some(p) = pool.upgrade() { - if let Some(deadline_at) = deadline { - if Instant::now() >= deadline_at { - warn!(writer_id, "Drain timeout, force-closing"); - p.stats.increment_pool_force_close_total(); - let _ = p.remove_writer_and_close_clients(writer_id).await; - break; - } - } - if p.registry.is_writer_empty(writer_id).await { - let _ = p.remove_writer_only(writer_id).await; - break; - } - tokio::time::sleep(Duration::from_secs(1)).await; - } else { + while let Some(p) = pool.upgrade() { + if let Some(deadline_at) = deadline + && Instant::now() >= deadline_at + { + warn!(writer_id, "Drain timeout, force-closing"); + p.stats.increment_pool_force_close_total(); + let _ = p.remove_writer_and_close_clients(writer_id).await; break; } + if p.registry.is_writer_empty(writer_id).await { + let _ = p.remove_writer_only(writer_id).await; + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; } }); } diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index 4d9e2a1..9936707 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -25,7 +25,7 @@ impl MePool { pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr { let nat_ip = self .nat_ip_cfg - .or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| (*g).clone())); + .or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| *g)); let Some(nat_ip) = nat_ip else { return ip; @@ -75,7 +75,7 @@ impl MePool { return None; } - if let Some(ip) = self.nat_ip_detected.read().await.clone() { + if let Some(ip) = *self.nat_ip_detected.read().await { return Some(ip); } @@ -102,17 +102,17 @@ impl MePool { ) -> Option { const STUN_CACHE_TTL: Duration = Duration::from_secs(600); // Backoff window - if let Some(until) = *self.stun_backoff_until.read().await { - if Instant::now() < until { - if let Ok(cache) = self.nat_reflection_cache.try_lock() { - let slot = match family { - IpFamily::V4 => cache.v4, - IpFamily::V6 => cache.v6, - }; - return slot.map(|(_, addr)| addr); - } - return None; + if let Some(until) = *self.stun_backoff_until.read().await + && Instant::now() < until + { + if let Ok(cache) = self.nat_reflection_cache.try_lock() { + let slot = match family { + IpFamily::V4 => cache.v4, + IpFamily::V6 => cache.v6, + }; + return slot.map(|(_, addr)| addr); } + return None; } if let Ok(mut cache) = self.nat_reflection_cache.try_lock() { @@ -120,10 +120,10 @@ impl MePool { IpFamily::V4 => &mut cache.v4, IpFamily::V6 => &mut cache.v6, }; - if let Some((ts, addr)) = slot { - if ts.elapsed() < STUN_CACHE_TTL { - return Some(*addr); - } + if let Some((ts, addr)) = slot + && ts.elapsed() < STUN_CACHE_TTL + { + return Some(*addr); } } diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index 9641143..69a3198 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -63,20 +63,18 @@ pub async fn download_proxy_secret() -> Result> { ))); } - if let Some(date) = resp.headers().get(reqwest::header::DATE) { - if let Ok(date_str) = date.to_str() { - if let Ok(server_time) = httpdate::parse_http_date(date_str) { - if let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| { - server_time.duration_since(SystemTime::now()).map_err(|_| e) - }) { - let skew_secs = skew.as_secs(); - if skew_secs > 60 { - warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header"); - } else if skew_secs > 30 { - warn!(skew_secs, "Time skew >30s detected from proxy-secret Date header"); - } - } - } + if let Some(date) = resp.headers().get(reqwest::header::DATE) + && let Ok(date_str) = date.to_str() + && let Ok(server_time) = httpdate::parse_http_date(date_str) + && let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| { + server_time.duration_since(SystemTime::now()).map_err(|_| e) + }) + { + let skew_secs = skew.as_secs(); + if skew_secs > 60 { + warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header"); + } else if skew_secs > 30 { + warn!(skew_secs, "Time skew >30s detected from proxy-secret Date header"); } } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 56bd17a..8867212 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -242,10 +242,10 @@ impl MePool { } if preferred.is_empty() { let def = self.default_dc.load(Ordering::Relaxed); - if def != 0 { - if let Some(v) = map_guard.get(&def) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } + if def != 0 + && let Some(v) = map_guard.get(&def) + { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); } } @@ -267,7 +267,7 @@ impl MePool { if !self.writer_accepts_new_binding(w) { continue; } - if preferred.iter().any(|p| *p == w.addr) { + if preferred.contains(&w.addr) { out.push(idx); } } diff --git a/src/transport/socket.rs b/src/transport/socket.rs index 0a20c3c..f1f8d5c 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -136,17 +136,17 @@ pub fn resolve_interface_ip(name: &str, want_ipv6: bool) -> Option { if let Ok(addrs) = getifaddrs() { for iface in addrs { - if iface.interface_name == name { - if let Some(address) = iface.address { - if let Some(v4) = address.as_sockaddr_in() { - if !want_ipv6 { - return Some(IpAddr::V4(v4.ip())); - } - } else if let Some(v6) = address.as_sockaddr_in6() { - if want_ipv6 { - return Some(IpAddr::V6(v6.ip().clone())); - } + if iface.interface_name == name + && let Some(address) = iface.address + { + if let Some(v4) = address.as_sockaddr_in() { + if !want_ipv6 { + return Some(IpAddr::V4(v4.ip())); } + } else if let Some(v6) = address.as_sockaddr_in6() + && want_ipv6 + { + return Some(IpAddr::V6(v6.ip())); } } } diff --git a/src/transport/socks.rs b/src/transport/socks.rs index 188d369..8196b52 100644 --- a/src/transport/socks.rs +++ b/src/transport/socks.rs @@ -27,11 +27,11 @@ pub async fn connect_socks4( buf.extend_from_slice(user); buf.push(0); // NULL - stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?; + stream.write_all(&buf).await.map_err(ProxyError::Io)?; // Response: VN (1) | CD (1) | DSTPORT (2) | DSTIP (4) let mut resp = [0u8; 8]; - stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut resp).await.map_err(ProxyError::Io)?; if resp[1] != 90 { return Err(ProxyError::Proxy(format!("SOCKS4 request rejected: code {}", resp[1]))); @@ -56,10 +56,10 @@ pub async fn connect_socks5( let mut buf = vec![5u8, methods.len() as u8]; buf.extend_from_slice(&methods); - stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?; + stream.write_all(&buf).await.map_err(ProxyError::Io)?; let mut resp = [0u8; 2]; - stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut resp).await.map_err(ProxyError::Io)?; if resp[0] != 5 { return Err(ProxyError::Proxy("Invalid SOCKS5 version".to_string())); @@ -80,10 +80,10 @@ pub async fn connect_socks5( auth_buf.push(p_bytes.len() as u8); auth_buf.extend_from_slice(p_bytes); - stream.write_all(&auth_buf).await.map_err(|e| ProxyError::Io(e))?; + stream.write_all(&auth_buf).await.map_err(ProxyError::Io)?; let mut auth_resp = [0u8; 2]; - stream.read_exact(&mut auth_resp).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut auth_resp).await.map_err(ProxyError::Io)?; if auth_resp[1] != 0 { return Err(ProxyError::Proxy("SOCKS5 authentication failed".to_string())); @@ -112,11 +112,11 @@ pub async fn connect_socks5( req.extend_from_slice(&target.port().to_be_bytes()); - stream.write_all(&req).await.map_err(|e| ProxyError::Io(e))?; + stream.write_all(&req).await.map_err(ProxyError::Io)?; // Response let mut head = [0u8; 4]; - stream.read_exact(&mut head).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut head).await.map_err(ProxyError::Io)?; if head[1] != 0 { return Err(ProxyError::Proxy(format!("SOCKS5 request failed: code {}", head[1]))); @@ -126,17 +126,17 @@ pub async fn connect_socks5( match head[3] { 1 => { // IPv4 let mut addr = [0u8; 4 + 2]; - stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut addr).await.map_err(ProxyError::Io)?; }, 3 => { // Domain let mut len = [0u8; 1]; - stream.read_exact(&mut len).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut len).await.map_err(ProxyError::Io)?; let mut addr = vec![0u8; len[0] as usize + 2]; - stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut addr).await.map_err(ProxyError::Io)?; }, 4 => { // IPv6 let mut addr = [0u8; 16 + 2]; - stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?; + stream.read_exact(&mut addr).await.map_err(ProxyError::Io)?; }, _ => return Err(ProxyError::Proxy("Invalid address type in SOCKS5 response".to_string())), } diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 887fa99..e2198a8 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -57,9 +57,10 @@ impl LatencyEma { // ============= Per-DC IP Preference Tracking ============= /// Tracks which IP version works for each DC -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum IpPreference { /// Not yet tested + #[default] Unknown, /// IPv6 works PreferV6, @@ -71,12 +72,6 @@ pub enum IpPreference { Unavailable, } -impl Default for IpPreference { - fn default() -> Self { - Self::Unknown - } -} - // ============= Upstream State ============= #[derive(Debug)] @@ -112,7 +107,7 @@ impl UpstreamState { if abs_dc == 0 { return None; } - if abs_dc >= 1 && abs_dc <= NUM_DCS { + if (1..=NUM_DCS).contains(&abs_dc) { Some(abs_dc - 1) } else { // Unknown DC → default cluster (DC 2, index 1) @@ -122,10 +117,10 @@ impl UpstreamState { /// Get latency for a specific DC, falling back to average across all known DCs fn effective_latency(&self, dc_idx: Option) -> Option { - if let Some(di) = dc_idx.and_then(Self::dc_array_idx) { - if let Some(ms) = self.dc_latency[di].get() { - return Some(ms); - } + if let Some(di) = dc_idx.and_then(Self::dc_array_idx) + && let Some(ms) = self.dc_latency[di].get() + { + return Some(ms); } let (sum, count) = self.dc_latency.iter() @@ -582,7 +577,7 @@ impl UpstreamManager { let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr_v6) + self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v6) ).await; let ping_result = match result { @@ -633,7 +628,7 @@ impl UpstreamManager { let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr_v4) + self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v4) ).await; let ping_result = match result { @@ -696,7 +691,7 @@ impl UpstreamManager { } let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(&upstream_config, Some(bind_rr.clone()), addr) + self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr) ).await; let ping_result = match result { diff --git a/src/util/ip.rs b/src/util/ip.rs index f3e774f..36a5759 100644 --- a/src/util/ip.rs +++ b/src/util/ip.rs @@ -67,54 +67,56 @@ pub async fn detect_ip() -> IpInfo { // Try to get local interface IP first (default gateway interface) // We connect to Google DNS to find out which interface is used for routing - if let Some(ip) = get_local_ip("8.8.8.8:80") { - if ip.is_ipv4() && !ip.is_loopback() { - info.ipv4 = Some(ip); - debug!(ip = %ip, "Detected local IPv4 address via routing"); - } + if let Some(ip) = get_local_ip("8.8.8.8:80") + && ip.is_ipv4() + && !ip.is_loopback() + { + info.ipv4 = Some(ip); + debug!(ip = %ip, "Detected local IPv4 address via routing"); } - if let Some(ip) = get_local_ipv6("[2001:4860:4860::8888]:80") { - if ip.is_ipv6() && !ip.is_loopback() { - info.ipv6 = Some(ip); - debug!(ip = %ip, "Detected local IPv6 address via routing"); - } + if let Some(ip) = get_local_ipv6("[2001:4860:4860::8888]:80") + && ip.is_ipv6() + && !ip.is_loopback() + { + info.ipv6 = Some(ip); + debug!(ip = %ip, "Detected local IPv6 address via routing"); } - - // If local detection failed or returned private IP (and we want public), + + // If local detection failed or returned private IP (and we want public), // or just as a fallback/verification, we might want to check external services. - // However, the requirement is: "if IP for listening is not set... it should be IP from interface... + // However, the requirement is: "if IP for listening is not set... it should be IP from interface... // if impossible - request external resources". - + // So if we found a local IP, we might be good. But often servers are behind NAT. // If the local IP is private, we probably want the public IP for the tg:// link. // Let's check if the detected IPs are private. - - let need_external_v4 = info.ipv4.map_or(true, |ip| is_private_ip(ip)); - let need_external_v6 = info.ipv6.map_or(true, |ip| is_private_ip(ip)); + + let need_external_v4 = info.ipv4.is_none_or(is_private_ip); + let need_external_v6 = info.ipv6.is_none_or(is_private_ip); if need_external_v4 { debug!("Local IPv4 is private or missing, checking external services..."); for url in IPV4_URLS { - if let Some(ip) = fetch_ip(url).await { - if ip.is_ipv4() { - info.ipv4 = Some(ip); - debug!(ip = %ip, "Detected public IPv4 address"); - break; - } + if let Some(ip) = fetch_ip(url).await + && ip.is_ipv4() + { + info.ipv4 = Some(ip); + debug!(ip = %ip, "Detected public IPv4 address"); + break; } } } - + if need_external_v6 { debug!("Local IPv6 is private or missing, checking external services..."); for url in IPV6_URLS { - if let Some(ip) = fetch_ip(url).await { - if ip.is_ipv6() { - info.ipv6 = Some(ip); - debug!(ip = %ip, "Detected public IPv6 address"); - break; - } + if let Some(ip) = fetch_ip(url).await + && ip.is_ipv6() + { + info.ipv6 = Some(ip); + debug!(ip = %ip, "Detected public IPv6 address"); + break; } } } diff --git a/src/util/time.rs b/src/util/time.rs index 310b015..07ea0ba 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -67,15 +67,15 @@ pub async fn check_time_sync() -> Option { #[allow(dead_code)] pub async fn time_sync_task(check_interval: Duration) -> ! { loop { - if let Some(result) = check_time_sync().await { - if result.is_skewed { - error!( - "System clock is off by {} seconds. Please sync your clock.", - result.skew_secs - ); - } + if let Some(result) = check_time_sync().await + && result.is_skewed + { + error!( + "System clock is off by {} seconds. Please sync your clock.", + result.skew_secs + ); } - + tokio::time::sleep(check_interval).await; } } \ No newline at end of file