From 57b2aa045354b53bc8b65dce20b96a3ca8336251 Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Sun, 10 May 2026 14:14:52 +0300
Subject: [PATCH] Rustfmt

---
 Cargo.lock                                    |  6 +--
 Cargo.toml                                    |  2 +-
 src/config/load.rs                            |  5 +-
 src/maestro/listeners.rs                      | 12 ++---
 src/maestro/runtime_tasks.rs                  |  4 +-
 src/maestro/shutdown.rs                       | 11 +++-
 src/metrics.rs                                | 53 +++++++------
 src/proxy/client.rs                           |  7 +--
 src/proxy/handshake.rs                        |  5 +-
 src/proxy/middle_relay.rs                     | 43 +++++++--------
 src/proxy/relay.rs                            | 11 +---
 .../tests/direct_relay_security_tests.rs      | 15 ++----
 .../relay_atomic_quota_invariant_tests.rs     |  8 +--
 src/stats/mod.rs                              | 12 ++---
 src/tls_front/cache.rs                        |  7 +--
 15 files changed, 80 insertions(+), 121 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 60ea46a..b14f3da 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2404,9 +2404,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
 
 [[package]]
 name = "rustls-webpki"
-version = "0.103.12"
+version = "0.103.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06"
+checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
 dependencies = [
  "aws-lc-rs",
  "ring",
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
 
 [[package]]
 name = "telemt"
-version = "3.4.10"
+version = "3.4.11"
 dependencies = [
  "aes",
  "anyhow",
diff --git a/Cargo.toml b/Cargo.toml
index 2f7456d..44c40ef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "telemt"
-version = "3.4.10"
+version = "3.4.11"
 edition = "2024"
 
 [features]
diff --git a/src/config/load.rs b/src/config/load.rs
index 9f51179..5a9b38c 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -621,10 +621,7 @@ fn collect_unknown_config_keys(parsed_toml: &toml::Value) -> Vec {
diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs
index b0c3e4e..7125b04 100644
--- a/src/maestro/runtime_tasks.rs
+++ b/src/maestro/runtime_tasks.rs
@@ -75,7 +75,9 @@ pub(crate) async fn spawn_runtime_tasks(
 
     let stats_maintenance = stats.clone();
     tokio::spawn(async move {
-        stats_maintenance.run_periodic_user_stats_maintenance().await;
+        stats_maintenance
+            .run_periodic_user_stats_maintenance()
+            .await;
     });
 
     let ip_tracker_maintenance = ip_tracker.clone();
diff --git a/src/maestro/shutdown.rs b/src/maestro/shutdown.rs
index 7b4a4ce..790b529 100644
--- a/src/maestro/shutdown.rs
+++ b/src/maestro/shutdown.rs
@@ -8,8 +8,8 @@
 //!
 //! SIGHUP is handled separately in config/hot_reload.rs for config reload.
 
-use std::sync::Arc;
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 #[cfg(not(unix))]
@@ -52,7 +52,14 @@ pub(crate) async fn wait_for_shutdown(
     quota_state_path: PathBuf,
 ) {
     let signal = wait_for_shutdown_signal().await;
-    perform_shutdown(signal, process_started_at, me_pool, &stats, quota_state_path).await;
+    perform_shutdown(
+        signal,
+        process_started_at,
+        me_pool,
+        &stats,
+        quota_state_path,
+    )
+    .await;
 }
 
 /// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT).
diff --git a/src/metrics.rs b/src/metrics.rs
index 620c4cd..e32cf06 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -374,10 +374,7 @@ async fn render_tls_front_profile_health(
         out,
         "# HELP telemt_tls_front_profile_age_seconds Age of cached TLS front profile data per configured domain"
     );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_tls_front_profile_age_seconds gauge"
-    );
+    let _ = writeln!(out, "# TYPE telemt_tls_front_profile_age_seconds gauge");
     let _ = writeln!(
         out,
         "# HELP telemt_tls_front_profile_app_data_records TLS front cached app-data record count per configured domain"
@@ -390,10 +387,7 @@ async fn render_tls_front_profile_health(
         out,
         "# HELP telemt_tls_front_profile_ticket_records TLS front cached ticket-like tail record count per configured domain"
     );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_tls_front_profile_ticket_records gauge"
-    );
+    let _ = writeln!(out, "# TYPE telemt_tls_front_profile_ticket_records gauge");
     let _ = writeln!(
         out,
         "# HELP telemt_tls_front_profile_change_cipher_spec_records TLS front cached ChangeCipherSpec record count per configured domain"
@@ -406,21 +400,14 @@ async fn render_tls_front_profile_health(
         out,
         "# HELP telemt_tls_front_profile_app_data_bytes TLS front cached total app-data bytes per configured domain"
     );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_tls_front_profile_app_data_bytes gauge"
-    );
+    let _ = writeln!(out, "# TYPE telemt_tls_front_profile_app_data_bytes gauge");
 
     for item in health {
         let domain = prometheus_label_value(&item.domain);
         let _ = writeln!(
             out,
             "telemt_tls_front_profile_info{{domain=\"{}\",source=\"{}\",is_default=\"{}\",has_cert_info=\"{}\",has_cert_payload=\"{}\"}} 1",
-            domain,
-            item.source,
-            item.is_default,
-            item.has_cert_info,
-            item.has_cert_payload
+            domain, item.source, item.is_default, item.has_cert_info, item.has_cert_payload
         );
         let _ = writeln!(
             out,
@@ -771,10 +758,7 @@ async fn render_metrics(
         out,
         "# HELP telemt_quota_contention_timeout_total Quota reservations that hit the bounded contention budget"
     );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_quota_contention_timeout_total counter"
-    );
+    let _ = writeln!(out, "# TYPE telemt_quota_contention_timeout_total counter");
     let _ = writeln!(
         out,
         "telemt_quota_contention_timeout_total {}",
@@ -917,10 +901,7 @@ async fn render_metrics(
         out,
         "# HELP telemt_rate_limiter_burst_bound_bytes Configured upper bound for one direct relay rate-limit burst"
     );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_rate_limiter_burst_bound_bytes gauge"
-    );
+    let _ = writeln!(out, "# TYPE telemt_rate_limiter_burst_bound_bytes gauge");
     let _ = writeln!(
         out,
         "telemt_rate_limiter_burst_bound_bytes{{direction=\"up\"}} {}",
@@ -3793,8 +3774,9 @@ mod tests {
         )));
         assert!(output.contains("telemt_connections_total 2"));
         assert!(output.contains("telemt_connections_bad_total 1"));
-        assert!(output
-            .contains("telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"));
+        assert!(output.contains(
+            "telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"
+        ));
         assert!(output.contains("telemt_handshake_timeouts_total 1"));
         assert!(output.contains("telemt_handshake_failures_by_class_total{class=\"timeout\"} 1"));
         assert!(output.contains("telemt_auth_expensive_checks_total 9"));
@@ -3910,16 +3892,21 @@ mod tests {
             output.contains("telemt_tls_front_profile_info{domain=\"fallback.example\",source=\"default\",is_default=\"true\",has_cert_info=\"false\",has_cert_payload=\"false\"} 1")
         );
         assert!(
-            output.contains("telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2")
+            output.contains(
+                "telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2"
+            )
         );
         assert!(
-            output.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
+            output
+                .contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
         );
+        assert!(output.contains(
+            "telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1"
+        ));
         assert!(
-            output.contains("telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1")
-        );
-        assert!(
-            output.contains("telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536")
+            output.contains(
+                "telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536"
+            )
         );
     }
 
diff --git a/src/proxy/client.rs b/src/proxy/client.rs
index c022a8c..136efc8 100644
--- a/src/proxy/client.rs
+++ b/src/proxy/client.rs
@@ -1002,12 +1002,7 @@ impl RunningClientHandler {
                 trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
                 "Rejecting PROXY protocol header from untrusted source"
             );
-            record_beobachten_class(
-                &self.beobachten,
-                &self.config,
-                self.peer.ip(),
-                "other",
-            );
+            record_beobachten_class(&self.beobachten, &self.config, self.peer.ip(), "other");
             return Err(ProxyError::InvalidProxyProtocol);
         }
 
diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs
index b49b618..15b04de 100644
--- a/src/proxy/handshake.rs
+++ b/src/proxy/handshake.rs
@@ -1901,7 +1901,10 @@ where
         .auth_expensive_checks_total
         .fetch_add(validation_checks as u64, Ordering::Relaxed);
 
-    if config.access.is_user_source_ip_denied(user.as_str(), peer.ip()) {
+    if config
+        .access
+        .is_user_source_ip_denied(user.as_str(), peer.ip())
+    {
         auth_probe_record_failure_in(shared, peer.ip(), Instant::now());
         maybe_apply_server_hello_delay(config).await;
         warn!(
diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs
index 8fadf9b..c8d6ce0 100644
--- a/src/proxy/middle_relay.rs
+++ b/src/proxy/middle_relay.rs
@@ -1883,9 +1883,7 @@ where
     };
 
     // When client closes, but ME channel stopped as unregistered - it isnt error
-    if client_closed
-        && matches!(writer_result, Err(ProxyError::MiddleConnectionLost))
-    {
+    if client_closed && matches!(writer_result, Err(ProxyError::MiddleConnectionLost)) {
         writer_result = Ok(());
     }
 
@@ -2520,27 +2518,26 @@ where
         )
         .await?;
 
-        let write_mode =
-            match write_client_payload(
-                client_writer,
-                proto_tag,
-                flags,
-                &data,
-                rng,
-                frame_buf,
-                cancel,
-            )
-            .await
-            {
-                Ok(mode) => mode,
-                Err(err) => {
-                    if quota_limit.is_some() {
-                        stats.add_quota_write_fail_bytes_total(data_len);
-                        stats.increment_quota_write_fail_events_total();
-                    }
-                    return Err(err);
+        let write_mode = match write_client_payload(
+            client_writer,
+            proto_tag,
+            flags,
+            &data,
+            rng,
+            frame_buf,
+            cancel,
+        )
+        .await
+        {
+            Ok(mode) => mode,
+            Err(err) => {
+                if quota_limit.is_some() {
+                    stats.add_quota_write_fail_bytes_total(data_len);
+                    stats.increment_quota_write_fail_events_total();
                 }
-            };
+                return Err(err);
+            }
+        };
 
         bytes_me2c.fetch_add(data_len, Ordering::Relaxed);
         if let Some(user_stats) = quota_user_stats {
diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs
index 1ddef74..965dd66 100644
--- a/src/proxy/relay.rs
+++ b/src/proxy/relay.rs
@@ -518,10 +518,7 @@ impl AsyncRead for StatsIo {
             Poll::Ready(Ok(n)) => {
                 if reserved_read_bytes > n as u64 {
                     let refund_bytes = reserved_read_bytes - n as u64;
-                    refund_reserved_quota_bytes(
-                        this.user_stats.as_ref(),
-                        refund_bytes,
-                    );
+                    refund_reserved_quota_bytes(this.user_stats.as_ref(), refund_bytes);
                     this.stats.add_quota_refund_bytes_total(refund_bytes);
                 }
                 if n > 0 {
@@ -538,7 +535,6 @@ impl AsyncRead for StatsIo {
                     this.quota_bytes_since_check = 0;
                 }
             }
-        }
 
         if let Some(limit) = this.quota_limit
             && this.user_stats.quota_used() >= limit
@@ -700,10 +696,7 @@ impl AsyncWrite for StatsIo {
             Poll::Ready(Ok(n)) => {
                 if reserved_bytes > n as u64 {
                     let refund_bytes = reserved_bytes - n as u64;
-                    refund_reserved_quota_bytes(
-                        this.user_stats.as_ref(),
-                        refund_bytes,
-                    );
+                    refund_reserved_quota_bytes(this.user_stats.as_ref(), refund_bytes);
                     this.stats.add_quota_refund_bytes_total(refund_bytes);
                 }
                 if shaper_reserved_bytes > n as u64
diff --git a/src/proxy/tests/direct_relay_security_tests.rs b/src/proxy/tests/direct_relay_security_tests.rs
index 73b6bff..67d1eee 100644
--- a/src/proxy/tests/direct_relay_security_tests.rs
+++ b/src/proxy/tests/direct_relay_security_tests.rs
@@ -1519,10 +1519,7 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() {
         "cutover should terminate direct relay session"
     );
     assert!(
-        matches!(
-            relay_result,
-            Err(ProxyError::RouteSwitched)
-        ),
+        matches!(relay_result, Err(ProxyError::RouteSwitched)),
         "client-visible cutover error must stay generic and avoid route-internal metadata"
     );
 
@@ -1659,10 +1656,7 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea
             .expect("direct relay task must not panic");
 
         assert!(
-            matches!(
-                relay_result,
-                Err(ProxyError::RouteSwitched)
-            ),
+            matches!(relay_result, Err(ProxyError::RouteSwitched)),
             "storm-cutover termination must remain generic for all direct sessions"
         );
     }
@@ -1965,10 +1959,7 @@ async fn adversarial_direct_relay_cutover_integrity() {
         .expect("Session must not panic");
 
     assert!(
-        matches!(
-            result,
-            Err(ProxyError::RouteSwitched)
-        ),
+        matches!(result, Err(ProxyError::RouteSwitched)),
         "Session must terminate with route switch error on cutover"
     );
 }
diff --git a/src/proxy/tests/relay_atomic_quota_invariant_tests.rs b/src/proxy/tests/relay_atomic_quota_invariant_tests.rs
index 237d244..4909ba5 100644
--- a/src/proxy/tests/relay_atomic_quota_invariant_tests.rs
+++ b/src/proxy/tests/relay_atomic_quota_invariant_tests.rs
@@ -187,12 +187,8 @@ fn poll_read_once(
 #[test]
 fn direct_c2s_quota_refunds_unused_on_short_read() {
     let user = "direct-c2s-short-read-refund-user";
-    let (mut io, stats, read_calls, quota_exceeded) = make_stats_io_with_read_script(
-        user,
-        64,
-        0,
-        vec![ReadStep::Data(vec![0x11; 5])],
-    );
+    let (mut io, stats, read_calls, quota_exceeded) =
+        make_stats_io_with_read_script(user, 64, 0, vec![ReadStep::Data(vec![0x11; 5])]);
 
     let mut storage = [0u8; 16];
     let n = match poll_read_once(&mut io, &mut storage) {
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index a0a6279..e13cf63 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -8,8 +8,8 @@ pub mod telemetry;
 use dashmap::DashMap;
 use lru::LruCache;
 use parking_lot::Mutex;
-use std::collections::{HashMap, VecDeque};
 use std::collections::hash_map::DefaultHasher;
+use std::collections::{HashMap, VecDeque};
 use std::hash::{Hash, Hasher};
 use std::num::NonZeroUsize;
 use std::sync::Arc;
@@ -2378,8 +2378,7 @@ impl Stats {
         self.quota_contention_total.load(Ordering::Relaxed)
     }
     pub fn get_quota_contention_timeout_total(&self) -> u64 {
-        self.quota_contention_timeout_total
-            .load(Ordering::Relaxed)
+        self.quota_contention_timeout_total.load(Ordering::Relaxed)
     }
     pub fn get_quota_acquire_cancelled_total(&self) -> u64 {
         self.quota_acquire_cancelled_total.load(Ordering::Relaxed)
@@ -2536,12 +2535,7 @@ impl Stats {
             .unwrap_or(0)
     }
 
-    pub fn load_user_quota_state(
-        &self,
-        user: &str,
-        used_bytes: u64,
-        last_reset_epoch_secs: u64,
-    ) {
+    pub fn load_user_quota_state(&self, user: &str, used_bytes: u64, last_reset_epoch_secs: u64) {
         let stats = self.get_or_create_user_stats_handle(user);
         stats.quota_used.store(used_bytes, Ordering::Relaxed);
         stats
diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs
index dc3d34e..bce33c4 100644
--- a/src/tls_front/cache.rs
+++ b/src/tls_front/cache.rs
@@ -150,9 +150,10 @@ impl TlsFrontCache {
             is_default: cached.domain == "default",
             has_cert_info: cached.cert_info.is_some(),
             has_cert_payload: cached.cert_payload.is_some(),
-            app_data_records: cached.app_data_records_sizes.len().max(
-                behavior.app_data_record_sizes.len(),
-            ),
+            app_data_records: cached
+                .app_data_records_sizes
+                .len()
+                .max(behavior.app_data_record_sizes.len()),
             ticket_records: behavior.ticket_record_sizes.len(),
             change_cipher_spec_count: behavior.change_cipher_spec_count,
             total_app_data_len: cached.total_app_data_len,