From 4469dd9f01afa3a7a67017bb06e205bb6728392b Mon Sep 17 00:00:00 2001 From: sabraman Date: Mon, 23 Mar 2026 02:03:51 +0300 Subject: [PATCH] stabilize hot-reload across overwrite bursts --- src/config/hot_reload.rs | 230 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 217 insertions(+), 13 deletions(-) diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index e580b7f..7c82637 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -36,7 +36,9 @@ use crate::config::{ LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel, MeWriterPickMode, }; +const HOT_RELOAD_STABLE_SNAPSHOTS: u8 = 2; const HOT_RELOAD_DEBOUNCE: Duration = Duration::from_millis(50); +const HOT_RELOAD_STABLE_RECHECK: Duration = Duration::from_millis(75); // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -335,12 +337,16 @@ impl WatchManifest { #[derive(Debug, Default)] struct ReloadState { applied_snapshot_hash: Option, + candidate_snapshot_hash: Option, + candidate_hits: u8, } impl ReloadState { fn new(applied_snapshot_hash: Option) -> Self { Self { applied_snapshot_hash, + candidate_snapshot_hash: None, + candidate_hits: 0, } } @@ -348,8 +354,32 @@ impl ReloadState { self.applied_snapshot_hash == Some(hash) } + fn observe_candidate(&mut self, hash: u64) -> u8 { + if self.candidate_snapshot_hash == Some(hash) { + self.candidate_hits = self.candidate_hits.saturating_add(1); + } else { + self.candidate_snapshot_hash = Some(hash); + self.candidate_hits = 1; + } + self.candidate_hits + } + + fn reset_candidate(&mut self) { + self.candidate_snapshot_hash = None; + self.candidate_hits = 0; + } + fn mark_applied(&mut self, hash: u64) { self.applied_snapshot_hash = Some(hash); + self.reset_candidate(); + } + + fn pending_candidate(&self) -> Option<(u64, u8)> { + let hash = self.candidate_snapshot_hash?; + if self.candidate_hits < HOT_RELOAD_STABLE_SNAPSHOTS { + return Some((hash, self.candidate_hits)); + } + None } } @@ -1176,6 +1206,7 @@ fn reload_config( let loaded = match ProxyConfig::load_with_metadata(config_path) { Ok(loaded) => loaded, Err(e) => { + reload_state.reset_candidate(); error!("config reload: failed to parse {:?}: {}", config_path, e); return None; } @@ -1188,6 +1219,7 @@ fn reload_config( let next_manifest = WatchManifest::from_source_files(&source_files); if let Err(e) = new_cfg.validate() { + reload_state.reset_candidate(); error!( "config reload: validation failed: {}; keeping old config", e @@ -1199,6 +1231,11 @@ fn reload_config( return Some(next_manifest); } + let candidate_hits = reload_state.observe_candidate(rendered_hash); + if candidate_hits < HOT_RELOAD_STABLE_SNAPSHOTS { + return Some(next_manifest); + } + let old_cfg = config_tx.borrow().clone(); let applied_cfg = overlay_hot_fields(&old_cfg, &new_cfg); let old_hot = HotFields::from_config(&old_cfg); @@ -1218,6 +1255,7 @@ fn reload_config( if old_hot.dns_overrides != applied_hot.dns_overrides && let Err(e) = crate::network::dns_overrides::install_entries(&applied_hot.dns_overrides) { + reload_state.reset_candidate(); error!( "config reload: invalid network.dns_overrides: {}; keeping old config", e @@ -1238,6 +1276,45 @@ fn reload_config( Some(next_manifest) } +async fn reload_with_internal_stable_rechecks( + config_path: &PathBuf, + config_tx: &watch::Sender>, + log_tx: &watch::Sender, + detected_ip_v4: Option, + detected_ip_v6: Option, + reload_state: &mut ReloadState, +) -> Option { + let mut next_manifest = reload_config( + config_path, + config_tx, + log_tx, + detected_ip_v4, + detected_ip_v6, + reload_state, + ); + + for _ in 0..3 { + if reload_state.pending_candidate().is_none() { + break; + } + + tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK).await; + let recheck_manifest = reload_config( + config_path, + config_tx, + log_tx, + detected_ip_v4, + detected_ip_v6, + reload_state, + ); + if recheck_manifest.is_some() { + next_manifest = recheck_manifest; + } + } + + next_manifest +} + // ── Public API ──────────────────────────────────────────────────────────────── /// Spawn the hot-reload watcher task. @@ -1370,25 +1447,27 @@ pub fn spawn_config_watcher( tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await; while notify_rx.try_recv().is_ok() {} - let mut next_manifest = reload_config( + let mut next_manifest = reload_with_internal_stable_rechecks( &config_path, &config_tx, &log_tx, detected_ip_v4, detected_ip_v6, &mut reload_state, - ); + ) + .await; if next_manifest.is_none() { tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await; while notify_rx.try_recv().is_ok() {} - next_manifest = reload_config( + next_manifest = reload_with_internal_stable_rechecks( &config_path, &config_tx, &log_tx, detected_ip_v4, detected_ip_v6, &mut reload_state, - ); + ) + .await; } if let Some(next_manifest) = next_manifest { @@ -1439,6 +1518,10 @@ mod tests { std::fs::write(path, config).unwrap(); } + fn write_reload_config_contents(path: &Path, contents: &str) { + std::fs::write(path, contents).unwrap(); + } + fn temp_config_path(prefix: &str) -> PathBuf { let nonce = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -1527,8 +1610,8 @@ mod tests { assert!(!config_equal(&applied, &new)); } - #[test] - fn reload_applies_hot_change_on_first_observed_snapshot() { + #[tokio::test] + async fn reload_applies_hot_change_on_first_observed_snapshot() { let initial_tag = "11111111111111111111111111111111"; let final_tag = "22222222222222222222222222222222"; let path = temp_config_path("telemt_hot_reload_stable"); @@ -1543,7 +1626,16 @@ mod tests { let mut reload_state = ReloadState::new(Some(initial_hash)); write_reload_config(&path, Some(final_tag), None); - reload_config(&path, &config_tx, &log_tx, None, None, &mut reload_state).unwrap(); + reload_with_internal_stable_rechecks( + &path, + &config_tx, + &log_tx, + None, + None, + &mut reload_state, + ) + .await + .unwrap(); assert_eq!( config_tx.borrow().general.ad_tag.as_deref(), Some(final_tag) @@ -1552,8 +1644,8 @@ mod tests { let _ = std::fs::remove_file(path); } - #[test] - fn reload_keeps_hot_apply_when_non_hot_fields_change() { + #[tokio::test] + async fn reload_keeps_hot_apply_when_non_hot_fields_change() { let initial_tag = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; let final_tag = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; let path = temp_config_path("telemt_hot_reload_mixed"); @@ -1568,7 +1660,16 @@ mod tests { let mut reload_state = ReloadState::new(Some(initial_hash)); write_reload_config(&path, Some(final_tag), Some(initial_cfg.server.port + 1)); - reload_config(&path, &config_tx, &log_tx, None, None, &mut reload_state).unwrap(); + reload_with_internal_stable_rechecks( + &path, + &config_tx, + &log_tx, + None, + None, + &mut reload_state, + ) + .await + .unwrap(); let applied = config_tx.borrow().clone(); assert_eq!(applied.general.ad_tag.as_deref(), Some(final_tag)); @@ -1577,8 +1678,8 @@ mod tests { let _ = std::fs::remove_file(path); } - #[test] - fn reload_recovers_after_parse_error_on_next_attempt() { + #[tokio::test] + async fn reload_recovers_after_parse_error_on_next_attempt() { let initial_tag = "cccccccccccccccccccccccccccccccc"; let final_tag = "dddddddddddddddddddddddddddddddd"; let path = temp_config_path("telemt_hot_reload_parse_recovery"); @@ -1600,7 +1701,16 @@ mod tests { ); write_reload_config(&path, Some(final_tag), None); - reload_config(&path, &config_tx, &log_tx, None, None, &mut reload_state).unwrap(); + reload_with_internal_stable_rechecks( + &path, + &config_tx, + &log_tx, + None, + None, + &mut reload_state, + ) + .await + .unwrap(); assert_eq!( config_tx.borrow().general.ad_tag.as_deref(), Some(final_tag) @@ -1608,4 +1718,98 @@ mod tests { let _ = std::fs::remove_file(path); } + + #[tokio::test] + async fn reload_internal_rechecks_skip_partial_valid_overwrite_state() { + let path = temp_config_path("telemt_hot_reload_partial_valid_overwrite"); + let initial_contents = r#" + [general] + ad_tag = "11111111111111111111111111111111" + + [censorship] + tls_domain = "m.vk.ru" + + [access.users] + default = "00000000000000000000000000000000" + hello = "11111111111111111111111111111111" + "#; + let partial_contents = r#" + [access.users] + default = "00000000000000000000000000000000" + "#; + + write_reload_config_contents(&path, initial_contents); + let initial_cfg = Arc::new(ProxyConfig::load(&path).unwrap()); + let initial_hash = ProxyConfig::load_with_metadata(&path) + .unwrap() + .rendered_hash; + let (config_tx, _config_rx) = watch::channel(initial_cfg.clone()); + let (log_tx, _log_rx) = watch::channel(initial_cfg.general.log_level.clone()); + let mut reload_state = ReloadState::new(Some(initial_hash)); + + write_reload_config_contents(&path, partial_contents); + let final_path = path.clone(); + let final_contents = initial_contents.to_string(); + tokio::spawn(async move { + tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK / 2).await; + write_reload_config_contents(&final_path, &final_contents); + }); + + reload_with_internal_stable_rechecks( + &path, + &config_tx, + &log_tx, + None, + None, + &mut reload_state, + ) + .await + .unwrap(); + + let applied = config_tx.borrow().clone(); + assert_eq!(applied.access.users, initial_cfg.access.users); + assert_eq!(applied.general.ad_tag, initial_cfg.general.ad_tag); + assert_eq!(applied.censorship.tls_domain, initial_cfg.censorship.tls_domain); + + let _ = std::fs::remove_file(path); + } + + #[tokio::test] + async fn reload_internal_rechecks_apply_last_snapshot_after_rapid_valid_updates() { + let path = temp_config_path("telemt_hot_reload_rapid_valid_updates"); + let initial_tag = "11111111111111111111111111111111"; + let middle_tag = "22222222222222222222222222222222"; + let final_tag = "33333333333333333333333333333333"; + + write_reload_config(&path, Some(initial_tag), None); + let initial_cfg = Arc::new(ProxyConfig::load(&path).unwrap()); + let initial_hash = ProxyConfig::load_with_metadata(&path) + .unwrap() + .rendered_hash; + let (config_tx, _config_rx) = watch::channel(initial_cfg.clone()); + let (log_tx, _log_rx) = watch::channel(initial_cfg.general.log_level.clone()); + let mut reload_state = ReloadState::new(Some(initial_hash)); + + write_reload_config(&path, Some(middle_tag), None); + let final_path = path.clone(); + tokio::spawn(async move { + tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK / 2).await; + write_reload_config(&final_path, Some(final_tag), None); + }); + + reload_with_internal_stable_rechecks( + &path, + &config_tx, + &log_tx, + None, + None, + &mut reload_state, + ) + .await + .unwrap(); + + assert_eq!(config_tx.borrow().general.ad_tag.as_deref(), Some(final_tag)); + + let _ = std::fs::remove_file(path); + } }