diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 83b263d..41573a4 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -216,6 +216,10 @@ pub(crate) fn default_upstream_connect_failfast_hard_errors() -> bool {
     false
 }
 
+pub(crate) fn default_rpc_proxy_req_every() -> u64 {
+    0
+}
+
 pub(crate) fn default_crypto_pending_buffer() -> usize {
     256 * 1024
 }
diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs
index 5ec911c..902811c 100644
--- a/src/config/hot_reload.rs
+++ b/src/config/hot_reload.rs
@@ -137,6 +137,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
             != new.general.upstream_unhealthy_fail_threshold
         || old.general.upstream_connect_failfast_hard_errors
             != new.general.upstream_connect_failfast_hard_errors
+        || old.general.rpc_proxy_req_every != new.general.rpc_proxy_req_every
     {
         warn!("config reload: general.upstream_* changed; restart required");
     }
diff --git a/src/config/load.rs b/src/config/load.rs
index 9dbd7c3..c051b8e 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -249,6 +249,14 @@ impl ProxyConfig {
             ));
         }
 
+        if config.general.rpc_proxy_req_every != 0
+            && !(10..=300).contains(&config.general.rpc_proxy_req_every)
+        {
+            return Err(ProxyError::Config(
+                "general.rpc_proxy_req_every must be 0 or within [10, 300]".to_string(),
+            ));
+        }
+
         if config.general.me_reinit_every_secs == 0 {
             return Err(ProxyError::Config(
                 "general.me_reinit_every_secs must be > 0".to_string(),
@@ -680,6 +688,10 @@ mod tests {
             cfg.general.upstream_connect_failfast_hard_errors,
             default_upstream_connect_failfast_hard_errors()
         );
+        assert_eq!(
+            cfg.general.rpc_proxy_req_every,
+            default_rpc_proxy_req_every()
+        );
         assert_eq!(cfg.general.update_every, default_update_every());
         assert_eq!(cfg.server.listen_addr_ipv4, default_listen_addr_ipv4());
         assert_eq!(cfg.server.listen_addr_ipv6, default_listen_addr_ipv6_opt());
@@ -759,6 +771,7 @@ mod tests {
            general.upstream_connect_failfast_hard_errors,
            default_upstream_connect_failfast_hard_errors()
        );
+        assert_eq!(general.rpc_proxy_req_every, default_rpc_proxy_req_every());
        assert_eq!(general.update_every, default_update_every());
 
        let server = ServerConfig::default();
@@ -1058,6 +1071,62 @@ mod tests {
         let _ = std::fs::remove_file(path);
     }
 
+    #[test]
+    fn rpc_proxy_req_every_out_of_range_is_rejected() {
+        let toml = r#"
+            [general]
+            rpc_proxy_req_every = 9
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_rpc_proxy_req_every_out_of_range_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains("general.rpc_proxy_req_every must be 0 or within [10, 300]"));
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn rpc_proxy_req_every_zero_and_valid_range_are_accepted() {
+        let toml_zero = r#"
+            [general]
+            rpc_proxy_req_every = 0
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path_zero = dir.join("telemt_rpc_proxy_req_every_zero_ok_test.toml");
+        std::fs::write(&path_zero, toml_zero).unwrap();
+        let cfg_zero = ProxyConfig::load(&path_zero).unwrap();
+        assert_eq!(cfg_zero.general.rpc_proxy_req_every, 0);
+        let _ = std::fs::remove_file(path_zero);
+
+        let toml_valid = r#"
+            [general]
+            rpc_proxy_req_every = 40
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let path_valid = dir.join("telemt_rpc_proxy_req_every_valid_ok_test.toml");
+        std::fs::write(&path_valid, toml_valid).unwrap();
+        let cfg_valid = ProxyConfig::load(&path_valid).unwrap();
+        assert_eq!(cfg_valid.general.rpc_proxy_req_every, 40);
+        let _ = std::fs::remove_file(path_valid);
+    }
+
     #[test]
     fn me_hardswap_warmup_defaults_are_set() {
         let toml = r#"
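Reviewer note: the new rule accepts 0 (signals disabled) or any value in the inclusive range 10..=300 seconds. A minimal standalone sketch of the same check; the helper name below is hypothetical and not part of the patch, only the error text and range are taken from the hunk above:

/// Hypothetical helper mirroring the validation added to ProxyConfig::load():
/// 0 disables the RPC_PROXY_REQ activity signal; otherwise the interval
/// must fall within [10, 300] seconds.
fn validate_rpc_proxy_req_every(secs: u64) -> Result<(), String> {
    if secs != 0 && !(10..=300).contains(&secs) {
        return Err("general.rpc_proxy_req_every must be 0 or within [10, 300]".to_string());
    }
    Ok(())
}

#[test]
fn rpc_proxy_req_every_boundaries() {
    assert!(validate_rpc_proxy_req_every(0).is_ok());
    assert!(validate_rpc_proxy_req_every(10).is_ok());
    assert!(validate_rpc_proxy_req_every(300).is_ok());
    assert!(validate_rpc_proxy_req_every(9).is_err());
    assert!(validate_rpc_proxy_req_every(301).is_err());
}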
"00000000000000000000000000000000" + "#; + let path_valid = dir.join("telemt_rpc_proxy_req_every_valid_ok_test.toml"); + std::fs::write(&path_valid, toml_valid).unwrap(); + let cfg_valid = ProxyConfig::load(&path_valid).unwrap(); + assert_eq!(cfg_valid.general.rpc_proxy_req_every, 40); + let _ = std::fs::remove_file(path_valid); + } + #[test] fn me_hardswap_warmup_defaults_are_set() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 137f585..64be729 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -162,8 +162,8 @@ impl MeBindStaleMode { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] pub enum MeFloorMode { - #[default] Static, + #[default] Adaptive, } @@ -356,6 +356,11 @@ pub struct GeneralConfig { #[serde(default = "default_true")] pub me_keepalive_payload_random: bool, + /// Interval in seconds for service RPC_PROXY_REQ activity signals to ME. + /// 0 disables service activity signals. + #[serde(default = "default_rpc_proxy_req_every")] + pub rpc_proxy_req_every: u64, + /// Max pending ciphertext buffer per client writer (bytes). /// Controls FakeTLS backpressure vs throughput. #[serde(default = "default_crypto_pending_buffer")] @@ -666,6 +671,7 @@ impl Default for GeneralConfig { me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_payload_random: default_true(), + rpc_proxy_req_every: default_rpc_proxy_req_every(), me_warmup_stagger_enabled: default_true(), me_warmup_step_delay_ms: default_warmup_step_delay_ms(), me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), diff --git a/src/main.rs b/src/main.rs index 54011cf..f7f9239 100644 --- a/src/main.rs +++ b/src/main.rs @@ -534,6 +534,7 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_keepalive_interval_secs, config.general.me_keepalive_jitter_secs, config.general.me_keepalive_payload_random, + config.general.rpc_proxy_req_every, config.general.me_warmup_stagger_enabled, config.general.me_warmup_step_delay_ms, config.general.me_warmup_step_jitter_ms, diff --git a/src/metrics.rs b/src/metrics.rs index 2460d8e..eae69d1 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -439,6 +439,93 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_sent_total Service RPC_PROXY_REQ activity signals sent" + ); + let _ = writeln!(out, "# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_sent_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_sent_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_failed_total Service RPC_PROXY_REQ activity signal failures" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_rpc_proxy_req_signal_failed_total counter" + ); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_failed_total {}", + if me_allows_normal { + stats.get_me_rpc_proxy_req_signal_failed_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_rpc_proxy_req_signal_skipped_no_meta_total Service RPC_PROXY_REQ skipped due to missing writer metadata" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_rpc_proxy_req_signal_skipped_no_meta_total counter" + ); + let _ = writeln!( + out, + "telemt_me_rpc_proxy_req_signal_skipped_no_meta_total {}", + if me_allows_normal { + 
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_rpc_proxy_req_signal_response_total Service RPC_PROXY_REQ responses observed"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_rpc_proxy_req_signal_response_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_me_rpc_proxy_req_signal_response_total {}",
+        if me_allows_normal {
+            stats.get_me_rpc_proxy_req_signal_response_total()
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_rpc_proxy_req_signal_close_sent_total Service RPC_CLOSE_EXT sent after activity signals"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_rpc_proxy_req_signal_close_sent_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_me_rpc_proxy_req_signal_close_sent_total {}",
+        if me_allows_normal {
+            stats.get_me_rpc_proxy_req_signal_close_sent_total()
+        } else {
+            0
+        }
+    );
+
     let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
     let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
     let _ = writeln!(
@@ -500,6 +587,21 @@
         }
     );
 
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_idle_close_by_peer_total ME idle writers closed by peer"
+    );
+    let _ = writeln!(out, "# TYPE telemt_me_idle_close_by_peer_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_me_idle_close_by_peer_total {}",
+        if me_allows_normal {
+            stats.get_me_idle_close_by_peer_total()
+        } else {
+            0
+        }
+    );
+
     let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches");
     let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter");
     let _ = writeln!(
@@ -1225,6 +1327,12 @@ mod tests {
         stats.observe_upstream_connect_attempts_per_request(2);
         stats.observe_upstream_connect_duration_ms(220, true);
         stats.observe_upstream_connect_duration_ms(1500, false);
+        stats.increment_me_rpc_proxy_req_signal_sent_total();
+        stats.increment_me_rpc_proxy_req_signal_failed_total();
+        stats.increment_me_rpc_proxy_req_signal_skipped_no_meta_total();
+        stats.increment_me_rpc_proxy_req_signal_response_total();
+        stats.increment_me_rpc_proxy_req_signal_close_sent_total();
+        stats.increment_me_idle_close_by_peer_total();
         stats.increment_user_connects("alice");
         stats.increment_user_curr_connects("alice");
         stats.add_user_octets_from("alice", 1024);
@@ -1257,6 +1365,12 @@
         assert!(
             output.contains("telemt_upstream_connect_duration_fail_total{bucket=\"gt_1000ms\"} 1")
         );
+        assert!(output.contains("telemt_me_rpc_proxy_req_signal_sent_total 1"));
+        assert!(output.contains("telemt_me_rpc_proxy_req_signal_failed_total 1"));
+        assert!(output.contains("telemt_me_rpc_proxy_req_signal_skipped_no_meta_total 1"));
+        assert!(output.contains("telemt_me_rpc_proxy_req_signal_response_total 1"));
+        assert!(output.contains("telemt_me_rpc_proxy_req_signal_close_sent_total 1"));
+        assert!(output.contains("telemt_me_idle_close_by_peer_total 1"));
         assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1"));
         assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1"));
         assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024"));
@@ -1291,6 +1405,8 @@
         assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
         assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
         assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
+        assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
telemt_me_rpc_proxy_req_signal_sent_total counter")); + assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains( "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 6c72b6f..29d7f45 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -46,10 +46,16 @@ pub struct Stats { me_keepalive_failed: AtomicU64, me_keepalive_pong: AtomicU64, me_keepalive_timeout: AtomicU64, + me_rpc_proxy_req_signal_sent_total: AtomicU64, + me_rpc_proxy_req_signal_failed_total: AtomicU64, + me_rpc_proxy_req_signal_skipped_no_meta_total: AtomicU64, + me_rpc_proxy_req_signal_response_total: AtomicU64, + me_rpc_proxy_req_signal_close_sent_total: AtomicU64, me_reconnect_attempts: AtomicU64, me_reconnect_success: AtomicU64, me_handshake_reject_total: AtomicU64, me_reader_eof_total: AtomicU64, + me_idle_close_by_peer_total: AtomicU64, me_crc_mismatch: AtomicU64, me_seq_mismatch: AtomicU64, me_endpoint_quarantine_total: AtomicU64, @@ -289,6 +295,36 @@ impl Stats { self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed); } } + pub fn increment_me_rpc_proxy_req_signal_sent_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_sent_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_failed_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_failed_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_skipped_no_meta_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_skipped_no_meta_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_response_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_response_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_rpc_proxy_req_signal_close_sent_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_rpc_proxy_req_signal_close_sent_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_reconnect_attempt(&self) { if self.telemetry_me_allows_normal() { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); @@ -319,6 +355,12 @@ impl Stats { self.me_reader_eof_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_idle_close_by_peer_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_idle_close_by_peer_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_crc_mismatch(&self) { if self.telemetry_me_allows_normal() { self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); @@ -575,6 +617,26 @@ impl Stats { pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) } pub fn get_me_keepalive_timeout(&self) -> u64 { self.me_keepalive_timeout.load(Ordering::Relaxed) } + pub fn get_me_rpc_proxy_req_signal_sent_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_sent_total + .load(Ordering::Relaxed) + } + pub fn get_me_rpc_proxy_req_signal_failed_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_failed_total + .load(Ordering::Relaxed) + } + pub fn get_me_rpc_proxy_req_signal_skipped_no_meta_total(&self) -> u64 { + self.me_rpc_proxy_req_signal_skipped_no_meta_total + .load(Ordering::Relaxed) + } + pub fn 
+        self.me_rpc_proxy_req_signal_response_total
+            .load(Ordering::Relaxed)
+    }
+    pub fn get_me_rpc_proxy_req_signal_close_sent_total(&self) -> u64 {
+        self.me_rpc_proxy_req_signal_close_sent_total
+            .load(Ordering::Relaxed)
+    }
     pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
     pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
     pub fn get_me_handshake_reject_total(&self) -> u64 {
@@ -583,6 +645,9 @@ impl Stats {
     pub fn get_me_reader_eof_total(&self) -> u64 {
         self.me_reader_eof_total.load(Ordering::Relaxed)
     }
+    pub fn get_me_idle_close_by_peer_total(&self) -> u64 {
+        self.me_idle_close_by_peer_total.load(Ordering::Relaxed)
+    }
     pub fn get_me_crc_mismatch(&self) -> u64 { self.me_crc_mismatch.load(Ordering::Relaxed) }
     pub fn get_me_seq_mismatch(&self) -> u64 { self.me_seq_mismatch.load(Ordering::Relaxed) }
     pub fn get_me_endpoint_quarantine_total(&self) -> u64 {
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 55d8409..c9ad34c 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -18,6 +18,10 @@ const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
 #[allow(dead_code)]
 const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
 const SHADOW_ROTATE_RETRY_SECS: u64 = 30;
+const IDLE_REFRESH_TRIGGER_BASE_SECS: u64 = 45;
+const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5;
+const IDLE_REFRESH_RETRY_SECS: u64 = 8;
+const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5;
 
 pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
     let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
@@ -27,6 +31,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c
     let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
     let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
+    let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     loop {
@@ -43,6 +48,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c
                 &mut outage_next_attempt,
                 &mut single_endpoint_outage,
                 &mut shadow_rotate_deadline,
+                &mut idle_refresh_next_attempt,
                 &mut adaptive_idle_since,
                 &mut adaptive_recover_until,
             )
@@ -58,6 +64,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c
                 &mut outage_next_attempt,
                 &mut single_endpoint_outage,
                 &mut shadow_rotate_deadline,
+                &mut idle_refresh_next_attempt,
                 &mut adaptive_idle_since,
                 &mut adaptive_recover_until,
             )
@@ -76,6 +83,7 @@ async fn check_family(
     outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
     single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
     shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
+    idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
     adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
     adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
 ) {
@@ -120,6 +128,7 @@ async fn check_family(
             .or_default()
             .push(writer.id);
     }
+    let writer_idle_since = pool.registry.writer_idle_since_snapshot().await;
 
     for (dc, endpoints) in dc_endpoints {
         if endpoints.is_empty() {
@@ -171,6 +180,7 @@ async fn check_family(
                 outage_backoff.remove(&key);
                 outage_next_attempt.remove(&key);
                 shadow_rotate_deadline.remove(&key);
+                idle_refresh_next_attempt.remove(&key);
                 adaptive_idle_since.remove(&key);
                 adaptive_recover_until.remove(&key);
                 info!(
@@ -184,6 +194,20 @@ async fn check_family(
         }
 
         if alive >= required {
+            maybe_refresh_idle_writer_for_dc(
+                pool,
+                rng,
+                key,
+                dc,
+                family,
+                &endpoints,
+                alive,
+                required,
+                &live_writer_ids_by_addr,
+                &writer_idle_since,
+                idle_refresh_next_attempt,
+            )
+            .await;
             maybe_rotate_single_endpoint_shadow(
                 pool,
                 rng,
@@ -287,6 +311,113 @@ async fn check_family(
     }
 }
 
+async fn maybe_refresh_idle_writer_for_dc(
+    pool: &Arc<MePool>,
+    rng: &Arc<SecureRandom>,
+    key: (i32, IpFamily),
+    dc: i32,
+    family: IpFamily,
+    endpoints: &[SocketAddr],
+    alive: usize,
+    required: usize,
+    live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
+    writer_idle_since: &HashMap<u64, u64>,
+    idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
+) {
+    if alive < required {
+        return;
+    }
+
+    let now = Instant::now();
+    if let Some(next) = idle_refresh_next_attempt.get(&key)
+        && now < *next
+    {
+        return;
+    }
+
+    let now_epoch_secs = MePool::now_epoch_secs();
+    let mut candidate: Option<(u64, SocketAddr, u64, u64)> = None;
+    for endpoint in endpoints {
+        let Some(writer_ids) = live_writer_ids_by_addr.get(endpoint) else {
+            continue;
+        };
+        for writer_id in writer_ids {
+            let Some(idle_since_epoch_secs) = writer_idle_since.get(writer_id).copied() else {
+                continue;
+            };
+            let idle_age_secs = now_epoch_secs.saturating_sub(idle_since_epoch_secs);
+            let threshold_secs = IDLE_REFRESH_TRIGGER_BASE_SECS
+                + (*writer_id % (IDLE_REFRESH_TRIGGER_JITTER_SECS + 1));
+            if idle_age_secs < threshold_secs {
+                continue;
+            }
+            if candidate
+                .as_ref()
+                .map(|(_, _, age, _)| idle_age_secs > *age)
+                .unwrap_or(true)
+            {
+                candidate = Some((*writer_id, *endpoint, idle_age_secs, threshold_secs));
+            }
+        }
+    }
+
+    let Some((old_writer_id, endpoint, idle_age_secs, threshold_secs)) = candidate else {
+        return;
+    };
+
+    let rotate_ok = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await {
+        Ok(Ok(())) => true,
+        Ok(Err(error)) => {
+            debug!(
+                dc = %dc,
+                ?family,
+                %endpoint,
+                old_writer_id,
+                idle_age_secs,
+                threshold_secs,
+                %error,
+                "Idle writer pre-refresh connect failed"
+            );
+            false
+        }
+        Err(_) => {
+            debug!(
+                dc = %dc,
+                ?family,
+                %endpoint,
+                old_writer_id,
+                idle_age_secs,
+                threshold_secs,
+                "Idle writer pre-refresh connect timed out"
+            );
+            false
+        }
+    };
+
+    if !rotate_ok {
+        idle_refresh_next_attempt.insert(key, now + Duration::from_secs(IDLE_REFRESH_RETRY_SECS));
+        return;
+    }
+
+    pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false)
+        .await;
+    idle_refresh_next_attempt.insert(
+        key,
+        now + Duration::from_secs(IDLE_REFRESH_SUCCESS_GUARD_SECS),
+    );
+    info!(
+        dc = %dc,
+        ?family,
+        %endpoint,
+        old_writer_id,
+        idle_age_secs,
+        threshold_secs,
+        alive,
+        required,
+        "Idle writer refreshed before upstream idle timeout"
+    );
+}
+
 async fn should_reduce_floor_for_idle(
     pool: &Arc<MePool>,
     key: (i32, IpFamily),
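Reviewer note: the refresh trigger above spreads writers out deterministically: a writer becomes eligible once it has been idle for 45 s plus (writer_id % 6) extra seconds, so a whole DC never refreshes at the same instant. A self-contained sketch of that arithmetic, with the two constants copied from the hunk and everything else (the function name, the epoch-second inputs) hypothetical:

const IDLE_REFRESH_TRIGGER_BASE_SECS: u64 = 45;
const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5;

/// True when an idle writer has aged past its per-writer threshold (45..=50 s).
fn idle_refresh_due(writer_id: u64, idle_since_epoch_secs: u64, now_epoch_secs: u64) -> bool {
    let idle_age_secs = now_epoch_secs.saturating_sub(idle_since_epoch_secs);
    let threshold_secs =
        IDLE_REFRESH_TRIGGER_BASE_SECS + (writer_id % (IDLE_REFRESH_TRIGGER_JITTER_SECS + 1));
    idle_age_secs >= threshold_secs
}

fn main() {
    // Writer 3 gets a 48 s threshold: not yet due at 47 s of idle, due at 48 s.
    assert!(!idle_refresh_due(3, 100, 147));
    assert!(idle_refresh_due(3, 100, 148));
}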
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index 5ae922a..8c185be 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -94,6 +94,7 @@ pub struct MePool {
     pub(super) me_keepalive_interval: Duration,
     pub(super) me_keepalive_jitter: Duration,
     pub(super) me_keepalive_payload_random: bool,
+    pub(super) rpc_proxy_req_every_secs: AtomicU64,
     pub(super) me_warmup_stagger_enabled: bool,
     pub(super) me_warmup_step_delay: Duration,
     pub(super) me_warmup_step_jitter: Duration,
@@ -192,6 +193,7 @@ impl MePool {
         me_keepalive_interval_secs: u64,
         me_keepalive_jitter_secs: u64,
         me_keepalive_payload_random: bool,
+        rpc_proxy_req_every_secs: u64,
         me_warmup_stagger_enabled: bool,
         me_warmup_step_delay_ms: u64,
         me_warmup_step_jitter_ms: u64,
@@ -272,6 +274,7 @@ impl MePool {
             me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
             me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
             me_keepalive_payload_random,
+            rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs),
             me_warmup_stagger_enabled,
             me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
             me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index 455757e..1e86ea3 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -2,6 +2,7 @@
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
 use std::time::{Duration, Instant};
+use std::io::ErrorKind;
 
 use bytes::BytesMut;
 use rand::Rng;
@@ -12,16 +13,22 @@ use tracing::{debug, info, warn};
 
 use crate::config::MeBindStaleMode;
 use crate::crypto::SecureRandom;
 use crate::error::{ProxyError, Result};
-use crate::protocol::constants::RPC_PING_U32;
+use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
 
 use super::codec::{RpcWriter, WriterCommand};
 use super::pool::{MePool, MeWriter, WriterContour};
 use super::reader::reader_loop;
 use super::registry::BoundConn;
+use super::wire::build_proxy_req_payload;
 
 const ME_ACTIVE_PING_SECS: u64 = 25;
 const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
 const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
+const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
+
+fn is_me_peer_closed_error(error: &ProxyError) -> bool {
+    matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof)
+}
 
 impl MePool {
     pub(crate) async fn prune_closed_writers(self: &Arc<Self>) {
@@ -115,6 +122,7 @@ impl MePool {
             allow_drain_fallback: allow_drain_fallback.clone(),
         };
         self.writers.write().await.push(writer.clone());
+        self.registry.mark_writer_idle(writer_id).await;
         self.conn_count.fetch_add(1, Ordering::Relaxed);
         self.writer_available.notify_one();
 
@@ -124,6 +132,7 @@ impl MePool {
         let ping_tracker_reader = ping_tracker.clone();
         let rtt_stats = self.rtt_stats.clone();
         let stats_reader = self.stats.clone();
+        let stats_reader_close = self.stats.clone();
         let stats_ping = self.stats.clone();
         let pool = Arc::downgrade(self);
         let cancel_ping = cancel.clone();
@@ -135,6 +144,13 @@ impl MePool {
         let keepalive_enabled = self.me_keepalive_enabled;
         let keepalive_interval = self.me_keepalive_interval;
         let keepalive_jitter = self.me_keepalive_jitter;
+        let rpc_proxy_req_every_secs = self.rpc_proxy_req_every_secs.load(Ordering::Relaxed);
+        let tx_signal = tx.clone();
+        let stats_signal = self.stats.clone();
+        let cancel_signal = cancel.clone();
+        let cleanup_for_signal = cleanup_done.clone();
+        let pool_signal = Arc::downgrade(self);
+        let keepalive_jitter_signal = self.me_keepalive_jitter;
 
         let cancel_reader_token = cancel.clone();
         let cancel_ping_token = cancel_ping.clone();
@@ -156,6 +172,15 @@ impl MePool {
                 cancel_reader_token.clone(),
             )
             .await;
+            let idle_close_by_peer = if let Err(e) = res.as_ref() {
+                is_me_peer_closed_error(e) && reg.is_writer_empty(writer_id).await
+            } else {
+                false
+            };
+            if idle_close_by_peer {
+                stats_reader_close.increment_me_idle_close_by_peer_total();
+                info!(writer_id, "ME socket closed by peer on idle writer");
+            }
             if let Some(pool) = pool.upgrade()
                 && cleanup_for_reader
                     .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
                     .is_ok()
             {
                 pool.remove_writer_and_close_clients(writer_id).await;
             }
             if let Err(e) = res {
-                warn!(error = %e, "ME reader ended");
+                if !idle_close_by_peer {
+                    warn!(error = %e, "ME reader ended");
+                }
             }
             let mut ws = writers_arc.write().await;
             ws.retain(|w| w.id != writer_id);
@@ -253,6 +280,116 @@ impl MePool {
             }
         });
 
+        tokio::spawn(async move {
+            if rpc_proxy_req_every_secs == 0 {
+                return;
+            }
+
+            let interval = Duration::from_secs(rpc_proxy_req_every_secs);
+            let startup_jitter_ms = {
+                let jitter_cap_ms = interval.as_millis() / 2;
+                let effective_jitter_ms = keepalive_jitter_signal
+                    .as_millis()
+                    .min(jitter_cap_ms)
+                    .max(1);
+                rand::rng().random_range(0..=effective_jitter_ms as u64)
+            };
+
+            tokio::select! {
+                _ = cancel_signal.cancelled() => return,
+                _ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
+            }
+
+            loop {
+                let wait = {
+                    let jitter_cap_ms = interval.as_millis() / 2;
+                    let effective_jitter_ms = keepalive_jitter_signal
+                        .as_millis()
+                        .min(jitter_cap_ms)
+                        .max(1);
+                    interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
+                };
+
+                tokio::select! {
+                    _ = cancel_signal.cancelled() => break,
+                    _ = tokio::time::sleep(wait) => {}
+                }
+
+                let Some(pool) = pool_signal.upgrade() else {
+                    break;
+                };
+
+                let Some(meta) = pool.registry.get_last_writer_meta(writer_id).await else {
+                    stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total();
+                    continue;
+                };
+
+                let (conn_id, mut service_rx) = pool.registry.register().await;
+                pool.registry
+                    .bind_writer(conn_id, writer_id, tx_signal.clone(), meta.clone())
+                    .await;
+
+                let payload = build_proxy_req_payload(
+                    conn_id,
+                    meta.client_addr,
+                    meta.our_addr,
+                    &[],
+                    pool.proxy_tag.as_deref(),
+                    meta.proto_flags,
+                );
+
+                if tx_signal.send(WriterCommand::DataAndFlush(payload)).await.is_err() {
+                    stats_signal.increment_me_rpc_proxy_req_signal_failed_total();
+                    let _ = pool.registry.unregister(conn_id).await;
+                    cancel_signal.cancel();
+                    if cleanup_for_signal
+                        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
+                        .is_ok()
+                    {
+                        pool.remove_writer_and_close_clients(writer_id).await;
+                    }
+                    break;
+                }
+
+                stats_signal.increment_me_rpc_proxy_req_signal_sent_total();
+
+                if matches!(
+                    tokio::time::timeout(
+                        Duration::from_millis(ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS),
+                        service_rx.recv(),
+                    )
+                    .await,
+                    Ok(Some(_))
+                ) {
+                    stats_signal.increment_me_rpc_proxy_req_signal_response_total();
+                }
+
+                let mut close_payload = Vec::with_capacity(12);
+                close_payload.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
+                close_payload.extend_from_slice(&conn_id.to_le_bytes());
+
+                if tx_signal
+                    .send(WriterCommand::DataAndFlush(close_payload))
+                    .await
+                    .is_err()
+                {
+                    stats_signal.increment_me_rpc_proxy_req_signal_failed_total();
+                    let _ = pool.registry.unregister(conn_id).await;
+                    cancel_signal.cancel();
+                    if cleanup_for_signal
+                        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
+                        .is_ok()
+                    {
+                        pool.remove_writer_and_close_clients(writer_id).await;
+                    }
+                    break;
+                }
+
+                stats_signal.increment_me_rpc_proxy_req_signal_close_sent_total();
+                let _ = pool.registry.unregister(conn_id).await;
+            }
+        });
+
         Ok(())
     }
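Reviewer note: the signal task's wait is the configured interval plus a random slice of the keepalive jitter, with the jitter capped at half the interval and floored at 1 ms. A sketch of just that computation; the function name is hypothetical, and the rand 0.9 rand::rng()/random_range API is assumed to match the calls in the hunk above:

use std::time::Duration;

use rand::Rng;

/// Next wait before sending a service RPC_PROXY_REQ: interval plus bounded jitter.
fn next_signal_wait(interval: Duration, keepalive_jitter: Duration) -> Duration {
    let jitter_cap_ms = interval.as_millis() / 2;
    let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
    interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
}

fn main() {
    // 40 s interval with a 30 s keepalive jitter: the jitter is capped at 20 s,
    // so the wait always lands in [40 s, 60 s].
    let wait = next_signal_wait(Duration::from_secs(40), Duration::from_secs(30));
    assert!(wait >= Duration::from_secs(40) && wait <= Duration::from_secs(60));
}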
diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs
index 223fa71..4a66654 100644
--- a/src/transport/middle_proxy/registry.rs
+++ b/src/transport/middle_proxy/registry.rs
@@ -1,7 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use tokio::sync::{mpsc, RwLock};
 use tokio::sync::mpsc::error::TrySendError;
@@ -51,6 +51,8 @@ struct RegistryInner {
     writer_for_conn: HashMap<u64, u64>,
     conns_for_writer: HashMap<u64, HashSet<u64>>,
     meta: HashMap<u64, ConnMeta>,
+    last_meta_for_writer: HashMap<u64, ConnMeta>,
+    writer_idle_since_epoch_secs: HashMap<u64, u64>,
 }
 
 impl RegistryInner {
@@ -61,6 +63,8 @@
             writer_for_conn: HashMap::new(),
             conns_for_writer: HashMap::new(),
             meta: HashMap::new(),
+            last_meta_for_writer: HashMap::new(),
+            writer_idle_since_epoch_secs: HashMap::new(),
         }
     }
 }
@@ -74,6 +78,13 @@ pub struct ConnRegistry {
 }
 
 impl ConnRegistry {
+    fn now_epoch_secs() -> u64 {
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs()
+    }
+
     pub fn new() -> Self {
         let start = rand::random::<u64>() | 1;
         Self {
@@ -121,8 +132,16 @@ impl ConnRegistry {
         inner.map.remove(&id);
         inner.meta.remove(&id);
         if let Some(writer_id) = inner.writer_for_conn.remove(&id) {
-            if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) {
+            let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) {
                 set.remove(&id);
+                set.is_empty()
+            } else {
+                false
+            };
+            if became_empty {
+                inner
+                    .writer_idle_since_epoch_secs
+                    .insert(writer_id, Self::now_epoch_secs());
             }
             return Some(writer_id);
         }
@@ -191,8 +210,10 @@ impl ConnRegistry {
         meta: ConnMeta,
     ) {
         let mut inner = self.inner.write().await;
-        inner.meta.entry(conn_id).or_insert(meta);
+        inner.meta.entry(conn_id).or_insert(meta.clone());
         inner.writer_for_conn.insert(conn_id, writer_id);
+        inner.last_meta_for_writer.insert(writer_id, meta);
+        inner.writer_idle_since_epoch_secs.remove(&writer_id);
         inner.writers.entry(writer_id).or_insert_with(|| tx.clone());
         inner
             .conns_for_writer
@@ -201,6 +222,25 @@ impl ConnRegistry {
             .insert(conn_id);
     }
 
+    pub async fn mark_writer_idle(&self, writer_id: u64) {
+        let mut inner = self.inner.write().await;
+        inner.conns_for_writer.entry(writer_id).or_insert_with(HashSet::new);
+        inner
+            .writer_idle_since_epoch_secs
+            .entry(writer_id)
+            .or_insert(Self::now_epoch_secs());
+    }
+
+    pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option<ConnMeta> {
+        let inner = self.inner.read().await;
+        inner.last_meta_for_writer.get(&writer_id).cloned()
+    }
+
+    pub async fn writer_idle_since_snapshot(&self) -> HashMap<u64, u64> {
+        let inner = self.inner.read().await;
+        inner.writer_idle_since_epoch_secs.clone()
+    }
+
     pub async fn get_writer(&self, conn_id: u64) -> Option<mpsc::Sender<WriterCommand>> {
         let inner = self.inner.read().await;
         let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?;
@@ -211,6 +251,8 @@ impl ConnRegistry {
     pub async fn writer_lost(&self, writer_id: u64) -> Vec<u64> {
         let mut inner = self.inner.write().await;
         inner.writers.remove(&writer_id);
+        inner.last_meta_for_writer.remove(&writer_id);
+        inner.writer_idle_since_epoch_secs.remove(&writer_id);
         let conns = inner
             .conns_for_writer
             .remove(&writer_id)
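Reviewer note: the registry changes amount to a small idle-bookkeeping state machine: binding any connection to a writer clears its idle stamp, and unregistering the last bound connection re-stamps it with the current epoch second. A simplified model of just that bookkeeping, without the RwLock or the rest of ConnRegistry; the type and method names below are hypothetical, not the real API:

use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct IdleBook {
    conns_for_writer: HashMap<u64, HashSet<u64>>,
    idle_since: HashMap<u64, u64>,
}

impl IdleBook {
    /// Binding any connection makes the writer non-idle.
    fn bind(&mut self, conn_id: u64, writer_id: u64) {
        self.conns_for_writer.entry(writer_id).or_default().insert(conn_id);
        self.idle_since.remove(&writer_id);
    }

    /// Unbinding the last connection stamps the writer idle at `now`.
    fn unbind(&mut self, conn_id: u64, writer_id: u64, now: u64) {
        if let Some(set) = self.conns_for_writer.get_mut(&writer_id) {
            set.remove(&conn_id);
            if set.is_empty() {
                self.idle_since.insert(writer_id, now);
            }
        }
    }
}

fn main() {
    let mut book = IdleBook::default();
    book.bind(1, 7);
    assert!(book.idle_since.get(&7).is_none());
    book.unbind(1, 7, 130);
    assert_eq!(book.idle_since.get(&7).copied(), Some(130));
}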
diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs
index 3b57c4c..ba4a419 100644
--- a/src/transport/middle_proxy/send.rs
+++ b/src/transport/middle_proxy/send.rs
@@ -1,4 +1,5 @@
 use std::cmp::Reverse;
+use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
@@ -18,6 +19,9 @@ use super::wire::build_proxy_req_payload;
 use rand::seq::SliceRandom;
 
 use super::registry::ConnMeta;
 
+const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
+const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
+
 impl MePool {
     /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
     pub async fn send_proxy_req(
@@ -152,6 +156,8 @@ impl MePool {
                 return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
             }
         }
+        let writer_idle_since = self.registry.writer_idle_since_snapshot().await;
+        let now_epoch_secs = Self::now_epoch_secs();
 
         if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
             candidate_indices.sort_by(|lhs, rhs| {
@@ -161,6 +167,11 @@ impl MePool {
                     self.writer_contour_rank_for_selection(left),
                     (left.generation < self.current_generation()) as usize,
                     left.degraded.load(Ordering::Relaxed) as usize,
+                    self.writer_idle_rank_for_selection(
+                        left,
+                        &writer_idle_since,
+                        now_epoch_secs,
+                    ),
                     Reverse(left.tx.capacity()),
                     left.addr,
                     left.id,
@@ -169,6 +180,11 @@ impl MePool {
                     self.writer_contour_rank_for_selection(right),
                     (right.generation < self.current_generation()) as usize,
                     right.degraded.load(Ordering::Relaxed) as usize,
+                    self.writer_idle_rank_for_selection(
+                        right,
+                        &writer_idle_since,
+                        now_epoch_secs,
+                    ),
                     Reverse(right.tx.capacity()),
                     right.addr,
                     right.id,
@@ -184,6 +200,11 @@ impl MePool {
                     self.writer_contour_rank_for_selection(w),
                     stale,
                     degraded as usize,
+                    self.writer_idle_rank_for_selection(
+                        w,
+                        &writer_idle_since,
+                        now_epoch_secs,
+                    ),
                     Reverse(w.tx.capacity()),
                 )
             });
@@ -367,4 +388,23 @@ impl MePool {
             WriterContour::Draining => 2,
         }
     }
+
+    fn writer_idle_rank_for_selection(
+        &self,
+        writer: &super::pool::MeWriter,
+        idle_since_by_writer: &HashMap<u64, u64>,
+        now_epoch_secs: u64,
+    ) -> usize {
+        let Some(idle_since) = idle_since_by_writer.get(&writer.id).copied() else {
+            return 0;
+        };
+        let idle_age_secs = now_epoch_secs.saturating_sub(idle_since);
+        if idle_age_secs >= IDLE_WRITER_PENALTY_HIGH_SECS {
+            2
+        } else if idle_age_secs >= IDLE_WRITER_PENALTY_MID_SECS {
+            1
+        } else {
+            0
+        }
+    }
 }
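Reviewer note: writer_idle_rank_for_selection turns idle age into a coarse penalty: rank 0 below 45 s (or when no idle stamp exists), rank 1 from 45 s, rank 2 from 55 s; lower ranks sort first, so long-idle writers are picked last. A standalone sketch with the two constants copied from the hunk above; the free-function form and the example numbers are illustrative only:

use std::collections::HashMap;

const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;

/// 0 = fresh or not idle, 1 = idle 45..55 s, 2 = idle >= 55 s.
fn idle_rank(writer_id: u64, idle_since: &HashMap<u64, u64>, now_epoch_secs: u64) -> usize {
    let Some(since) = idle_since.get(&writer_id).copied() else {
        return 0;
    };
    let idle_age_secs = now_epoch_secs.saturating_sub(since);
    if idle_age_secs >= IDLE_WRITER_PENALTY_HIGH_SECS {
        2
    } else if idle_age_secs >= IDLE_WRITER_PENALTY_MID_SECS {
        1
    } else {
        0
    }
}

fn main() {
    let idle_since = HashMap::from([(1u64, 1_000u64), (2, 1_040)]);
    // Writer 1 has been idle 60 s (rank 2), writer 2 only 20 s (rank 0),
    // and writer 3 has no idle stamp at all (rank 0).
    assert_eq!(idle_rank(1, &idle_since, 1_060), 2);
    assert_eq!(idle_rank(2, &idle_since, 1_060), 0);
    assert_eq!(idle_rank(3, &idle_since, 1_060), 0);
}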