stabilize hot-reload across overwrite bursts

This commit is contained in:
sabraman 2026-03-23 02:03:51 +03:00
parent e35d69c61f
commit 4469dd9f01
1 changed file with 217 additions and 13 deletions

View File

@ -36,7 +36,9 @@ use crate::config::{
LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel, MeWriterPickMode, LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel, MeWriterPickMode,
}; };
/// Number of times the same candidate snapshot hash must be observed before the reload is applied.
const HOT_RELOAD_STABLE_SNAPSHOTS: u8 = 2;
const HOT_RELOAD_DEBOUNCE: Duration = Duration::from_millis(50); const HOT_RELOAD_DEBOUNCE: Duration = Duration::from_millis(50);
/// Pause between internal re-reads of the config while a candidate snapshot is still pending.
const HOT_RELOAD_STABLE_RECHECK: Duration = Duration::from_millis(75);
// ── Hot fields ──────────────────────────────────────────────────────────────── // ── Hot fields ────────────────────────────────────────────────────────────────
@ -335,12 +337,16 @@ impl WatchManifest {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct ReloadState { struct ReloadState {
applied_snapshot_hash: Option<u64>, applied_snapshot_hash: Option<u64>,
candidate_snapshot_hash: Option<u64>,
candidate_hits: u8,
} }
impl ReloadState { impl ReloadState {
fn new(applied_snapshot_hash: Option<u64>) -> Self { fn new(applied_snapshot_hash: Option<u64>) -> Self {
Self { Self {
applied_snapshot_hash, applied_snapshot_hash,
candidate_snapshot_hash: None,
candidate_hits: 0,
} }
} }
@ -348,8 +354,32 @@ impl ReloadState {
self.applied_snapshot_hash == Some(hash) self.applied_snapshot_hash == Some(hash)
} }
/// Registers one observation of the snapshot identified by `hash`.
///
/// Seeing the current candidate again bumps its hit counter (saturating at
/// `u8::MAX`); any other hash replaces the candidate and restarts the count
/// at 1. Returns the hit count after the update.
fn observe_candidate(&mut self, hash: u64) -> u8 {
    let hits = match self.candidate_snapshot_hash {
        Some(current) if current == hash => self.candidate_hits.saturating_add(1),
        _ => 1,
    };
    self.candidate_snapshot_hash = Some(hash);
    self.candidate_hits = hits;
    hits
}
/// Forgets the pending candidate: clears its hash and zeroes its hit count.
fn reset_candidate(&mut self) {
    (self.candidate_snapshot_hash, self.candidate_hits) = (None, 0);
}
/// Records `hash` as the applied snapshot and clears any pending candidate.
fn mark_applied(&mut self, hash: u64) { fn mark_applied(&mut self, hash: u64) {
self.applied_snapshot_hash = Some(hash); self.applied_snapshot_hash = Some(hash);
self.reset_candidate();
}
/// Returns the candidate hash and its hit count while the candidate has been
/// observed fewer than `HOT_RELOAD_STABLE_SNAPSHOTS` times.
///
/// Returns `None` when no candidate is recorded or when the candidate has
/// already reached that threshold.
fn pending_candidate(&self) -> Option<(u64, u8)> {
// `?` bails out with `None` when no candidate hash is stored.
let hash = self.candidate_snapshot_hash?;
if self.candidate_hits < HOT_RELOAD_STABLE_SNAPSHOTS {
return Some((hash, self.candidate_hits));
}
None
} }
} }
@ -1176,6 +1206,7 @@ fn reload_config(
let loaded = match ProxyConfig::load_with_metadata(config_path) { let loaded = match ProxyConfig::load_with_metadata(config_path) {
Ok(loaded) => loaded, Ok(loaded) => loaded,
Err(e) => { Err(e) => {
reload_state.reset_candidate();
error!("config reload: failed to parse {:?}: {}", config_path, e); error!("config reload: failed to parse {:?}: {}", config_path, e);
return None; return None;
} }
@ -1188,6 +1219,7 @@ fn reload_config(
let next_manifest = WatchManifest::from_source_files(&source_files); let next_manifest = WatchManifest::from_source_files(&source_files);
if let Err(e) = new_cfg.validate() { if let Err(e) = new_cfg.validate() {
reload_state.reset_candidate();
error!( error!(
"config reload: validation failed: {}; keeping old config", "config reload: validation failed: {}; keeping old config",
e e
@ -1199,6 +1231,11 @@ fn reload_config(
return Some(next_manifest); return Some(next_manifest);
} }
let candidate_hits = reload_state.observe_candidate(rendered_hash);
if candidate_hits < HOT_RELOAD_STABLE_SNAPSHOTS {
return Some(next_manifest);
}
let old_cfg = config_tx.borrow().clone(); let old_cfg = config_tx.borrow().clone();
let applied_cfg = overlay_hot_fields(&old_cfg, &new_cfg); let applied_cfg = overlay_hot_fields(&old_cfg, &new_cfg);
let old_hot = HotFields::from_config(&old_cfg); let old_hot = HotFields::from_config(&old_cfg);
@ -1218,6 +1255,7 @@ fn reload_config(
if old_hot.dns_overrides != applied_hot.dns_overrides if old_hot.dns_overrides != applied_hot.dns_overrides
&& let Err(e) = crate::network::dns_overrides::install_entries(&applied_hot.dns_overrides) && let Err(e) = crate::network::dns_overrides::install_entries(&applied_hot.dns_overrides)
{ {
reload_state.reset_candidate();
error!( error!(
"config reload: invalid network.dns_overrides: {}; keeping old config", "config reload: invalid network.dns_overrides: {}; keeping old config",
e e
@ -1238,6 +1276,45 @@ fn reload_config(
Some(next_manifest) Some(next_manifest)
} }
/// Runs `reload_config` once and then re-runs it, at most three more times,
/// for as long as `reload_state` still reports a pending candidate.
///
/// Each re-run is preceded by a `HOT_RELOAD_STABLE_RECHECK` sleep. The result
/// is the manifest from the most recent `reload_config` call that returned
/// `Some`; a re-run that returns `None` leaves the earlier manifest in place.
/// `None` is returned only when every call returned `None`.
async fn reload_with_internal_stable_rechecks(
    config_path: &PathBuf,
    config_tx: &watch::Sender<Arc<ProxyConfig>>,
    log_tx: &watch::Sender<LogLevel>,
    detected_ip_v4: Option<IpAddr>,
    detected_ip_v6: Option<IpAddr>,
    reload_state: &mut ReloadState,
) -> Option<WatchManifest> {
    // Upper bound on follow-up reads after the initial reload attempt.
    const MAX_RECHECKS: u8 = 3;

    let mut latest_manifest = reload_config(
        config_path,
        config_tx,
        log_tx,
        detected_ip_v4,
        detected_ip_v6,
        reload_state,
    );

    let mut rechecks_left = MAX_RECHECKS;
    while rechecks_left > 0 && reload_state.pending_candidate().is_some() {
        rechecks_left -= 1;
        tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK).await;

        // Keep the previous manifest when this attempt yields nothing.
        if let Some(manifest) = reload_config(
            config_path,
            config_tx,
            log_tx,
            detected_ip_v4,
            detected_ip_v6,
            reload_state,
        ) {
            latest_manifest = Some(manifest);
        }
    }

    latest_manifest
}
// ── Public API ──────────────────────────────────────────────────────────────── // ── Public API ────────────────────────────────────────────────────────────────
/// Spawn the hot-reload watcher task. /// Spawn the hot-reload watcher task.
@ -1370,25 +1447,27 @@ pub fn spawn_config_watcher(
tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await; tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await;
while notify_rx.try_recv().is_ok() {} while notify_rx.try_recv().is_ok() {}
let mut next_manifest = reload_config( let mut next_manifest = reload_with_internal_stable_rechecks(
&config_path, &config_path,
&config_tx, &config_tx,
&log_tx, &log_tx,
detected_ip_v4, detected_ip_v4,
detected_ip_v6, detected_ip_v6,
&mut reload_state, &mut reload_state,
); )
.await;
if next_manifest.is_none() { if next_manifest.is_none() {
tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await; tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await;
while notify_rx.try_recv().is_ok() {} while notify_rx.try_recv().is_ok() {}
next_manifest = reload_config( next_manifest = reload_with_internal_stable_rechecks(
&config_path, &config_path,
&config_tx, &config_tx,
&log_tx, &log_tx,
detected_ip_v4, detected_ip_v4,
detected_ip_v6, detected_ip_v6,
&mut reload_state, &mut reload_state,
); )
.await;
} }
if let Some(next_manifest) = next_manifest { if let Some(next_manifest) = next_manifest {
@ -1439,6 +1518,10 @@ mod tests {
std::fs::write(path, config).unwrap(); std::fs::write(path, config).unwrap();
} }
/// Test helper: writes `contents` verbatim to `path`, replacing any existing
/// file, and panics if the write fails.
fn write_reload_config_contents(path: &Path, contents: &str) {
    let outcome = std::fs::write(path, contents.as_bytes());
    outcome.unwrap();
}
fn temp_config_path(prefix: &str) -> PathBuf { fn temp_config_path(prefix: &str) -> PathBuf {
let nonce = std::time::SystemTime::now() let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
@ -1527,8 +1610,8 @@ mod tests {
assert!(!config_equal(&applied, &new)); assert!(!config_equal(&applied, &new));
} }
#[test] #[tokio::test]
fn reload_applies_hot_change_on_first_observed_snapshot() { async fn reload_applies_hot_change_on_first_observed_snapshot() {
let initial_tag = "11111111111111111111111111111111"; let initial_tag = "11111111111111111111111111111111";
let final_tag = "22222222222222222222222222222222"; let final_tag = "22222222222222222222222222222222";
let path = temp_config_path("telemt_hot_reload_stable"); let path = temp_config_path("telemt_hot_reload_stable");
@ -1543,7 +1626,16 @@ mod tests {
let mut reload_state = ReloadState::new(Some(initial_hash)); let mut reload_state = ReloadState::new(Some(initial_hash));
write_reload_config(&path, Some(final_tag), None); write_reload_config(&path, Some(final_tag), None);
reload_config(&path, &config_tx, &log_tx, None, None, &mut reload_state).unwrap(); reload_with_internal_stable_rechecks(
&path,
&config_tx,
&log_tx,
None,
None,
&mut reload_state,
)
.await
.unwrap();
assert_eq!( assert_eq!(
config_tx.borrow().general.ad_tag.as_deref(), config_tx.borrow().general.ad_tag.as_deref(),
Some(final_tag) Some(final_tag)
@ -1552,8 +1644,8 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test] #[tokio::test]
fn reload_keeps_hot_apply_when_non_hot_fields_change() { async fn reload_keeps_hot_apply_when_non_hot_fields_change() {
let initial_tag = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; let initial_tag = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
let final_tag = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; let final_tag = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
let path = temp_config_path("telemt_hot_reload_mixed"); let path = temp_config_path("telemt_hot_reload_mixed");
@ -1568,7 +1660,16 @@ mod tests {
let mut reload_state = ReloadState::new(Some(initial_hash)); let mut reload_state = ReloadState::new(Some(initial_hash));
write_reload_config(&path, Some(final_tag), Some(initial_cfg.server.port + 1)); write_reload_config(&path, Some(final_tag), Some(initial_cfg.server.port + 1));
reload_config(&path, &config_tx, &log_tx, None, None, &mut reload_state).unwrap(); reload_with_internal_stable_rechecks(
&path,
&config_tx,
&log_tx,
None,
None,
&mut reload_state,
)
.await
.unwrap();
let applied = config_tx.borrow().clone(); let applied = config_tx.borrow().clone();
assert_eq!(applied.general.ad_tag.as_deref(), Some(final_tag)); assert_eq!(applied.general.ad_tag.as_deref(), Some(final_tag));
@ -1577,8 +1678,8 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test] #[tokio::test]
fn reload_recovers_after_parse_error_on_next_attempt() { async fn reload_recovers_after_parse_error_on_next_attempt() {
let initial_tag = "cccccccccccccccccccccccccccccccc"; let initial_tag = "cccccccccccccccccccccccccccccccc";
let final_tag = "dddddddddddddddddddddddddddddddd"; let final_tag = "dddddddddddddddddddddddddddddddd";
let path = temp_config_path("telemt_hot_reload_parse_recovery"); let path = temp_config_path("telemt_hot_reload_parse_recovery");
@ -1600,7 +1701,16 @@ mod tests {
); );
write_reload_config(&path, Some(final_tag), None); write_reload_config(&path, Some(final_tag), None);
reload_config(&path, &config_tx, &log_tx, None, None, &mut reload_state).unwrap(); reload_with_internal_stable_rechecks(
&path,
&config_tx,
&log_tx,
None,
None,
&mut reload_state,
)
.await
.unwrap();
assert_eq!( assert_eq!(
config_tx.borrow().general.ad_tag.as_deref(), config_tx.borrow().general.ad_tag.as_deref(),
Some(final_tag) Some(final_tag)
@ -1608,4 +1718,98 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
/// Scenario: the config file is briefly overwritten with a reduced document
/// (only `[access.users]` with one entry) and then restored to its initial
/// contents while the reload helper is running. The published config must
/// still match the initial one afterwards.
#[tokio::test]
async fn reload_internal_rechecks_skip_partial_valid_overwrite_state() {
let path = temp_config_path("telemt_hot_reload_partial_valid_overwrite");
let initial_contents = r#"
[general]
ad_tag = "11111111111111111111111111111111"
[censorship]
tls_domain = "m.vk.ru"
[access.users]
default = "00000000000000000000000000000000"
hello = "11111111111111111111111111111111"
"#;
// Intermediate state: presumably still loadable on its own, but missing
// the `[general]` / `[censorship]` sections and the `hello` user.
let partial_contents = r#"
[access.users]
default = "00000000000000000000000000000000"
"#;
write_reload_config_contents(&path, initial_contents);
let initial_cfg = Arc::new(ProxyConfig::load(&path).unwrap());
let initial_hash = ProxyConfig::load_with_metadata(&path)
.unwrap()
.rendered_hash;
let (config_tx, _config_rx) = watch::channel(initial_cfg.clone());
let (log_tx, _log_rx) = watch::channel(initial_cfg.general.log_level.clone());
// Start from a state where the initial snapshot counts as already applied.
let mut reload_state = ReloadState::new(Some(initial_hash));
write_reload_config_contents(&path, partial_contents);
let final_path = path.clone();
let final_contents = initial_contents.to_string();
// Restore the full contents after half a recheck interval; this is meant
// to land between the first read and the first internal recheck.
tokio::spawn(async move {
tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK / 2).await;
write_reload_config_contents(&final_path, &final_contents);
});
reload_with_internal_stable_rechecks(
&path,
&config_tx,
&log_tx,
None,
None,
&mut reload_state,
)
.await
.unwrap();
// None of the fields dropped by the partial document may have changed.
let applied = config_tx.borrow().clone();
assert_eq!(applied.access.users, initial_cfg.access.users);
assert_eq!(applied.general.ad_tag, initial_cfg.general.ad_tag);
assert_eq!(applied.censorship.tls_domain, initial_cfg.censorship.tls_domain);
// Best-effort cleanup of the temporary config file.
let _ = std::fs::remove_file(path);
}
/// Scenario: two config updates arrive in quick succession (`middle_tag`,
/// then `final_tag`). After the reload helper returns, the published config
/// must carry the last written `ad_tag`.
#[tokio::test]
async fn reload_internal_rechecks_apply_last_snapshot_after_rapid_valid_updates() {
let path = temp_config_path("telemt_hot_reload_rapid_valid_updates");
let initial_tag = "11111111111111111111111111111111";
let middle_tag = "22222222222222222222222222222222";
let final_tag = "33333333333333333333333333333333";
write_reload_config(&path, Some(initial_tag), None);
let initial_cfg = Arc::new(ProxyConfig::load(&path).unwrap());
let initial_hash = ProxyConfig::load_with_metadata(&path)
.unwrap()
.rendered_hash;
let (config_tx, _config_rx) = watch::channel(initial_cfg.clone());
let (log_tx, _log_rx) = watch::channel(initial_cfg.general.log_level.clone());
// Start from a state where the initial snapshot counts as already applied.
let mut reload_state = ReloadState::new(Some(initial_hash));
// First update is on disk before the reload helper starts.
write_reload_config(&path, Some(middle_tag), None);
let final_path = path.clone();
// Second update is written after half a recheck interval; this is meant
// to land while the helper is waiting to re-read the file.
tokio::spawn(async move {
tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK / 2).await;
write_reload_config(&final_path, Some(final_tag), None);
});
reload_with_internal_stable_rechecks(
&path,
&config_tx,
&log_tx,
None,
None,
&mut reload_state,
)
.await
.unwrap();
assert_eq!(config_tx.borrow().general.ad_tag.as_deref(), Some(final_tag));
// Best-effort cleanup of the temporary config file.
let _ = std::fs::remove_file(path);
}
} }