diff --git a/src/config/defaults.rs b/src/config/defaults.rs index d92ae78..41573a4 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -12,8 +12,8 @@ const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90; const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1; const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180; -const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3; -const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4; +const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; +const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; const DEFAULT_ACCESS_USER: &str = "default"; const DEFAULT_ACCESS_SECRET: &str = "00000000000000000000000000000000"; @@ -205,13 +205,21 @@ pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { } pub(crate) fn default_upstream_connect_retry_backoff_ms() -> u64 { - 250 + 100 } pub(crate) fn default_upstream_unhealthy_fail_threshold() -> u32 { DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD } +pub(crate) fn default_upstream_connect_failfast_hard_errors() -> bool { + false +} + +pub(crate) fn default_rpc_proxy_req_every() -> u64 { + 0 +} + pub(crate) fn default_crypto_pending_buffer() -> usize { 256 * 1024 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index caec078..902811c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -135,6 +135,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) { != new.general.upstream_connect_retry_backoff_ms || old.general.upstream_unhealthy_fail_threshold != new.general.upstream_unhealthy_fail_threshold + || old.general.upstream_connect_failfast_hard_errors + != new.general.upstream_connect_failfast_hard_errors + || old.general.rpc_proxy_req_every != new.general.rpc_proxy_req_every { warn!("config reload: general.upstream_* changed; restart required"); } diff --git a/src/config/load.rs b/src/config/load.rs 
index e549b55..c051b8e 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -249,6 +249,14 @@ impl ProxyConfig { )); } + if config.general.rpc_proxy_req_every != 0 + && !(10..=300).contains(&config.general.rpc_proxy_req_every) + { + return Err(ProxyError::Config( + "general.rpc_proxy_req_every must be 0 or within [10, 300]".to_string(), + )); + } + if config.general.me_reinit_every_secs == 0 { return Err(ProxyError::Config( "general.me_reinit_every_secs must be > 0".to_string(), @@ -676,6 +684,14 @@ mod tests { cfg.general.upstream_unhealthy_fail_threshold, default_upstream_unhealthy_fail_threshold() ); + assert_eq!( + cfg.general.upstream_connect_failfast_hard_errors, + default_upstream_connect_failfast_hard_errors() + ); + assert_eq!( + cfg.general.rpc_proxy_req_every, + default_rpc_proxy_req_every() + ); assert_eq!(cfg.general.update_every, default_update_every()); assert_eq!(cfg.server.listen_addr_ipv4, default_listen_addr_ipv4()); assert_eq!(cfg.server.listen_addr_ipv6, default_listen_addr_ipv6_opt()); @@ -751,6 +767,11 @@ mod tests { general.upstream_unhealthy_fail_threshold, default_upstream_unhealthy_fail_threshold() ); + assert_eq!( + general.upstream_connect_failfast_hard_errors, + default_upstream_connect_failfast_hard_errors() + ); + assert_eq!(general.rpc_proxy_req_every, default_rpc_proxy_req_every()); assert_eq!(general.update_every, default_update_every()); let server = ServerConfig::default(); @@ -1050,6 +1071,62 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn rpc_proxy_req_every_out_of_range_is_rejected() { + let toml = r#" + [general] + rpc_proxy_req_every = 9 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_rpc_proxy_req_every_out_of_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + 
assert!(err.contains("general.rpc_proxy_req_every must be 0 or within [10, 300]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn rpc_proxy_req_every_zero_and_valid_range_are_accepted() { + let toml_zero = r#" + [general] + rpc_proxy_req_every = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path_zero = dir.join("telemt_rpc_proxy_req_every_zero_ok_test.toml"); + std::fs::write(&path_zero, toml_zero).unwrap(); + let cfg_zero = ProxyConfig::load(&path_zero).unwrap(); + assert_eq!(cfg_zero.general.rpc_proxy_req_every, 0); + let _ = std::fs::remove_file(path_zero); + + let toml_valid = r#" + [general] + rpc_proxy_req_every = 40 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let path_valid = dir.join("telemt_rpc_proxy_req_every_valid_ok_test.toml"); + std::fs::write(&path_valid, toml_valid).unwrap(); + let cfg_valid = ProxyConfig::load(&path_valid).unwrap(); + assert_eq!(cfg_valid.general.rpc_proxy_req_every, 40); + let _ = std::fs::remove_file(path_valid); + } + #[test] fn me_hardswap_warmup_defaults_are_set() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 0d255ba..64be729 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -162,8 +162,8 @@ impl MeBindStaleMode { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] pub enum MeFloorMode { - #[default] Static, + #[default] Adaptive, } @@ -356,6 +356,11 @@ pub struct GeneralConfig { #[serde(default = "default_true")] pub me_keepalive_payload_random: bool, + /// Interval in seconds for service RPC_PROXY_REQ activity signals to ME. + /// 0 disables service activity signals. + #[serde(default = "default_rpc_proxy_req_every")] + pub rpc_proxy_req_every: u64, + /// Max pending ciphertext buffer per client writer (bytes). 
/// Controls FakeTLS backpressure vs throughput. #[serde(default = "default_crypto_pending_buffer")] @@ -472,6 +477,10 @@ pub struct GeneralConfig { #[serde(default = "default_upstream_unhealthy_fail_threshold")] pub upstream_unhealthy_fail_threshold: u32, + /// Skip additional retries for hard non-transient upstream connect errors. + #[serde(default = "default_upstream_connect_failfast_hard_errors")] + pub upstream_connect_failfast_hard_errors: bool, + /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). #[serde(default)] pub stun_iface_mismatch_ignore: bool, @@ -662,6 +671,7 @@ impl Default for GeneralConfig { me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_payload_random: default_true(), + rpc_proxy_req_every: default_rpc_proxy_req_every(), me_warmup_stagger_enabled: default_true(), me_warmup_step_delay_ms: default_warmup_step_delay_ms(), me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), @@ -682,6 +692,7 @@ impl Default for GeneralConfig { upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), + upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(), stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, diff --git a/src/main.rs b/src/main.rs index b890233..f7f9239 100644 --- a/src/main.rs +++ b/src/main.rs @@ -261,11 +261,16 @@ async fn main() -> std::result::Result<(), Box> { warn!("Using default tls_domain. 
Consider setting a custom domain."); } + let stats = Arc::new(Stats::new()); + stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); + let upstream_manager = Arc::new(UpstreamManager::new( config.upstreams.clone(), config.general.upstream_connect_retry_attempts, config.general.upstream_connect_retry_backoff_ms, config.general.upstream_unhealthy_fail_threshold, + config.general.upstream_connect_failfast_hard_errors, + stats.clone(), )); let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); @@ -411,8 +416,6 @@ async fn main() -> std::result::Result<(), Box> { let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); - let stats = Arc::new(Stats::new()); - stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); @@ -531,6 +534,7 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_keepalive_interval_secs, config.general.me_keepalive_jitter_secs, config.general.me_keepalive_payload_random, + config.general.rpc_proxy_req_every, config.general.me_warmup_stagger_enabled, config.general.me_warmup_step_delay_ms, config.general.me_warmup_step_jitter_ms, diff --git a/src/metrics.rs b/src/metrics.rs index f8a6716..eae69d1 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -202,6 +202,195 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests" + ); + let _ = writeln!(out, "# TYPE telemt_upstream_connect_attempt_total counter"); + let _ = writeln!( + out, + "telemt_upstream_connect_attempt_total {}", + if core_enabled { + stats.get_upstream_connect_attempt_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP 
telemt_upstream_connect_success_total Successful upstream connect request cycles" + ); + let _ = writeln!(out, "# TYPE telemt_upstream_connect_success_total counter"); + let _ = writeln!( + out, + "telemt_upstream_connect_success_total {}", + if core_enabled { + stats.get_upstream_connect_success_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_upstream_connect_fail_total Failed upstream connect request cycles" + ); + let _ = writeln!(out, "# TYPE telemt_upstream_connect_fail_total counter"); + let _ = writeln!( + out, + "telemt_upstream_connect_fail_total {}", + if core_enabled { + stats.get_upstream_connect_fail_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_upstream_connect_failfast_hard_error_total Hard errors that triggered upstream connect failfast" + ); + let _ = writeln!( + out, + "# TYPE telemt_upstream_connect_failfast_hard_error_total counter" + ); + let _ = writeln!( + out, + "telemt_upstream_connect_failfast_hard_error_total {}", + if core_enabled { + stats.get_upstream_connect_failfast_hard_error_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_upstream_connect_attempts_per_request Histogram-like buckets for attempts per upstream connect request cycle" + ); + let _ = writeln!(out, "# TYPE telemt_upstream_connect_attempts_per_request counter"); + let _ = writeln!( + out, + "telemt_upstream_connect_attempts_per_request{{bucket=\"1\"}} {}", + if core_enabled { + stats.get_upstream_connect_attempts_bucket_1() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_attempts_per_request{{bucket=\"2\"}} {}", + if core_enabled { + stats.get_upstream_connect_attempts_bucket_2() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_attempts_per_request{{bucket=\"3_4\"}} {}", + if core_enabled { + stats.get_upstream_connect_attempts_bucket_3_4() + } else { + 0 + } + ); + let _ = writeln!( + out, + 
"telemt_upstream_connect_attempts_per_request{{bucket=\"gt_4\"}} {}", + if core_enabled { + stats.get_upstream_connect_attempts_bucket_gt_4() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_upstream_connect_duration_success_total Histogram-like buckets of successful upstream connect cycle duration" + ); + let _ = writeln!(out, "# TYPE telemt_upstream_connect_duration_success_total counter"); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_success_total{{bucket=\"le_100ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_success_bucket_le_100ms() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_success_total{{bucket=\"101_500ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_success_bucket_101_500ms() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_success_total{{bucket=\"501_1000ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_success_bucket_501_1000ms() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_success_total{{bucket=\"gt_1000ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_success_bucket_gt_1000ms() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_upstream_connect_duration_fail_total Histogram-like buckets of failed upstream connect cycle duration" + ); + let _ = writeln!(out, "# TYPE telemt_upstream_connect_duration_fail_total counter"); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_fail_total{{bucket=\"le_100ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_fail_bucket_le_100ms() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_fail_total{{bucket=\"101_500ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_fail_bucket_101_500ms() + } else { + 0 + } + ); + let _ = writeln!( + out, + 
"telemt_upstream_connect_duration_fail_total{{bucket=\"501_1000ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_fail_bucket_501_1000ms() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_upstream_connect_duration_fail_total{{bucket=\"gt_1000ms\"}} {}", + if core_enabled { + stats.get_upstream_connect_duration_fail_bucket_gt_1000ms() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent"); let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter"); let _ = writeln!( @@ -250,6 +439,93 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_sent_total Service RPC_PROXY_REQ activity signals sent" + ); + let _ = writeln!(out, "# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_sent_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_sent_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_failed_total Service RPC_PROXY_REQ activity signal failures" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_rpc_proxy_req_signal_failed_total counter" + ); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_failed_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_failed_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_skipped_no_meta_total Service RPC_PROXY_REQ skipped due to missing writer metadata" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_rpc_proxy_req_signal_skipped_no_meta_total counter" + ); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_skipped_no_meta_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_skipped_no_meta_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP 
telemt_me_rpc_proxy_req_signal_response_total Service RPC_PROXY_REQ responses observed" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_rpc_proxy_req_signal_response_total counter" + ); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_response_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_response_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_close_sent_total Service RPC_CLOSE_EXT sent after activity signals" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_rpc_proxy_req_signal_close_sent_total counter" + ); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_close_sent_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_close_sent_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts"); let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter"); let _ = writeln!( @@ -311,6 +587,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_idle_close_by_peer_total ME idle writers closed by peer" + ); + let _ = writeln!(out, "# TYPE telemt_me_idle_close_by_peer_total counter"); + let _ = writeln!( + out, + "telemt_me_idle_close_by_peer_total {}", + if me_allows_normal { + stats.get_me_idle_close_by_peer_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches"); let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter"); let _ = writeln!( @@ -1028,6 +1319,20 @@ mod tests { stats.increment_connects_all(); stats.increment_connects_bad(); stats.increment_handshake_timeouts(); + stats.increment_upstream_connect_attempt_total(); + stats.increment_upstream_connect_attempt_total(); + stats.increment_upstream_connect_success_total(); + stats.increment_upstream_connect_fail_total(); + 
stats.increment_upstream_connect_failfast_hard_error_total(); + stats.observe_upstream_connect_attempts_per_request(2); + stats.observe_upstream_connect_duration_ms(220, true); + stats.observe_upstream_connect_duration_ms(1500, false); + stats.increment_me_rpc_proxy_req_signal_sent_total(); + stats.increment_me_rpc_proxy_req_signal_failed_total(); + stats.increment_me_rpc_proxy_req_signal_skipped_no_meta_total(); + stats.increment_me_rpc_proxy_req_signal_response_total(); + stats.increment_me_rpc_proxy_req_signal_close_sent_total(); + stats.increment_me_idle_close_by_peer_total(); stats.increment_user_connects("alice"); stats.increment_user_curr_connects("alice"); stats.add_user_octets_from("alice", 1024); @@ -1045,6 +1350,27 @@ mod tests { assert!(output.contains("telemt_connections_total 2")); assert!(output.contains("telemt_connections_bad_total 1")); assert!(output.contains("telemt_handshake_timeouts_total 1")); + assert!(output.contains("telemt_upstream_connect_attempt_total 2")); + assert!(output.contains("telemt_upstream_connect_success_total 1")); + assert!(output.contains("telemt_upstream_connect_fail_total 1")); + assert!(output.contains("telemt_upstream_connect_failfast_hard_error_total 1")); + assert!( + output.contains("telemt_upstream_connect_attempts_per_request{bucket=\"2\"} 1") + ); + assert!( + output.contains( + "telemt_upstream_connect_duration_success_total{bucket=\"101_500ms\"} 1" + ) + ); + assert!( + output.contains("telemt_upstream_connect_duration_fail_total{bucket=\"gt_1000ms\"} 1") + ); + assert!(output.contains("telemt_me_rpc_proxy_req_signal_sent_total 1")); + assert!(output.contains("telemt_me_rpc_proxy_req_signal_failed_total 1")); + assert!(output.contains("telemt_me_rpc_proxy_req_signal_skipped_no_meta_total 1")); + assert!(output.contains("telemt_me_rpc_proxy_req_signal_response_total 1")); + assert!(output.contains("telemt_me_rpc_proxy_req_signal_close_sent_total 1")); + assert!(output.contains("telemt_me_idle_close_by_peer_total 
1")); assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1")); assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1")); assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024")); @@ -1078,6 +1404,9 @@ mod tests { assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); + assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter")); + assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); + assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains( "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 8152599..29d7f45 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -26,14 +26,36 @@ pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, handshake_timeouts: AtomicU64, + upstream_connect_attempt_total: AtomicU64, + upstream_connect_success_total: AtomicU64, + upstream_connect_fail_total: AtomicU64, + upstream_connect_failfast_hard_error_total: AtomicU64, + upstream_connect_attempts_bucket_1: AtomicU64, + upstream_connect_attempts_bucket_2: AtomicU64, + upstream_connect_attempts_bucket_3_4: AtomicU64, + upstream_connect_attempts_bucket_gt_4: AtomicU64, + upstream_connect_duration_success_bucket_le_100ms: AtomicU64, + upstream_connect_duration_success_bucket_101_500ms: AtomicU64, + upstream_connect_duration_success_bucket_501_1000ms: AtomicU64, + upstream_connect_duration_success_bucket_gt_1000ms: AtomicU64, + upstream_connect_duration_fail_bucket_le_100ms: AtomicU64, + upstream_connect_duration_fail_bucket_101_500ms: AtomicU64, + upstream_connect_duration_fail_bucket_501_1000ms: 
AtomicU64, + upstream_connect_duration_fail_bucket_gt_1000ms: AtomicU64, me_keepalive_sent: AtomicU64, me_keepalive_failed: AtomicU64, me_keepalive_pong: AtomicU64, me_keepalive_timeout: AtomicU64, + me_rpc_proxy_req_signal_sent_total: AtomicU64, + me_rpc_proxy_req_signal_failed_total: AtomicU64, + me_rpc_proxy_req_signal_skipped_no_meta_total: AtomicU64, + me_rpc_proxy_req_signal_response_total: AtomicU64, + me_rpc_proxy_req_signal_close_sent_total: AtomicU64, me_reconnect_attempts: AtomicU64, me_reconnect_success: AtomicU64, me_handshake_reject_total: AtomicU64, me_reader_eof_total: AtomicU64, + me_idle_close_by_peer_total: AtomicU64, me_crc_mismatch: AtomicU64, me_seq_mismatch: AtomicU64, me_endpoint_quarantine_total: AtomicU64, @@ -155,6 +177,99 @@ impl Stats { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_upstream_connect_attempt_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_attempt_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_upstream_connect_success_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_success_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_upstream_connect_fail_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_fail_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_upstream_connect_failfast_hard_error_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_failfast_hard_error_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn observe_upstream_connect_attempts_per_request(&self, attempts: u32) { + if !self.telemetry_core_enabled() { + return; + } + match attempts { + 0 => {} + 1 => { + self.upstream_connect_attempts_bucket_1 + .fetch_add(1, Ordering::Relaxed); + } + 2 => { + self.upstream_connect_attempts_bucket_2 + .fetch_add(1, Ordering::Relaxed); + } + 3..=4 => { + self.upstream_connect_attempts_bucket_3_4 + .fetch_add(1, 
Ordering::Relaxed); + } + _ => { + self.upstream_connect_attempts_bucket_gt_4 + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn observe_upstream_connect_duration_ms(&self, duration_ms: u64, success: bool) { + if !self.telemetry_core_enabled() { + return; + } + let bucket = match duration_ms { + 0..=100 => 0u8, + 101..=500 => 1u8, + 501..=1000 => 2u8, + _ => 3u8, + }; + match (success, bucket) { + (true, 0) => { + self.upstream_connect_duration_success_bucket_le_100ms + .fetch_add(1, Ordering::Relaxed); + } + (true, 1) => { + self.upstream_connect_duration_success_bucket_101_500ms + .fetch_add(1, Ordering::Relaxed); + } + (true, 2) => { + self.upstream_connect_duration_success_bucket_501_1000ms + .fetch_add(1, Ordering::Relaxed); + } + (true, _) => { + self.upstream_connect_duration_success_bucket_gt_1000ms + .fetch_add(1, Ordering::Relaxed); + } + (false, 0) => { + self.upstream_connect_duration_fail_bucket_le_100ms + .fetch_add(1, Ordering::Relaxed); + } + (false, 1) => { + self.upstream_connect_duration_fail_bucket_101_500ms + .fetch_add(1, Ordering::Relaxed); + } + (false, 2) => { + self.upstream_connect_duration_fail_bucket_501_1000ms + .fetch_add(1, Ordering::Relaxed); + } + (false, _) => { + self.upstream_connect_duration_fail_bucket_gt_1000ms + .fetch_add(1, Ordering::Relaxed); + } + } + } pub fn increment_me_keepalive_sent(&self) { if self.telemetry_me_allows_debug() { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); @@ -180,6 +295,36 @@ impl Stats { self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed); } } + pub fn increment_me_rpc_proxy_req_signal_sent_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_sent_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_failed_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_failed_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn 
increment_me_rpc_proxy_req_signal_skipped_no_meta_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_skipped_no_meta_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_response_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_response_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_close_sent_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_close_sent_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_reconnect_attempt(&self) { if self.telemetry_me_allows_normal() { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); @@ -210,6 +355,12 @@ impl Stats { self.me_reader_eof_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_idle_close_by_peer_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_idle_close_by_peer_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_crc_mismatch(&self) { if self.telemetry_me_allows_normal() { self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); @@ -466,6 +617,26 @@ impl Stats { pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) } pub fn get_me_keepalive_timeout(&self) -> u64 { self.me_keepalive_timeout.load(Ordering::Relaxed) } + pub fn get_me_rpc_proxy_req_signal_sent_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_sent_total + .load(Ordering::Relaxed) + } + pub fn get_me_rpc_proxy_req_signal_failed_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_failed_total + .load(Ordering::Relaxed) + } + pub fn get_me_rpc_proxy_req_signal_skipped_no_meta_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_skipped_no_meta_total + .load(Ordering::Relaxed) + } + pub fn get_me_rpc_proxy_req_signal_response_total(&self) -> u64 { + 
self.me_rpc_proxy_req_signal_response_total + .load(Ordering::Relaxed) + } + pub fn get_me_rpc_proxy_req_signal_close_sent_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_close_sent_total + .load(Ordering::Relaxed) + } pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) } pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) } pub fn get_me_handshake_reject_total(&self) -> u64 { @@ -474,6 +645,9 @@ impl Stats { pub fn get_me_reader_eof_total(&self) -> u64 { self.me_reader_eof_total.load(Ordering::Relaxed) } + pub fn get_me_idle_close_by_peer_total(&self) -> u64 { + self.me_idle_close_by_peer_total.load(Ordering::Relaxed) + } pub fn get_me_crc_mismatch(&self) -> u64 { self.me_crc_mismatch.load(Ordering::Relaxed) } pub fn get_me_seq_mismatch(&self) -> u64 { self.me_seq_mismatch.load(Ordering::Relaxed) } pub fn get_me_endpoint_quarantine_total(&self) -> u64 { @@ -703,6 +877,65 @@ impl Stats { } pub fn get_handshake_timeouts(&self) -> u64 { self.handshake_timeouts.load(Ordering::Relaxed) } + pub fn get_upstream_connect_attempt_total(&self) -> u64 { + self.upstream_connect_attempt_total.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_success_total(&self) -> u64 { + self.upstream_connect_success_total.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_fail_total(&self) -> u64 { + self.upstream_connect_fail_total.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_failfast_hard_error_total(&self) -> u64 { + self.upstream_connect_failfast_hard_error_total + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_1(&self) -> u64 { + self.upstream_connect_attempts_bucket_1.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_2(&self) -> u64 { + self.upstream_connect_attempts_bucket_2.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_3_4(&self) -> u64 { + 
self.upstream_connect_attempts_bucket_3_4 + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_gt_4(&self) -> u64 { + self.upstream_connect_attempts_bucket_gt_4 + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_le_100ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_le_100ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_101_500ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_101_500ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_501_1000ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_501_1000ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_gt_1000ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_gt_1000ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_le_100ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_le_100ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_101_500ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_101_500ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_501_1000ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_501_1000ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_gt_1000ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_gt_1000ms + .load(Ordering::Relaxed) + } pub fn iter_user_stats(&self) -> dashmap::iter::Iter<'_, String, UserStats> { self.user_stats.iter() diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 55d8409..c9ad34c 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -18,6 +18,10 @@ const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff #[allow(dead_code)] const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; 
const SHADOW_ROTATE_RETRY_SECS: u64 = 30; +const IDLE_REFRESH_TRIGGER_BASE_SECS: u64 = 45; +const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5; +const IDLE_REFRESH_RETRY_SECS: u64 = 8; +const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5; pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); @@ -27,6 +31,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); loop { @@ -43,6 +48,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut outage_next_attempt, &mut single_endpoint_outage, &mut shadow_rotate_deadline, + &mut idle_refresh_next_attempt, &mut adaptive_idle_since, &mut adaptive_recover_until, ) @@ -58,6 +64,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut outage_next_attempt, &mut single_endpoint_outage, &mut shadow_rotate_deadline, + &mut idle_refresh_next_attempt, &mut adaptive_idle_since, &mut adaptive_recover_until, ) @@ -76,6 +83,7 @@ async fn check_family( outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, + idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) { @@ -120,6 +128,7 @@ async fn check_family( .or_default() .push(writer.id); } + let writer_idle_since = 
pool.registry.writer_idle_since_snapshot().await; for (dc, endpoints) in dc_endpoints { if endpoints.is_empty() { @@ -171,6 +180,7 @@ async fn check_family( outage_backoff.remove(&key); outage_next_attempt.remove(&key); shadow_rotate_deadline.remove(&key); + idle_refresh_next_attempt.remove(&key); adaptive_idle_since.remove(&key); adaptive_recover_until.remove(&key); info!( @@ -184,6 +194,20 @@ async fn check_family( } if alive >= required { + maybe_refresh_idle_writer_for_dc( + pool, + rng, + key, + dc, + family, + &endpoints, + alive, + required, + &live_writer_ids_by_addr, + &writer_idle_since, + idle_refresh_next_attempt, + ) + .await; maybe_rotate_single_endpoint_shadow( pool, rng, @@ -287,6 +311,113 @@ async fn check_family( } } +async fn maybe_refresh_idle_writer_for_dc( + pool: &Arc, + rng: &Arc, + key: (i32, IpFamily), + dc: i32, + family: IpFamily, + endpoints: &[SocketAddr], + alive: usize, + required: usize, + live_writer_ids_by_addr: &HashMap>, + writer_idle_since: &HashMap, + idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, +) { + if alive < required { + return; + } + + let now = Instant::now(); + if let Some(next) = idle_refresh_next_attempt.get(&key) + && now < *next + { + return; + } + + let now_epoch_secs = MePool::now_epoch_secs(); + let mut candidate: Option<(u64, SocketAddr, u64, u64)> = None; + for endpoint in endpoints { + let Some(writer_ids) = live_writer_ids_by_addr.get(endpoint) else { + continue; + }; + for writer_id in writer_ids { + let Some(idle_since_epoch_secs) = writer_idle_since.get(writer_id).copied() else { + continue; + }; + let idle_age_secs = now_epoch_secs.saturating_sub(idle_since_epoch_secs); + let threshold_secs = IDLE_REFRESH_TRIGGER_BASE_SECS + + (*writer_id % (IDLE_REFRESH_TRIGGER_JITTER_SECS + 1)); + if idle_age_secs < threshold_secs { + continue; + } + if candidate + .as_ref() + .map(|(_, _, age, _)| idle_age_secs > *age) + .unwrap_or(true) + { + candidate = Some((*writer_id, *endpoint, 
idle_age_secs, threshold_secs)); + } + } + } + + let Some((old_writer_id, endpoint, idle_age_secs, threshold_secs)) = candidate else { + return; + }; + + let rotate_ok = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await { + Ok(Ok(())) => true, + Ok(Err(error)) => { + debug!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + idle_age_secs, + threshold_secs, + %error, + "Idle writer pre-refresh connect failed" + ); + false + } + Err(_) => { + debug!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + idle_age_secs, + threshold_secs, + "Idle writer pre-refresh connect timed out" + ); + false + } + }; + + if !rotate_ok { + idle_refresh_next_attempt.insert(key, now + Duration::from_secs(IDLE_REFRESH_RETRY_SECS)); + return; + } + + pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false) + .await; + idle_refresh_next_attempt.insert( + key, + now + Duration::from_secs(IDLE_REFRESH_SUCCESS_GUARD_SECS), + ); + info!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + idle_age_secs, + threshold_secs, + alive, + required, + "Idle writer refreshed before upstream idle timeout" + ); +} + async fn should_reduce_floor_for_idle( pool: &Arc, key: (i32, IpFamily), diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 5ae922a..8c185be 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -94,6 +94,7 @@ pub struct MePool { pub(super) me_keepalive_interval: Duration, pub(super) me_keepalive_jitter: Duration, pub(super) me_keepalive_payload_random: bool, + pub(super) rpc_proxy_req_every_secs: AtomicU64, pub(super) me_warmup_stagger_enabled: bool, pub(super) me_warmup_step_delay: Duration, pub(super) me_warmup_step_jitter: Duration, @@ -192,6 +193,7 @@ impl MePool { me_keepalive_interval_secs: u64, me_keepalive_jitter_secs: u64, me_keepalive_payload_random: bool, + rpc_proxy_req_every_secs: u64, me_warmup_stagger_enabled: 
bool, me_warmup_step_delay_ms: u64, me_warmup_step_jitter_ms: u64, @@ -272,6 +274,7 @@ impl MePool { me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), me_keepalive_payload_random, + rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs), me_warmup_stagger_enabled, me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 455757e..1e86ea3 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering}; use std::time::{Duration, Instant}; +use std::io::ErrorKind; use bytes::BytesMut; use rand::Rng; @@ -12,16 +13,22 @@ use tracing::{debug, info, warn}; use crate::config::MeBindStaleMode; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; -use crate::protocol::constants::RPC_PING_U32; +use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32}; use super::codec::{RpcWriter, WriterCommand}; use super::pool::{MePool, MeWriter, WriterContour}; use super::reader::reader_loop; use super::registry::BoundConn; +use super::wire::build_proxy_req_payload; const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; +const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; + +fn is_me_peer_closed_error(error: &ProxyError) -> bool { + matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof) +} impl MePool { pub(crate) async fn prune_closed_writers(self: &Arc) { @@ -115,6 +122,7 @@ impl MePool { allow_drain_fallback: allow_drain_fallback.clone(), }; self.writers.write().await.push(writer.clone()); + 
self.registry.mark_writer_idle(writer_id).await; self.conn_count.fetch_add(1, Ordering::Relaxed); self.writer_available.notify_one(); @@ -124,6 +132,7 @@ impl MePool { let ping_tracker_reader = ping_tracker.clone(); let rtt_stats = self.rtt_stats.clone(); let stats_reader = self.stats.clone(); + let stats_reader_close = self.stats.clone(); let stats_ping = self.stats.clone(); let pool = Arc::downgrade(self); let cancel_ping = cancel.clone(); @@ -135,6 +144,13 @@ impl MePool { let keepalive_enabled = self.me_keepalive_enabled; let keepalive_interval = self.me_keepalive_interval; let keepalive_jitter = self.me_keepalive_jitter; + let rpc_proxy_req_every_secs = self.rpc_proxy_req_every_secs.load(Ordering::Relaxed); + let tx_signal = tx.clone(); + let stats_signal = self.stats.clone(); + let cancel_signal = cancel.clone(); + let cleanup_for_signal = cleanup_done.clone(); + let pool_signal = Arc::downgrade(self); + let keepalive_jitter_signal = self.me_keepalive_jitter; let cancel_reader_token = cancel.clone(); let cancel_ping_token = cancel_ping.clone(); @@ -156,6 +172,15 @@ impl MePool { cancel_reader_token.clone(), ) .await; + let idle_close_by_peer = if let Err(e) = res.as_ref() { + is_me_peer_closed_error(e) && reg.is_writer_empty(writer_id).await + } else { + false + }; + if idle_close_by_peer { + stats_reader_close.increment_me_idle_close_by_peer_total(); + info!(writer_id, "ME socket closed by peer on idle writer"); + } if let Some(pool) = pool.upgrade() && cleanup_for_reader .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) @@ -164,7 +189,9 @@ impl MePool { pool.remove_writer_and_close_clients(writer_id).await; } if let Err(e) = res { - warn!(error = %e, "ME reader ended"); + if !idle_close_by_peer { + warn!(error = %e, "ME reader ended"); + } } let mut ws = writers_arc.write().await; ws.retain(|w| w.id != writer_id); @@ -253,6 +280,116 @@ impl MePool { } }); + tokio::spawn(async move { + if rpc_proxy_req_every_secs == 0 { + return; + } + + 
let interval = Duration::from_secs(rpc_proxy_req_every_secs); + let startup_jitter_ms = { + let jitter_cap_ms = interval.as_millis() / 2; + let effective_jitter_ms = keepalive_jitter_signal + .as_millis() + .min(jitter_cap_ms) + .max(1); + rand::rng().random_range(0..=effective_jitter_ms as u64) + }; + + tokio::select! { + _ = cancel_signal.cancelled() => return, + _ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {} + } + + loop { + let wait = { + let jitter_cap_ms = interval.as_millis() / 2; + let effective_jitter_ms = keepalive_jitter_signal + .as_millis() + .min(jitter_cap_ms) + .max(1); + interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) + }; + + tokio::select! { + _ = cancel_signal.cancelled() => break, + _ = tokio::time::sleep(wait) => {} + } + + let Some(pool) = pool_signal.upgrade() else { + break; + }; + + let Some(meta) = pool.registry.get_last_writer_meta(writer_id).await else { + stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total(); + continue; + }; + + let (conn_id, mut service_rx) = pool.registry.register().await; + pool.registry + .bind_writer(conn_id, writer_id, tx_signal.clone(), meta.clone()) + .await; + + let payload = build_proxy_req_payload( + conn_id, + meta.client_addr, + meta.our_addr, + &[], + pool.proxy_tag.as_deref(), + meta.proto_flags, + ); + + if tx_signal.send(WriterCommand::DataAndFlush(payload)).await.is_err() { + stats_signal.increment_me_rpc_proxy_req_signal_failed_total(); + let _ = pool.registry.unregister(conn_id).await; + cancel_signal.cancel(); + if cleanup_for_signal + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + pool.remove_writer_and_close_clients(writer_id).await; + } + break; + } + + stats_signal.increment_me_rpc_proxy_req_signal_sent_total(); + + if matches!( + tokio::time::timeout( + Duration::from_millis(ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS), + service_rx.recv(), + ) + .await, + Ok(Some(_)) + ) { + 
stats_signal.increment_me_rpc_proxy_req_signal_response_total(); + } + + let mut close_payload = Vec::with_capacity(12); + close_payload.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); + close_payload.extend_from_slice(&conn_id.to_le_bytes()); + + if tx_signal + .send(WriterCommand::DataAndFlush(close_payload)) + .await + .is_err() + { + stats_signal.increment_me_rpc_proxy_req_signal_failed_total(); + let _ = pool.registry.unregister(conn_id).await; + cancel_signal.cancel(); + if cleanup_for_signal + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + pool.remove_writer_and_close_clients(writer_id).await; + } + break; + } + + stats_signal.increment_me_rpc_proxy_req_signal_close_sent_total(); + let _ = pool.registry.unregister(conn_id).await; + } + }); + Ok(()) } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 223fa71..4a66654 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::atomic::{AtomicU8, AtomicU64, Ordering}; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::{mpsc, RwLock}; use tokio::sync::mpsc::error::TrySendError; @@ -51,6 +51,8 @@ struct RegistryInner { writer_for_conn: HashMap, conns_for_writer: HashMap>, meta: HashMap, + last_meta_for_writer: HashMap, + writer_idle_since_epoch_secs: HashMap, } impl RegistryInner { @@ -61,6 +63,8 @@ impl RegistryInner { writer_for_conn: HashMap::new(), conns_for_writer: HashMap::new(), meta: HashMap::new(), + last_meta_for_writer: HashMap::new(), + writer_idle_since_epoch_secs: HashMap::new(), } } } @@ -74,6 +78,13 @@ pub struct ConnRegistry { } impl ConnRegistry { + fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + } + pub fn new() -> Self { let start = rand::random::() | 1; Self 
{ @@ -121,8 +132,16 @@ impl ConnRegistry { inner.map.remove(&id); inner.meta.remove(&id); if let Some(writer_id) = inner.writer_for_conn.remove(&id) { - if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) { + let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) { set.remove(&id); + set.is_empty() + } else { + false + }; + if became_empty { + inner + .writer_idle_since_epoch_secs + .insert(writer_id, Self::now_epoch_secs()); } return Some(writer_id); } @@ -191,8 +210,10 @@ impl ConnRegistry { meta: ConnMeta, ) { let mut inner = self.inner.write().await; - inner.meta.entry(conn_id).or_insert(meta); + inner.meta.entry(conn_id).or_insert(meta.clone()); inner.writer_for_conn.insert(conn_id, writer_id); + inner.last_meta_for_writer.insert(writer_id, meta); + inner.writer_idle_since_epoch_secs.remove(&writer_id); inner.writers.entry(writer_id).or_insert_with(|| tx.clone()); inner .conns_for_writer @@ -201,6 +222,25 @@ impl ConnRegistry { .insert(conn_id); } + pub async fn mark_writer_idle(&self, writer_id: u64) { + let mut inner = self.inner.write().await; + inner.conns_for_writer.entry(writer_id).or_insert_with(HashSet::new); + inner + .writer_idle_since_epoch_secs + .entry(writer_id) + .or_insert(Self::now_epoch_secs()); + } + + pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option { + let inner = self.inner.read().await; + inner.last_meta_for_writer.get(&writer_id).cloned() + } + + pub async fn writer_idle_since_snapshot(&self) -> HashMap { + let inner = self.inner.read().await; + inner.writer_idle_since_epoch_secs.clone() + } + pub async fn get_writer(&self, conn_id: u64) -> Option { let inner = self.inner.read().await; let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?; @@ -211,6 +251,8 @@ impl ConnRegistry { pub async fn writer_lost(&self, writer_id: u64) -> Vec { let mut inner = self.inner.write().await; inner.writers.remove(&writer_id); + inner.last_meta_for_writer.remove(&writer_id); + 
inner.writer_idle_since_epoch_secs.remove(&writer_id); let conns = inner .conns_for_writer .remove(&writer_id) diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 3b57c4c..ba4a419 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -1,4 +1,5 @@ use std::cmp::Reverse; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; @@ -18,6 +19,9 @@ use super::wire::build_proxy_req_payload; use rand::seq::SliceRandom; use super::registry::ConnMeta; +const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45; +const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55; + impl MePool { /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. pub async fn send_proxy_req( @@ -152,6 +156,8 @@ impl MePool { return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } } + let writer_idle_since = self.registry.writer_idle_since_snapshot().await; + let now_epoch_secs = Self::now_epoch_secs(); if self.me_deterministic_writer_sort.load(Ordering::Relaxed) { candidate_indices.sort_by(|lhs, rhs| { @@ -161,6 +167,11 @@ impl MePool { self.writer_contour_rank_for_selection(left), (left.generation < self.current_generation()) as usize, left.degraded.load(Ordering::Relaxed) as usize, + self.writer_idle_rank_for_selection( + left, + &writer_idle_since, + now_epoch_secs, + ), Reverse(left.tx.capacity()), left.addr, left.id, @@ -169,6 +180,11 @@ impl MePool { self.writer_contour_rank_for_selection(right), (right.generation < self.current_generation()) as usize, right.degraded.load(Ordering::Relaxed) as usize, + self.writer_idle_rank_for_selection( + right, + &writer_idle_since, + now_epoch_secs, + ), Reverse(right.tx.capacity()), right.addr, right.id, @@ -184,6 +200,11 @@ impl MePool { self.writer_contour_rank_for_selection(w), stale, degraded as usize, + self.writer_idle_rank_for_selection( + w, + &writer_idle_since, 
+ now_epoch_secs, + ), Reverse(w.tx.capacity()), ) }); @@ -367,4 +388,23 @@ impl MePool { WriterContour::Draining => 2, } } + + fn writer_idle_rank_for_selection( + &self, + writer: &super::pool::MeWriter, + idle_since_by_writer: &HashMap, + now_epoch_secs: u64, + ) -> usize { + let Some(idle_since) = idle_since_by_writer.get(&writer.id).copied() else { + return 0; + }; + let idle_age_secs = now_epoch_secs.saturating_sub(idle_since); + if idle_age_secs >= IDLE_WRITER_PENALTY_HIGH_SECS { + 2 + } else if idle_age_secs >= IDLE_WRITER_PENALTY_MID_SECS { + 1 + } else { + 0 + } + } } diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index fa7b0a6..1e2dd1e 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -19,6 +19,7 @@ use crate::config::{UpstreamConfig, UpstreamType}; use crate::error::{Result, ProxyError}; use crate::network::dns_overrides::{resolve_socket_addr, split_host_port}; use crate::protocol::constants::{TG_DATACENTERS_V4, TG_DATACENTERS_V6, TG_DATACENTER_PORT}; +use crate::stats::Stats; use crate::transport::socket::{create_outgoing_socket_bound, resolve_interface_ip}; use crate::transport::socks::{connect_socks4, connect_socks5}; @@ -188,6 +189,8 @@ pub struct UpstreamManager { connect_retry_attempts: u32, connect_retry_backoff: Duration, unhealthy_fail_threshold: u32, + connect_failfast_hard_errors: bool, + stats: Arc, } impl UpstreamManager { @@ -196,6 +199,8 @@ impl UpstreamManager { connect_retry_attempts: u32, connect_retry_backoff_ms: u64, unhealthy_fail_threshold: u32, + connect_failfast_hard_errors: bool, + stats: Arc, ) -> Self { let states = configs.into_iter() .filter(|c| c.enabled) @@ -207,6 +212,8 @@ impl UpstreamManager { connect_retry_attempts: connect_retry_attempts.max(1), connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms), unhealthy_fail_threshold: unhealthy_fail_threshold.max(1), + connect_failfast_hard_errors, + stats, } } @@ -349,6 +356,34 @@ impl UpstreamManager { } } + fn 
retry_backoff_with_jitter(&self) -> Duration { + if self.connect_retry_backoff.is_zero() { + return Duration::ZERO; + } + let base_ms = self.connect_retry_backoff.as_millis() as u64; + if base_ms == 0 { + return self.connect_retry_backoff; + } + let jitter_cap_ms = (base_ms / 2).max(1); + let jitter_ms = rand::rng().random_range(0..=jitter_cap_ms); + Duration::from_millis(base_ms.saturating_add(jitter_ms)) + } + + fn is_hard_connect_error(error: &ProxyError) -> bool { + match error { + ProxyError::Config(_) | ProxyError::ConnectionRefused { .. } => true, + ProxyError::Io(ioe) => matches!( + ioe.kind(), + std::io::ErrorKind::ConnectionRefused + | std::io::ErrorKind::AddrInUse + | std::io::ErrorKind::AddrNotAvailable + | std::io::ErrorKind::InvalidInput + | std::io::ErrorKind::Unsupported + ), + _ => false, + } + } + /// Select upstream using latency-weighted random selection. async fn select_upstream(&self, dc_idx: Option, scope: Option<&str>) -> Option { let upstreams = self.upstreams.read().await; @@ -459,8 +494,12 @@ guard.get(idx).map(|u| u.bind_rr.clone()) }; + let connect_started_at = Instant::now(); let mut last_error: Option = None; + let mut attempts_used = 0u32; for attempt in 1..=self.connect_retry_attempts { + attempts_used = attempt; + self.stats.increment_upstream_connect_attempt_total(); let start = Instant::now(); match self .connect_via_upstream(&upstream, target, bind_rr.clone()) .await { Ok((stream, egress)) => { let rtt_ms = start.elapsed().as_secs_f64() * 1000.0; + self.stats.increment_upstream_connect_success_total(); + self.stats + .observe_upstream_connect_attempts_per_request(attempts_used); + self.stats.observe_upstream_connect_duration_ms( + connect_started_at.elapsed().as_millis() as u64, + true, + ); let mut guard = self.upstreams.write().await; if let Some(u) = guard.get_mut(idx) { if !u.healthy { @@ -491,7 +537,13 @@ return Ok((stream, egress)); } Err(e) =>
{ - if attempt < self.connect_retry_attempts { + let hard_error = + self.connect_failfast_hard_errors && Self::is_hard_connect_error(&e); + if hard_error { + self.stats + .increment_upstream_connect_failfast_hard_error_total(); + } + if attempt < self.connect_retry_attempts && !hard_error { debug!( attempt, attempts = self.connect_retry_attempts, @@ -499,21 +551,43 @@ impl UpstreamManager { error = %e, "Upstream connect attempt failed, retrying" ); - if !self.connect_retry_backoff.is_zero() { - tokio::time::sleep(self.connect_retry_backoff).await; + let backoff = self.retry_backoff_with_jitter(); + if !backoff.is_zero() { + tokio::time::sleep(backoff).await; } + } else if hard_error { + debug!( + attempt, + attempts = self.connect_retry_attempts, + target = %target, + error = %e, + "Upstream connect failed with hard error, failfast is active" + ); } last_error = Some(e); + if hard_error { + break; + } } } } + self.stats.increment_upstream_connect_fail_total(); + self.stats + .observe_upstream_connect_attempts_per_request(attempts_used); + self.stats.observe_upstream_connect_duration_ms( + connect_started_at.elapsed().as_millis() as u64, + false, + ); + let error = last_error.unwrap_or_else(|| { ProxyError::Config("Upstream connect attempts exhausted".to_string()) }); let mut guard = self.upstreams.write().await; if let Some(u) = guard.get_mut(idx) { + // Intermediate attempts are intentionally ignored here. + // Health state is degraded only when the entire connect cycle fails. 
u.fails += 1; warn!( fails = u.fails, @@ -1364,4 +1438,20 @@ mod tests { .contains(&"198.51.100.2:443".parse::().unwrap())); assert!(dc9.fallback.is_empty()); } + + #[test] + fn hard_connect_error_classification_detects_connection_refused() { + let error = ProxyError::ConnectionRefused { + addr: "127.0.0.1:443".to_string(), + }; + assert!(UpstreamManager::is_hard_connect_error(&error)); + } + + #[test] + fn hard_connect_error_classification_skips_timeouts() { + let error = ProxyError::ConnectionTimeout { + addr: "127.0.0.1:443".to_string(), + }; + assert!(!UpstreamManager::is_hard_connect_error(&error)); + } }