From 17a966b822bf044f851a5b83fde77aee2752bb5b Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Fri, 17 Apr 2026 10:48:01 +0300
Subject: [PATCH] Rustfmt

---
 src/config/hot_reload.rs                     |  3 +-
 src/daemon/mod.rs                            |  8 +--
 src/maestro/runtime_tasks.rs                 | 12 ++++-
 src/proxy/direct_relay.rs                    |  4 +-
 src/proxy/relay.rs                           | 20 +++----
 src/proxy/traffic_limiter.rs                 | 38 ++++++++------
 src/tls_front/emulator.rs                    | 14 ++---
 src/tls_front/fetcher.rs                     | 24 ++++++---
 .../middle_proxy/fairness/pressure.rs        |  4 +-
 .../middle_proxy/fairness/scheduler.rs       | 52 ++++++++++++-------
 src/transport/middle_proxy/mod.rs            |  6 +--
 src/transport/middle_proxy/reader.rs         | 12 +++--
 .../tests/fairness_security_tests.rs         |  3 +-
 13 files changed, 116 insertions(+), 84 deletions(-)

diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs
index 4a71e37..48b56f8 100644
--- a/src/config/hot_reload.rs
+++ b/src/config/hot_reload.rs
@@ -122,7 +122,8 @@ pub struct HotFields {
     pub user_expirations: std::collections::HashMap>,
     pub user_data_quota: std::collections::HashMap,
     pub user_rate_limits: std::collections::HashMap,
-    pub cidr_rate_limits: std::collections::HashMap,
+    pub cidr_rate_limits:
+        std::collections::HashMap,
     pub user_max_unique_ips: std::collections::HashMap,
     pub user_max_unique_ips_global_each: usize,
     pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode,
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index 15d8812..a8d201d 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -8,8 +8,8 @@ use std::io::{self, Read, Write};
 use std::os::unix::fs::OpenOptionsExt;
 use std::path::{Path, PathBuf};
 
-use nix::fcntl::{Flock, FlockArg};
 use nix::errno::Errno;
+use nix::fcntl::{Flock, FlockArg};
 use nix::unistd::{self, ForkResult, Gid, Pid, Uid, chdir, close, fork, getpid, setsid};
 use tracing::{debug, info, warn};
 
@@ -350,11 +350,7 @@ fn set_supplementary_groups(gid: Gid) -> Result<(), nix::Error> {
             groups.as_ptr(),
         )
     };
-    if rc == 0 {
-        Ok(())
-    } else {
-        Err(Errno::last())
-    }
+    if rc == 0 { Ok(()) } else { Err(Errno::last()) }
 }
 
 #[cfg(not(target_os = "macos"))]
diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs
index b48060c..da059bd 100644
--- a/src/maestro/runtime_tasks.rs
+++ b/src/maestro/runtime_tasks.rs
@@ -190,8 +190,16 @@ pub(crate) async fn spawn_runtime_tasks(
     );
     let mut config_rx_rate_limits = config_rx.clone();
     tokio::spawn(async move {
-        let mut prev_user_limits = config_rx_rate_limits.borrow().access.user_rate_limits.clone();
-        let mut prev_cidr_limits = config_rx_rate_limits.borrow().access.cidr_rate_limits.clone();
+        let mut prev_user_limits = config_rx_rate_limits
+            .borrow()
+            .access
+            .user_rate_limits
+            .clone();
+        let mut prev_cidr_limits = config_rx_rate_limits
+            .borrow()
+            .access
+            .cidr_rate_limits
+            .clone();
         loop {
             if config_rx_rate_limits.changed().await.is_err() {
                 break;
diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs
index 8b6189b..6bd2101 100644
--- a/src/proxy/direct_relay.rs
+++ b/src/proxy/direct_relay.rs
@@ -316,7 +316,9 @@ where
     stats.increment_user_connects(user);
     let _direct_connection_lease = stats.acquire_direct_connection_lease();
 
-    let traffic_lease = shared.traffic_limiter.acquire_lease(user, success.peer.ip());
+    let traffic_lease = shared
+        .traffic_limiter
+        .acquire_lease(user, success.peer.ip());
 
     let buffer_pool_trim = Arc::clone(&buffer_pool);
     let relay_activity_timeout = if shared.conntrack_pressure_active() {
diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs
index a59ca5b..c9e6a98 100644
--- a/src/proxy/relay.rs
+++ b/src/proxy/relay.rs
@@ -289,17 +289,9 @@ impl StatsIo {
         let Some(started_at) = wait.started_at.take() else {
            return;
         };
-        let wait_ms = started_at
-            .elapsed()
-            .as_millis()
-            .min(u128::from(u64::MAX)) as u64;
+        let wait_ms = started_at.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
         if let Some(lease) = lease {
-            lease.observe_wait_ms(
-                direction,
-                wait.blocked_user,
-                wait.blocked_cidr,
-                wait_ms,
-            );
+            lease.observe_wait_ms(direction, wait.blocked_user, wait.blocked_cidr, wait_ms);
         }
         wait.blocked_user = false;
         wait.blocked_cidr = false;
@@ -340,8 +332,7 @@ impl StatsIo {
         while self.c2s_rate_debt_bytes > 0 {
             let consume = lease.try_consume(RateDirection::Up, self.c2s_rate_debt_bytes);
             if consume.granted > 0 {
-                self.c2s_rate_debt_bytes =
-                    self.c2s_rate_debt_bytes.saturating_sub(consume.granted);
+                self.c2s_rate_debt_bytes = self.c2s_rate_debt_bytes.saturating_sub(consume.granted);
                 continue;
             }
             Self::arm_wait(
@@ -647,7 +638,10 @@ impl AsyncWrite for StatsIo {
         match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
             Poll::Ready(Ok(n)) => {
                 if reserved_bytes > n as u64 {
-                    refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes - n as u64);
+                    refund_reserved_quota_bytes(
+                        this.user_stats.as_ref(),
+                        reserved_bytes - n as u64,
+                    );
                 }
                 if shaper_reserved_bytes > n as u64
                     && let Some(lease) = this.traffic_lease.as_ref()
diff --git a/src/proxy/traffic_limiter.rs b/src/proxy/traffic_limiter.rs
index 5c93944..0edfa0f 100644
--- a/src/proxy/traffic_limiter.rs
+++ b/src/proxy/traffic_limiter.rs
@@ -74,7 +74,8 @@ impl ScopeMetrics {
                 self.wait_up_ms_total.fetch_add(wait_ms, Ordering::Relaxed);
             }
             RateDirection::Down => {
-                self.wait_down_ms_total.fetch_add(wait_ms, Ordering::Relaxed);
+                self.wait_down_ms_total
+                    .fetch_add(wait_ms, Ordering::Relaxed);
             }
         }
     }
@@ -254,9 +255,7 @@ impl CidrDirectionBucket {
         let grant = if guaranteed_remaining > 0 {
             requested.min(guaranteed_remaining).min(total_remaining)
         } else {
-            requested
-                .min(total_remaining)
-                .min(MAX_BORROW_CHUNK_BYTES)
+            requested.min(total_remaining).min(MAX_BORROW_CHUNK_BYTES)
         };
 
         if grant == 0 {
@@ -266,12 +265,7 @@ impl CidrDirectionBucket {
         let next_total = total_used.saturating_add(grant);
         if self
             .used
-            .compare_exchange_weak(
-                total_used,
-                next_total,
-                Ordering::Relaxed,
-                Ordering::Relaxed,
-            )
+            .compare_exchange_weak(total_used, next_total, Ordering::Relaxed, Ordering::Relaxed)
             .is_ok()
         {
             user_state.used.fetch_add(grant, Ordering::Relaxed);
@@ -430,8 +424,14 @@ struct PolicySnapshot {
 impl PolicySnapshot {
     fn match_cidr(&self, ip: IpAddr) -> Option<&CidrRule> {
         match ip {
-            IpAddr::V4(_) => self.cidr_rules_v4.iter().find(|rule| rule.cidr.contains(ip)),
-            IpAddr::V6(_) => self.cidr_rules_v6.iter().find(|rule| rule.cidr.contains(ip)),
+            IpAddr::V4(_) => self
+                .cidr_rules_v4
+                .iter()
+                .find(|rule| rule.cidr.contains(ip)),
+            IpAddr::V6(_) => self
+                .cidr_rules_v6
+                .iter()
+                .find(|rule| rule.cidr.contains(ip)),
         }
     }
 }
@@ -535,7 +535,8 @@ impl TrafficLease {
         if let (Some(cidr_bucket), Some(cidr_user_share)) =
             (self.cidr_bucket.as_ref(), self.cidr_user_share.as_ref())
         {
-            let cidr_granted = cidr_bucket.try_consume_for_user(direction, cidr_user_share, granted);
+            let cidr_granted =
+                cidr_bucket.try_consume_for_user(direction, cidr_user_share, granted);
             if cidr_granted < granted
                 && let Some(user_bucket) = self.user_bucket.as_ref()
             {
@@ -693,7 +694,9 @@ impl TrafficLimiter {
                 .get_or_insert_with(user, || UserBucket::new(limit));
             bucket.set_rates(limit);
             bucket.active_leases.fetch_add(1, Ordering::Relaxed);
-            self.user_scope.active_leases.fetch_add(1, Ordering::Relaxed);
+            self.user_scope
+                .active_leases
+                .fetch_add(1, Ordering::Relaxed);
             user_bucket = Some(bucket);
         }
 
@@ -706,7 +709,9 @@ impl TrafficLimiter {
                 .get_or_insert_with(rule.key.as_str(), || CidrBucket::new(rule.limits));
             bucket.set_rates(rule.limits);
             bucket.active_leases.fetch_add(1, Ordering::Relaxed);
-            self.cidr_scope.active_leases.fetch_add(1, Ordering::Relaxed);
+            self.cidr_scope
+                .active_leases
+                .fetch_add(1, Ordering::Relaxed);
             let share = bucket.acquire_user_share(user);
             cidr_user_key = Some(user.to_string());
             cidr_user_share = Some(share);
@@ -784,7 +789,8 @@ impl TrafficLimiter {
         let policy = self.policy.load_full();
 
         self.user_buckets.retain(|user, bucket| {
-            bucket.active_leases.load(Ordering::Relaxed) > 0 || policy.user_limits.contains_key(user)
+            bucket.active_leases.load(Ordering::Relaxed) > 0
+                || policy.user_limits.contains_key(user)
         });
         self.cidr_buckets.retain(|cidr_key, bucket| {
             bucket.cleanup_idle_users();
diff --git a/src/tls_front/emulator.rs b/src/tls_front/emulator.rs
index 2aa83c8..a23373d 100644
--- a/src/tls_front/emulator.rs
+++ b/src/tls_front/emulator.rs
@@ -98,7 +98,9 @@ fn emulated_ticket_record_sizes(
 
     let target_count = sizes
         .len()
-        .max(usize::from(new_session_tickets.min(MAX_TICKET_RECORDS as u8)))
+        .max(usize::from(
+            new_session_tickets.min(MAX_TICKET_RECORDS as u8),
+        ))
         .min(MAX_TICKET_RECORDS);
 
     while sizes.len() < target_count {
@@ -329,11 +331,11 @@ pub fn build_emulated_server_hello(
     let mut tickets = Vec::new();
     for ticket_len in emulated_ticket_record_sizes(cached, new_session_tickets, rng) {
         let mut rec = Vec::with_capacity(5 + ticket_len);
-            rec.push(TLS_RECORD_APPLICATION);
-            rec.extend_from_slice(&TLS_VERSION);
-            rec.extend_from_slice(&(ticket_len as u16).to_be_bytes());
-            rec.extend_from_slice(&rng.bytes(ticket_len));
-            tickets.extend_from_slice(&rec);
+        rec.push(TLS_RECORD_APPLICATION);
+        rec.extend_from_slice(&TLS_VERSION);
+        rec.extend_from_slice(&(ticket_len as u16).to_be_bytes());
+        rec.extend_from_slice(&rng.bytes(ticket_len));
+        tickets.extend_from_slice(&rec);
     }
 
     let mut response = Vec::with_capacity(
diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs
index 2c37c34..aad956e 100644
--- a/src/tls_front/fetcher.rs
+++ b/src/tls_front/fetcher.rs
@@ -794,7 +794,9 @@ async fn connect_tcp_with_upstream(
     ))
 }
 
-fn socket_addrs_from_upstream_stream(stream: &UpstreamStream) -> (Option<SocketAddr>, Option<SocketAddr>) {
+fn socket_addrs_from_upstream_stream(
+    stream: &UpstreamStream,
+) -> (Option<SocketAddr>, Option<SocketAddr>) {
     match stream {
         UpstreamStream::Tcp(tcp) => (tcp.local_addr().ok(), tcp.peer_addr().ok()),
         UpstreamStream::Shadowsocks(_) => (None, None),
@@ -820,12 +822,16 @@ fn build_tls_fetch_proxy_header(
         }
         _ => {
             let header = match (src_addr, dst_addr) {
-                (Some(SocketAddr::V4(src)), Some(SocketAddr::V4(dst))) => ProxyProtocolV1Builder::new()
-                    .tcp4(src.into(), dst.into())
-                    .build(),
-                (Some(SocketAddr::V6(src)), Some(SocketAddr::V6(dst))) => ProxyProtocolV1Builder::new()
-                    .tcp6(src.into(), dst.into())
-                    .build(),
+                (Some(SocketAddr::V4(src)), Some(SocketAddr::V4(dst))) => {
+                    ProxyProtocolV1Builder::new()
+                        .tcp4(src.into(), dst.into())
+                        .build()
+                }
+                (Some(SocketAddr::V6(src)), Some(SocketAddr::V6(dst))) => {
+                    ProxyProtocolV1Builder::new()
+                        .tcp6(src.into(), dst.into())
+                        .build()
+                }
                 _ => ProxyProtocolV1Builder::new().build(),
             };
             Some(header)
@@ -1472,7 +1478,9 @@ mod tests {
 
         assert_eq!(
             &header[..12],
-            &[0x0d, 0x0a, 0x0d, 0x0a, 0x00, 0x0d, 0x0a, 0x51, 0x55, 0x49, 0x54, 0x0a]
+            &[
+                0x0d, 0x0a, 0x0d, 0x0a, 0x00, 0x0d, 0x0a, 0x51, 0x55, 0x49, 0x54, 0x0a
+            ]
         );
         assert_eq!(header[12], 0x21);
         assert_eq!(header[13], 0x11);
diff --git a/src/transport/middle_proxy/fairness/pressure.rs b/src/transport/middle_proxy/fairness/pressure.rs
index 1f068bd..02a5942 100644
--- a/src/transport/middle_proxy/fairness/pressure.rs
+++ b/src/transport/middle_proxy/fairness/pressure.rs
@@ -136,8 +136,8 @@ impl PressureEvaluator {
         let queue_ratio_pct = if max_total_queued_bytes == 0 {
             100
         } else {
-            ((signals.total_queued_bytes.saturating_mul(100)) / max_total_queued_bytes)
-                .min(100) as u8
+            ((signals.total_queued_bytes.saturating_mul(100)) / max_total_queued_bytes).min(100)
+                as u8
         };
 
         let standing_ratio_pct = if signals.active_flows == 0 {
diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs
index b4cc5d8..e8e6af8 100644
--- a/src/transport/middle_proxy/fairness/scheduler.rs
+++ b/src/transport/middle_proxy/fairness/scheduler.rs
@@ -166,7 +166,8 @@ impl WorkerFairnessState {
             return AdmissionDecision::RejectSaturated;
         }
 
-        if self.total_queued_bytes.saturating_add(frame_bytes) > self.config.max_total_queued_bytes {
+        if self.total_queued_bytes.saturating_add(frame_bytes) > self.config.max_total_queued_bytes
+        {
             self.pressure
                 .note_admission_reject(now, &self.config.pressure);
             self.enqueue_rejects = self.enqueue_rejects.saturating_add(1);
@@ -211,7 +212,8 @@ impl WorkerFairnessState {
                 .expect("flow inserted must be retrievable")
         };
 
-        if entry.fairness.pending_bytes.saturating_add(frame_bytes) > self.config.max_flow_queued_bytes
+        if entry.fairness.pending_bytes.saturating_add(frame_bytes)
+            > self.config.max_flow_queued_bytes
         {
             self.pressure
                 .note_admission_reject(now, &self.config.pressure);
@@ -237,7 +239,8 @@ impl WorkerFairnessState {
 
         entry.queue.push_back(frame);
         self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes);
-        self.bucket_queued_bytes[bucket_id] = self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes);
+        self.bucket_queued_bytes[bucket_id] =
+            self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes);
 
         if !entry.fairness.in_active_ring {
             entry.fairness.in_active_ring = true;
@@ -277,23 +280,31 @@ impl WorkerFairnessState {
 
             Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness);
 
-            let quantum = Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness);
-            flow.fairness.deficit_bytes =
-                flow.fairness.deficit_bytes.saturating_add(i64::from(quantum));
+            let quantum =
+                Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness);
+            flow.fairness.deficit_bytes = flow
+                .fairness
+                .deficit_bytes
+                .saturating_add(i64::from(quantum));
             self.deficit_grants = self.deficit_grants.saturating_add(1);
 
             let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes());
             if flow.fairness.deficit_bytes < front_len as i64 {
-                flow.fairness.consecutive_skips = flow.fairness.consecutive_skips.saturating_add(1);
+                flow.fairness.consecutive_skips =
+                    flow.fairness.consecutive_skips.saturating_add(1);
                 self.deficit_skips = self.deficit_skips.saturating_add(1);
                 requeue_active = true;
             } else if let Some(frame) = flow.queue.pop_front() {
                 drained_bytes = frame.queued_bytes();
-                flow.fairness.pending_bytes = flow.fairness.pending_bytes.saturating_sub(drained_bytes);
-                flow.fairness.deficit_bytes =
-                    flow.fairness.deficit_bytes.saturating_sub(drained_bytes as i64);
+                flow.fairness.pending_bytes =
+                    flow.fairness.pending_bytes.saturating_sub(drained_bytes);
+                flow.fairness.deficit_bytes = flow
+                    .fairness
+                    .deficit_bytes
+                    .saturating_sub(drained_bytes as i64);
                 flow.fairness.consecutive_skips = 0;
-                flow.fairness.queue_started_at = flow.queue.front().map(|front| front.enqueued_at);
+                flow.fairness.queue_started_at =
+                    flow.queue.front().map(|front| front.enqueued_at);
                 requeue_active = !flow.queue.is_empty();
                 if !requeue_active {
                     flow.fairness.scheduler_state = FlowSchedulerState::Idle;
@@ -359,7 +370,8 @@ impl WorkerFairnessState {
             return DispatchAction::Continue;
         };
 
-        flow.fairness.consecutive_stalls = flow.fairness.consecutive_stalls.saturating_add(1);
+        flow.fairness.consecutive_stalls =
+            flow.fairness.consecutive_stalls.saturating_add(1);
         flow.fairness.scheduler_state = FlowSchedulerState::Backpressured;
         flow.fairness.pressure_class = FlowPressureClass::Backpressured;
 
@@ -376,7 +388,8 @@ impl WorkerFairnessState {
         } else {
             let frame_bytes = candidate.frame.queued_bytes();
             flow.queue.push_front(candidate.frame);
-            flow.fairness.pending_bytes = flow.fairness.pending_bytes.saturating_add(frame_bytes);
+            flow.fairness.pending_bytes =
+                flow.fairness.pending_bytes.saturating_add(frame_bytes);
             if flow.fairness.queue_started_at.is_none() {
                 flow.fairness.queue_started_at = Some(now);
             }
@@ -390,7 +403,8 @@ impl WorkerFairnessState {
             }
         }
 
-        if flow.fairness.consecutive_stalls >= self.config.max_consecutive_stalls_before_close
+        if flow.fairness.consecutive_stalls
+            >= self.config.max_consecutive_stalls_before_close
             && self.pressure.state() == PressureState::Saturated
         {
             self.remove_flow(conn_id);
@@ -414,18 +428,16 @@ impl WorkerFairnessState {
             return;
         };
 
-        self.bucket_active_flows[entry.fairness.bucket_id] = self.bucket_active_flows
-            [entry.fairness.bucket_id]
-            .saturating_sub(1);
+        self.bucket_active_flows[entry.fairness.bucket_id] =
+            self.bucket_active_flows[entry.fairness.bucket_id].saturating_sub(1);
 
         let mut reclaimed = 0u64;
         for frame in entry.queue {
             reclaimed = reclaimed.saturating_add(frame.queued_bytes());
         }
         self.total_queued_bytes = self.total_queued_bytes.saturating_sub(reclaimed);
-        self.bucket_queued_bytes[entry.fairness.bucket_id] = self.bucket_queued_bytes
-            [entry.fairness.bucket_id]
-            .saturating_sub(reclaimed);
+        self.bucket_queued_bytes[entry.fairness.bucket_id] =
+            self.bucket_queued_bytes[entry.fairness.bucket_id].saturating_sub(reclaimed);
     }
 
     fn evaluate_pressure(&mut self, now: Instant, force: bool) {
diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs
index 3ebe190..992fec3 100644
--- a/src/transport/middle_proxy/mod.rs
+++ b/src/transport/middle_proxy/mod.rs
@@ -3,6 +3,9 @@
 mod codec;
 mod config_updater;
 mod fairness;
+#[cfg(test)]
+#[path = "tests/fairness_security_tests.rs"]
+mod fairness_security_tests;
 mod handshake;
 mod health;
 #[cfg(test)]
@@ -31,9 +34,6 @@ mod pool_writer;
 #[cfg(test)]
 #[path = "tests/pool_writer_security_tests.rs"]
 mod pool_writer_security_tests;
-#[cfg(test)]
-#[path = "tests/fairness_security_tests.rs"]
-mod fairness_security_tests;
 mod reader;
 mod registry;
 mod rotation;
diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs
index 3000628..8041185 100644
--- a/src/transport/middle_proxy/reader.rs
+++ b/src/transport/middle_proxy/reader.rs
@@ -118,20 +118,22 @@ fn apply_fairness_metrics_delta(
     stats.set_me_fair_backpressured_flows_gauge(current.backpressured_flows as u64);
     stats.set_me_fair_pressure_state_gauge(current.pressure_state.as_u8() as u64);
     stats.add_me_fair_scheduler_rounds_total(
-        current.scheduler_rounds.saturating_sub(prev.scheduler_rounds),
+        current
+            .scheduler_rounds
+            .saturating_sub(prev.scheduler_rounds),
     );
     stats.add_me_fair_deficit_grants_total(
         current.deficit_grants.saturating_sub(prev.deficit_grants),
     );
-    stats.add_me_fair_deficit_skips_total(
-        current.deficit_skips.saturating_sub(prev.deficit_skips),
-    );
+    stats.add_me_fair_deficit_skips_total(current.deficit_skips.saturating_sub(prev.deficit_skips));
     stats.add_me_fair_enqueue_rejects_total(
         current.enqueue_rejects.saturating_sub(prev.enqueue_rejects),
     );
     stats.add_me_fair_shed_drops_total(current.shed_drops.saturating_sub(prev.shed_drops));
     stats.add_me_fair_penalties_total(
-        current.fairness_penalties.saturating_sub(prev.fairness_penalties),
+        current
+            .fairness_penalties
+            .saturating_sub(prev.fairness_penalties),
     );
     stats.add_me_fair_downstream_stalls_total(
         current
diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs
index 69932a3..41a8d86 100644
--- a/src/transport/middle_proxy/tests/fairness_security_tests.rs
+++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs
@@ -175,7 +175,8 @@ fn fairness_randomized_sequence_preserves_memory_bounds() {
         } else {
             DispatchFeedback::QueueFull
         };
-        let _ = fairness.apply_dispatch_feedback(candidate.frame.conn_id, candidate, feedback, now);
+        let _ =
+            fairness.apply_dispatch_feedback(candidate.frame.conn_id, candidate, feedback, now);
     }
 
     let snapshot = fairness.snapshot();