From b8add810185ffc30221e32601f6b065e04a97fed Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:18:09 +0200 Subject: [PATCH 01/10] Implement hot-reload for config and log level Added hot-reload functionality for configuration and log level. --- src/main.rs | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 320c2c5..d8ab79e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,7 @@ mod tls_front; mod util; use crate::config::{LogLevel, ProxyConfig}; +use crate::config::hot_reload::spawn_config_watcher; use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; @@ -469,6 +470,16 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai // Freeze config after possible fallback decision let config = Arc::new(config); + // ── Hot-reload watcher ──────────────────────────────────────────────── + // Spawns a background task that watches the config file and reloads it + // on SIGHUP (Unix) or every 60 seconds. Each accept-loop clones the + // receiver and calls `.borrow_and_update().clone()` per connection. 
+ let (config_rx, mut log_level_rx) = spawn_config_watcher( + std::path::PathBuf::from(&config_path), + config.clone(), + std::time::Duration::from_secs(60), + ); + let replay_checker = Arc::new(ReplayChecker::new( config.access.replay_check_len, Duration::from_secs(config.access.replay_window_secs), @@ -760,7 +771,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai has_unix_listener = true; - let config = config.clone(); + let mut config_rx_unix = config_rx.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); @@ -779,7 +790,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); - let config = config.clone(); + // Pick up the latest config atomically for this connection. + let config = config_rx_unix.borrow_and_update().clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); @@ -813,7 +825,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai std::process::exit(1); } - // Switch to user-configured log level after startup + // Switch to user-configured log level after startup (before starting listeners) let runtime_filter = if has_rust_log { EnvFilter::from_default_env() } else if matches!(effective_log_level, LogLevel::Silent) { @@ -825,6 +837,22 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai .reload(runtime_filter) .expect("Failed to switch log filter"); + // Apply log_level changes to the tracing reload handle. + // Note: the initial runtime_filter is applied below (after startup); this + // task handles subsequent hot-reload changes only. 
+ tokio::spawn(async move { + loop { + if log_level_rx.changed().await.is_err() { + break; + } + let level = log_level_rx.borrow_and_update().clone(); + let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str()); + if let Err(e) = filter_handle.reload(new_filter) { + tracing::error!("config reload: failed to update log filter: {}", e); + } + } + }); + if let Some(port) = config.server.metrics_port { let stats = stats.clone(); let whitelist = config.server.metrics_whitelist.clone(); @@ -834,7 +862,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai } for listener in listeners { - let config = config.clone(); + let mut config_rx = config_rx.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); @@ -848,7 +876,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai loop { match listener.accept().await { Ok((stream, peer_addr)) => { - let config = config.clone(); + // Pick up the latest config atomically for this connection. + let config = config_rx.borrow_and_update().clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); From 26cf6ff4fa25aeb3fe50d295910c16e16127fd7f Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:18:30 +0200 Subject: [PATCH 02/10] Add files via upload --- src/config/hot_reload.rs | 175 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 src/config/hot_reload.rs diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs new file mode 100644 index 0000000..e7b3724 --- /dev/null +++ b/src/config/hot_reload.rs @@ -0,0 +1,175 @@ +//! Hot-reload: watches the config file and reloads it on SIGHUP (Unix) +//! or on a periodic timer (all platforms). +//! +//! # What can be reloaded without restart +//! +//! 
| Section | Field | Effect | +//! |-----------|-------------------------------|---------------------------------| +//! | `general` | `log_level` | Filter updated via `log_level_tx` | +//! | `general` | `ad_tag` | Passed on next connection | +//! | `general` | `middle_proxy_pool_size` | Passed on next connection | +//! | `general` | `me_keepalive_*` | Passed on next connection | +//! | `access` | All user/quota fields | Effective immediately | +//! +//! Fields that require re-binding sockets (`server.port`, `censorship.*`, +//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted. +//! +//! # Usage +//! +//! ```rust,ignore +//! let (config_rx, log_level_rx) = spawn_config_watcher( +//! PathBuf::from("config.toml"), +//! initial_config.clone(), +//! Duration::from_secs(60), +//! ); +//! +//! // In each accept-loop, get a fresh snapshot per connection: +//! let config = config_rx.borrow_and_update().clone(); +//! +//! // In a separate task, apply log_level changes to the tracing filter: +//! tokio::spawn(async move { +//! loop { +//! log_level_rx.changed().await.ok(); +//! let level = log_level_rx.borrow().clone(); +//! filter_handle.reload(EnvFilter::new(level.to_filter_str())).ok(); +//! } +//! }); +//! ``` + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::watch; +use tracing::{error, info, warn}; + +use crate::config::LogLevel; +use super::load::ProxyConfig; + +/// Fields that are safe to swap without restarting listeners. 
+#[derive(Debug, Clone, PartialEq)]
+pub struct HotFields {
+    pub log_level: LogLevel,
+    pub ad_tag: Option<String>,
+    pub middle_proxy_pool_size: usize,
+    pub me_keepalive_enabled: bool,
+    pub me_keepalive_interval_secs: u64,
+    pub me_keepalive_jitter_secs: u64,
+    pub me_keepalive_payload_random: bool,
+    pub access: crate::config::AccessConfig,
+}
+
+impl HotFields {
+    pub fn from_config(cfg: &ProxyConfig) -> Self {
+        Self {
+            log_level: cfg.general.log_level.clone(),
+            ad_tag: cfg.general.ad_tag.clone(),
+            middle_proxy_pool_size: cfg.general.middle_proxy_pool_size,
+            me_keepalive_enabled: cfg.general.me_keepalive_enabled,
+            me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
+            me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs,
+            me_keepalive_payload_random: cfg.general.me_keepalive_payload_random,
+            access: cfg.access.clone(),
+        }
+    }
+}
+
+/// Warn if any non-hot fields changed (i.e. require restart).
+fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
+    if old.server.port != new.server.port {
+        warn!(
+            "config reload: server.port changed ({} → {}); restart required",
+            old.server.port, new.server.port
+        );
+    }
+    if old.censorship.tls_domain != new.censorship.tls_domain {
+        warn!(
+            "config reload: censorship.tls_domain changed ('{}' → '{}'); restart required",
+            old.censorship.tls_domain, new.censorship.tls_domain
+        );
+    }
+    if old.network.ipv4 != new.network.ipv4 || old.network.ipv6 != new.network.ipv6 {
+        warn!("config reload: network.ipv4/ipv6 changed; restart required");
+    }
+    if old.general.use_middle_proxy != new.general.use_middle_proxy {
+        warn!("config reload: use_middle_proxy changed; restart required");
+    }
+}
+
+/// Spawn the hot-reload watcher task.
+///
+/// Returns:
+/// - `watch::Receiver<Arc<ProxyConfig>>` — every accept-loop should call
+///   `.borrow_and_update().clone()` per accepted connection. 
+/// - `watch::Receiver<LogLevel>` — caller should watch this and apply changes
+///   to the `tracing` reload handle (avoids lifetime/generic issues).
+pub fn spawn_config_watcher(
+    config_path: PathBuf,
+    initial: Arc<ProxyConfig>,
+    reload_interval: Duration,
+) -> (watch::Receiver<Arc<ProxyConfig>>, watch::Receiver<LogLevel>) {
+    let initial_level = initial.general.log_level.clone();
+    let (config_tx, config_rx) = watch::channel(initial);
+    let (log_tx, log_rx) = watch::channel(initial_level);
+
+    tokio::spawn(async move {
+        // On Unix, also listen for SIGHUP.
+        #[cfg(unix)]
+        let mut sighup = {
+            use tokio::signal::unix::{signal, SignalKind};
+            signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler")
+        };
+
+        let mut interval = tokio::time::interval(reload_interval);
+        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+        loop {
+            // Wait for either a timer tick or SIGHUP.
+            #[cfg(unix)]
+            tokio::select! {
+                _ = interval.tick() => {},
+                _ = sighup.recv() => {
+                    info!("SIGHUP received — reloading config from {:?}", config_path);
+                }
+            }
+            #[cfg(not(unix))]
+            interval.tick().await;
+
+            let new_cfg = match ProxyConfig::load(&config_path) {
+                Ok(c) => c,
+                Err(e) => {
+                    error!("config reload: failed to parse {:?}: {}", config_path, e);
+                    continue;
+                }
+            };
+
+            if let Err(e) = new_cfg.validate() {
+                error!("config reload: validation failed: {}; keeping old config", e);
+                continue;
+            }
+
+            let old_cfg = config_tx.borrow().clone();
+            let old_hot = HotFields::from_config(&old_cfg);
+            let new_hot = HotFields::from_config(&new_cfg);
+
+            if old_hot == new_hot {
+                // Nothing changed in hot fields — skip silent tick.
+                continue;
+            }
+
+            warn_non_hot_changes(&old_cfg, &new_cfg);
+
+            // Notify log_level change (caller applies it to the tracing filter).
+            if old_hot.log_level != new_hot.log_level {
+                info!("config reload: log_level → '{}'", new_hot.log_level);
+                log_tx.send(new_hot.log_level.clone()).ok();
+            }
+
+            // Broadcast the new config snapshot. 
+ info!("config reload: hot changes applied"); + config_tx.send(Arc::new(new_cfg)).ok(); + } + }); + + (config_rx, log_rx) +} From 766806f5dfde890d9a6e6ff48a1bdfab866899b9 Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:19:04 +0200 Subject: [PATCH 03/10] Add hot_reload module to config --- src/config/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/config/mod.rs b/src/config/mod.rs index a82d92b..c7187ad 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod defaults; mod types; mod load; +pub mod hot_reload; pub use load::ProxyConfig; pub use types::*; From 26270bc6514e6850ffeb320a6d4ddff894245e8d Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 14:27:31 +0200 Subject: [PATCH 04/10] Specify types for config_rx in main.rs --- src/main.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index d8ab79e..b290a4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -474,7 +474,10 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai // Spawns a background task that watches the config file and reloads it // on SIGHUP (Unix) or every 60 seconds. Each accept-loop clones the // receiver and calls `.borrow_and_update().clone()` per connection. 
-    let (config_rx, mut log_level_rx) = spawn_config_watcher(
+    let (config_rx, mut log_level_rx): (
+        tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
+        tokio::sync::watch::Receiver<LogLevel>,
+    ) = spawn_config_watcher(
         std::path::PathBuf::from(&config_path),
         config.clone(),
         std::time::Duration::from_secs(60),
@@ -771,7 +774,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
 
         has_unix_listener = true;
 
-        let mut config_rx_unix = config_rx.clone();
+        let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
         let stats = stats.clone();
         let upstream_manager = upstream_manager.clone();
         let replay_checker = replay_checker.clone();
@@ -862,7 +865,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
     }
 
     for listener in listeners {
-        let mut config_rx = config_rx.clone();
+        let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
         let stats = stats.clone();
         let upstream_manager = upstream_manager.clone();
         let replay_checker = replay_checker.clone();

From 3e0dc91db6bf4fe0e6d8a2ddedd6f99608ea19cb Mon Sep 17 00:00:00 2001
From: artemws <59208085+artemws@users.noreply.github.com>
Date: Fri, 20 Feb 2026 14:37:00 +0200
Subject: [PATCH 05/10] Add PartialEq to AccessConfig struct

---
 src/config/types.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/config/types.rs b/src/config/types.rs
index 9aea28a..fa69c12 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -412,7 +412,7 @@ impl Default for AntiCensorshipConfig {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct AccessConfig {
     #[serde(default)]
     pub users: HashMap<String, String>,

From 25b18ab0648ed7caaa0fbb875df7b0f899b920b2 Mon Sep 17 00:00:00 2001
From: artemws <59208085+artemws@users.noreply.github.com>
Date: Fri, 20 Feb 2026 14:50:37 +0200
Subject: [PATCH 06/10] Enhance logging for hot reload configuration changes

Added detailed logging for various configuration changes 
during hot reload, including log level, ad tag, middle proxy pool size, and user access changes. --- src/config/hot_reload.rs | 132 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 3 deletions(-) diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index e7b3724..fc64bd1 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -159,14 +159,140 @@ pub fn spawn_config_watcher( warn_non_hot_changes(&old_cfg, &new_cfg); - // Notify log_level change (caller applies it to the tracing filter). + // ── Detailed diff logging ───────────────────────────────────── + + // log_level if old_hot.log_level != new_hot.log_level { - info!("config reload: log_level → '{}'", new_hot.log_level); + info!( + "config reload: log_level: '{}' → '{}'", + old_hot.log_level, new_hot.log_level + ); log_tx.send(new_hot.log_level.clone()).ok(); } + // ad_tag + if old_hot.ad_tag != new_hot.ad_tag { + info!( + "config reload: ad_tag: {} → {}", + old_hot.ad_tag.as_deref().unwrap_or("none"), + new_hot.ad_tag.as_deref().unwrap_or("none"), + ); + } + + // middle_proxy_pool_size + if old_hot.middle_proxy_pool_size != new_hot.middle_proxy_pool_size { + info!( + "config reload: middle_proxy_pool_size: {} → {}", + old_hot.middle_proxy_pool_size, new_hot.middle_proxy_pool_size, + ); + } + + // me_keepalive + if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled + || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs + || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs + || old_hot.me_keepalive_payload_random != new_hot.me_keepalive_payload_random + { + info!( + "config reload: me_keepalive: enabled={} interval={}s jitter={}s random_payload={}", + new_hot.me_keepalive_enabled, + new_hot.me_keepalive_interval_secs, + new_hot.me_keepalive_jitter_secs, + new_hot.me_keepalive_payload_random, + ); + } + + // access.users — added / removed / changed + if old_hot.access.users != new_hot.access.users { + let 
added: Vec<&String> = new_hot.access.users.keys() + .filter(|u| !old_hot.access.users.contains_key(*u)) + .collect(); + let removed: Vec<&String> = old_hot.access.users.keys() + .filter(|u| !new_hot.access.users.contains_key(*u)) + .collect(); + let changed: Vec<&String> = new_hot.access.users.keys() + .filter(|u| { + old_hot.access.users.get(*u) + .map(|old_s| old_s != &new_hot.access.users[*u]) + .unwrap_or(false) + }) + .collect(); + + if !added.is_empty() { + let names: Vec<&str> = added.iter().map(|s| s.as_str()).collect(); + info!("config reload: users added: [{}]", names.join(", ")); + + // Print TG proxy links for each newly added user. + let host = new_cfg.general.links.public_host.as_deref() + .unwrap_or("YOUR_SERVER_IP"); + let port = new_cfg.general.links.public_port + .unwrap_or(new_cfg.server.port); + let tls_domain = &new_cfg.censorship.tls_domain; + let mut tls_domains = vec![tls_domain.clone()]; + for d in &new_cfg.censorship.tls_domains { + if !tls_domains.contains(d) { tls_domains.push(d.clone()); } + } + + for user in &added { + if let Some(secret) = new_hot.access.users.get(*user) { + info!(target: "telemt::links", "--- New user: {} ---", user); + if new_cfg.general.modes.classic { + info!( + target: "telemt::links", + " Classic: tg://proxy?server={}&port={}&secret={}", + host, port, secret + ); + } + if new_cfg.general.modes.secure { + info!( + target: "telemt::links", + " DD: tg://proxy?server={}&port={}&secret=dd{}", + host, port, secret + ); + } + if new_cfg.general.modes.tls { + for domain in &tls_domains { + let domain_hex = hex::encode(domain.as_bytes()); + info!( + target: "telemt::links", + " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", + host, port, secret, domain_hex + ); + } + } + info!(target: "telemt::links", "--------------------"); + } + } + } + if !removed.is_empty() { + let names: Vec<&str> = removed.iter().map(|s| s.as_str()).collect(); + info!("config reload: users removed: [{}]", names.join(", ")); + } + if 
!changed.is_empty() { + let names: Vec<&str> = changed.iter().map(|s| s.as_str()).collect(); + info!("config reload: users secret changed: [{}]", names.join(", ")); + } + } + + // access quotas / limits + if old_hot.access.user_max_tcp_conns != new_hot.access.user_max_tcp_conns { + info!("config reload: user_max_tcp_conns updated ({} entries)", + new_hot.access.user_max_tcp_conns.len()); + } + if old_hot.access.user_expirations != new_hot.access.user_expirations { + info!("config reload: user_expirations updated ({} entries)", + new_hot.access.user_expirations.len()); + } + if old_hot.access.user_data_quota != new_hot.access.user_data_quota { + info!("config reload: user_data_quota updated ({} entries)", + new_hot.access.user_data_quota.len()); + } + if old_hot.access.user_max_unique_ips != new_hot.access.user_max_unique_ips { + info!("config reload: user_max_unique_ips updated ({} entries)", + new_hot.access.user_max_unique_ips.len()); + } + // Broadcast the new config snapshot. - info!("config reload: hot changes applied"); config_tx.send(Arc::new(new_cfg)).ok(); } }); From 82bb93e8da138ee7dbe6df7ae76466ae8beeae82 Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:28:58 +0200 Subject: [PATCH 07/10] Add notify dependency for macOS file events --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index ed1e824..62f32d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ anyhow = "1.0" # HTTP reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false } +notify = { version = "6", features = ["macos_fsevent"] } hyper = { version = "1", features = ["server", "http1"] } hyper-util = { version = "0.1", features = ["tokio", "server-auto"] } http-body-util = "0.1" From 0f6621d3590793691894d14a4876e9dbd274eae5 Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:29:20 +0200 Subject: [PATCH 08/10] 
Refactor hot-reload watcher implementation --- src/main.rs | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index b290a4e..4b33c9f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -470,19 +470,6 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai // Freeze config after possible fallback decision let config = Arc::new(config); - // ── Hot-reload watcher ──────────────────────────────────────────────── - // Spawns a background task that watches the config file and reloads it - // on SIGHUP (Unix) or every 60 seconds. Each accept-loop clones the - // receiver and calls `.borrow_and_update().clone()` per connection. - let (config_rx, mut log_level_rx): ( - tokio::sync::watch::Receiver>, - tokio::sync::watch::Receiver, - ) = spawn_config_watcher( - std::path::PathBuf::from(&config_path), - config.clone(), - std::time::Duration::from_secs(60), - ); - let replay_checker = Arc::new(ReplayChecker::new( config.access.replay_check_len, Duration::from_secs(config.access.replay_window_secs), @@ -670,6 +657,19 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai detected_ip_v4, detected_ip_v6 ); + // ── Hot-reload watcher ──────────────────────────────────────────────── + // Uses inotify to detect file changes instantly (SIGHUP also works). + // detected_ip_v4/v6 are passed so newly added users get correct TG links. 
+    let (config_rx, mut log_level_rx): (
+        tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
+        tokio::sync::watch::Receiver<LogLevel>,
+    ) = spawn_config_watcher(
+        std::path::PathBuf::from(&config_path),
+        config.clone(),
+        detected_ip_v4,
+        detected_ip_v6,
+    );
+
     let mut listeners = Vec::new();
 
     for listener_conf in &config.server.listeners {
@@ -793,7 +793,6 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
                         let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                         let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16));
 
-                        // Pick up the latest config atomically for this connection.
                         let config = config_rx_unix.borrow_and_update().clone();
                         let stats = stats.clone();
                         let upstream_manager = upstream_manager.clone();
@@ -828,7 +827,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
         std::process::exit(1);
     }
 
-    // Switch to user-configured log level after startup (before starting listeners)
+    // Switch to user-configured log level after startup
     let runtime_filter = if has_rust_log {
         EnvFilter::from_default_env()
     } else if matches!(effective_log_level, LogLevel::Silent) {
@@ -840,9 +839,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
         .reload(runtime_filter)
         .expect("Failed to switch log filter");
 
-    // Apply log_level changes to the tracing reload handle.
-    // Note: the initial runtime_filter is applied below (after startup); this
-    // task handles subsequent hot-reload changes only.
+    // Apply log_level changes from hot-reload to the tracing filter.
     tokio::spawn(async move {
         loop {
             if log_level_rx.changed().await.is_err() {
@@ -879,7 +876,6 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
             loop {
                 match listener.accept().await {
                     Ok((stream, peer_addr)) => {
-                        // Pick up the latest config atomically for this connection. 
let config = config_rx.borrow_and_update().clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); From 953fab68c4e61f307148db821a4737f015437046 Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:29:37 +0200 Subject: [PATCH 09/10] Refactor hot-reload mechanism to use notify crate Updated hot-reload functionality to use notify crate for file watching and improved documentation. --- src/config/hot_reload.rs | 559 ++++++++++++++++++++++++--------------- 1 file changed, 342 insertions(+), 217 deletions(-) diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index fc64bd1..8278312 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -1,51 +1,33 @@ -//! Hot-reload: watches the config file and reloads it on SIGHUP (Unix) -//! or on a periodic timer (all platforms). +//! Hot-reload: watches the config file via inotify (Linux) / FSEvents (macOS) +//! / ReadDirectoryChangesW (Windows) using the `notify` crate. +//! SIGHUP is also supported on Unix as an additional manual trigger. //! //! # What can be reloaded without restart //! -//! | Section | Field | Effect | -//! |-----------|-------------------------------|---------------------------------| +//! | Section | Field | Effect | +//! |-----------|-------------------------------|-----------------------------------| //! | `general` | `log_level` | Filter updated via `log_level_tx` | -//! | `general` | `ad_tag` | Passed on next connection | -//! | `general` | `middle_proxy_pool_size` | Passed on next connection | -//! | `general` | `me_keepalive_*` | Passed on next connection | -//! | `access` | All user/quota fields | Effective immediately | +//! | `general` | `ad_tag` | Passed on next connection | +//! | `general` | `middle_proxy_pool_size` | Passed on next connection | +//! | `general` | `me_keepalive_*` | Passed on next connection | +//! | `access` | All user/quota fields | Effective immediately | //! //! 
Fields that require re-binding sockets (`server.port`, `censorship.*`, //! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted. -//! -//! # Usage -//! -//! ```rust,ignore -//! let (config_rx, log_level_rx) = spawn_config_watcher( -//! PathBuf::from("config.toml"), -//! initial_config.clone(), -//! Duration::from_secs(60), -//! ); -//! -//! // In each accept-loop, get a fresh snapshot per connection: -//! let config = config_rx.borrow_and_update().clone(); -//! -//! // In a separate task, apply log_level changes to the tracing filter: -//! tokio::spawn(async move { -//! loop { -//! log_level_rx.changed().await.ok(); -//! let level = log_level_rx.borrow().clone(); -//! filter_handle.reload(EnvFilter::new(level.to_filter_str())).ok(); -//! } -//! }); -//! ``` +use std::net::IpAddr; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; -use tokio::sync::watch; +use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; +use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; use crate::config::LogLevel; use super::load::ProxyConfig; +// ── Hot fields ──────────────────────────────────────────────────────────────── + /// Fields that are safe to swap without restarting listeners. #[derive(Debug, Clone, PartialEq)] pub struct HotFields { @@ -74,7 +56,9 @@ impl HotFields { } } -/// Warn if any non-hot fields changed (i.e. require restart). +// ── Helpers ─────────────────────────────────────────────────────────────────── + +/// Warn if any non-hot fields changed (require restart). fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) { if old.server.port != new.server.port { warn!( @@ -96,206 +80,347 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) { } } +/// Resolve the public host for link generation — mirrors the logic in main.rs. +/// +/// Priority: +/// 1. `[general.links] public_host` — explicit override in config +/// 2. `detected_ip_v4` — from STUN/interface probe at startup +/// 3. 
`detected_ip_v6` — fallback
+/// 4. `"UNKNOWN"` — warn the user to set `public_host`
+fn resolve_link_host(
+    cfg: &ProxyConfig,
+    detected_ip_v4: Option<IpAddr>,
+    detected_ip_v6: Option<IpAddr>,
+) -> String {
+    if let Some(ref h) = cfg.general.links.public_host {
+        return h.clone();
+    }
+    detected_ip_v4
+        .or(detected_ip_v6)
+        .map(|ip| ip.to_string())
+        .unwrap_or_else(|| {
+            warn!(
+                "config reload: could not determine public IP for proxy links. \
+                 Set [general.links] public_host in config."
+            );
+            "UNKNOWN".to_string()
+        })
+}
+
+/// Print TG proxy links for a single user — mirrors print_proxy_links() in main.rs.
+fn print_user_links(user: &str, secret: &str, host: &str, port: u16, cfg: &ProxyConfig) {
+    info!(target: "telemt::links", "--- New user: {} ---", user);
+    if cfg.general.modes.classic {
+        info!(
+            target: "telemt::links",
+            "  Classic: tg://proxy?server={}&port={}&secret={}",
+            host, port, secret
+        );
+    }
+    if cfg.general.modes.secure {
+        info!(
+            target: "telemt::links",
+            "  DD: tg://proxy?server={}&port={}&secret=dd{}",
+            host, port, secret
+        );
+    }
+    if cfg.general.modes.tls {
+        let mut domains = vec![cfg.censorship.tls_domain.clone()];
+        for d in &cfg.censorship.tls_domains {
+            if !domains.contains(d) {
+                domains.push(d.clone());
+            }
+        }
+        for domain in &domains {
+            let domain_hex = hex::encode(domain.as_bytes());
+            info!(
+                target: "telemt::links",
+                "  EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
+                host, port, secret, domain_hex
+            );
+        }
+    }
+    info!(target: "telemt::links", "--------------------");
+}
+
+/// Log all detected changes and emit TG links for new users. 
+fn log_changes(
+    old_hot: &HotFields,
+    new_hot: &HotFields,
+    new_cfg: &ProxyConfig,
+    log_tx: &watch::Sender<LogLevel>,
+    detected_ip_v4: Option<IpAddr>,
+    detected_ip_v6: Option<IpAddr>,
+) {
+    if old_hot.log_level != new_hot.log_level {
+        info!(
+            "config reload: log_level: '{}' → '{}'",
+            old_hot.log_level, new_hot.log_level
+        );
+        log_tx.send(new_hot.log_level.clone()).ok();
+    }
+
+    if old_hot.ad_tag != new_hot.ad_tag {
+        info!(
+            "config reload: ad_tag: {} → {}",
+            old_hot.ad_tag.as_deref().unwrap_or("none"),
+            new_hot.ad_tag.as_deref().unwrap_or("none"),
+        );
+    }
+
+    if old_hot.middle_proxy_pool_size != new_hot.middle_proxy_pool_size {
+        info!(
+            "config reload: middle_proxy_pool_size: {} → {}",
+            old_hot.middle_proxy_pool_size, new_hot.middle_proxy_pool_size,
+        );
+    }
+
+    if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled
+        || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs
+        || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs
+        || old_hot.me_keepalive_payload_random != new_hot.me_keepalive_payload_random
+    {
+        info!(
+            "config reload: me_keepalive: enabled={} interval={}s jitter={}s random_payload={}",
+            new_hot.me_keepalive_enabled,
+            new_hot.me_keepalive_interval_secs,
+            new_hot.me_keepalive_jitter_secs,
+            new_hot.me_keepalive_payload_random,
+        );
+    }
+
+    if old_hot.access.users != new_hot.access.users {
+        let mut added: Vec<&String> = new_hot.access.users.keys()
+            .filter(|u| !old_hot.access.users.contains_key(*u))
+            .collect();
+        added.sort();
+
+        let mut removed: Vec<&String> = old_hot.access.users.keys()
+            .filter(|u| !new_hot.access.users.contains_key(*u))
+            .collect();
+        removed.sort();
+
+        let mut changed: Vec<&String> = new_hot.access.users.keys()
+            .filter(|u| {
+                old_hot.access.users.get(*u)
+                    .map(|s| s != &new_hot.access.users[*u])
+                    .unwrap_or(false)
+            })
+            .collect();
+        changed.sort();
+
+        if !added.is_empty() {
+            info!(
+                "config reload: users added: [{}]",
+                added.iter().map(|s| 
s.as_str()).collect::<Vec<_>>().join(", ")
+            );
+            let host = resolve_link_host(new_cfg, detected_ip_v4, detected_ip_v6);
+            let port = new_cfg.general.links.public_port.unwrap_or(new_cfg.server.port);
+            for user in &added {
+                if let Some(secret) = new_hot.access.users.get(*user) {
+                    print_user_links(user, secret, &host, port, new_cfg);
+                }
+            }
+        }
+        if !removed.is_empty() {
+            info!(
+                "config reload: users removed: [{}]",
+                removed.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
+            );
+        }
+        if !changed.is_empty() {
+            info!(
+                "config reload: users secret changed: [{}]",
+                changed.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", ")
+            );
+        }
+    }
+
+    if old_hot.access.user_max_tcp_conns != new_hot.access.user_max_tcp_conns {
+        info!(
+            "config reload: user_max_tcp_conns updated ({} entries)",
+            new_hot.access.user_max_tcp_conns.len()
+        );
+    }
+    if old_hot.access.user_expirations != new_hot.access.user_expirations {
+        info!(
+            "config reload: user_expirations updated ({} entries)",
+            new_hot.access.user_expirations.len()
+        );
+    }
+    if old_hot.access.user_data_quota != new_hot.access.user_data_quota {
+        info!(
+            "config reload: user_data_quota updated ({} entries)",
+            new_hot.access.user_data_quota.len()
+        );
+    }
+    if old_hot.access.user_max_unique_ips != new_hot.access.user_max_unique_ips {
+        info!(
+            "config reload: user_max_unique_ips updated ({} entries)",
+            new_hot.access.user_max_unique_ips.len()
+        );
+    }
+}
+
+/// Load config, validate, diff against current, and broadcast if changed. 
+fn reload_config(
+    config_path: &PathBuf,
+    config_tx: &watch::Sender<Arc<ProxyConfig>>,
+    log_tx: &watch::Sender<LogLevel>,
+    detected_ip_v4: Option<IpAddr>,
+    detected_ip_v6: Option<IpAddr>,
+) {
+    let new_cfg = match ProxyConfig::load(config_path) {
+        Ok(c) => c,
+        Err(e) => {
+            error!("config reload: failed to parse {:?}: {}", config_path, e);
+            return;
+        }
+    };
+
+    if let Err(e) = new_cfg.validate() {
+        error!("config reload: validation failed: {}; keeping old config", e);
+        return;
+    }
+
+    let old_cfg = config_tx.borrow().clone();
+    let old_hot = HotFields::from_config(&old_cfg);
+    let new_hot = HotFields::from_config(&new_cfg);
+
+    if old_hot == new_hot {
+        return;
+    }
+
+    warn_non_hot_changes(&old_cfg, &new_cfg);
+    log_changes(&old_hot, &new_hot, &new_cfg, log_tx, detected_ip_v4, detected_ip_v6);
+    config_tx.send(Arc::new(new_cfg)).ok();
+}
+
+// ── Public API ────────────────────────────────────────────────────────────────
+
 /// Spawn the hot-reload watcher task.
 ///
-/// Returns:
-/// - `watch::Receiver<Arc<ProxyConfig>>` — every accept-loop should call
-///   `.borrow_and_update().clone()` per accepted connection.
-/// - `watch::Receiver<LogLevel>` — caller should watch this and apply changes
-///   to the `tracing` reload handle (avoids lifetime/generic issues).
+/// Uses `notify` (inotify on Linux) to detect file changes instantly.
+/// SIGHUP is also handled on Unix as an additional manual trigger.
+///
+/// `detected_ip_v4` / `detected_ip_v6` are the IPs discovered during the
+/// startup probe — used when generating proxy links for newly added users,
+/// matching the same logic as the startup output.
 pub fn spawn_config_watcher(
     config_path: PathBuf,
     initial: Arc<ProxyConfig>,
-    reload_interval: Duration,
+    detected_ip_v4: Option<IpAddr>,
+    detected_ip_v6: Option<IpAddr>,
 ) -> (watch::Receiver<Arc<ProxyConfig>>, watch::Receiver<LogLevel>) {
     let initial_level = initial.general.log_level.clone();
     let (config_tx, config_rx) = watch::channel(initial);
     let (log_tx, log_rx) = watch::channel(initial_level);
 
-    tokio::spawn(async move {
-        // On Unix, also listen for SIGHUP.
-        #[cfg(unix)]
-        let mut sighup = {
-            use tokio::signal::unix::{signal, SignalKind};
-            signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler")
-        };
+    // Bridge: sync notify callback → async task via mpsc.
+    let (notify_tx, mut notify_rx) = mpsc::channel::<()>(4);
 
-        let mut interval = tokio::time::interval(reload_interval);
-        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+    // Watch the parent directory rather than the file itself, because many
+    // editors (vim, nano, systemd-sysusers) write via rename, which would
+    // cause inotify to lose track of the original inode.
+    let watch_dir = config_path
+        .parent()
+        .unwrap_or_else(|| std::path::Path::new("."))
+        .to_path_buf();
 
-        loop {
-            // Wait for either a timer tick or SIGHUP.
-            #[cfg(unix)]
-            tokio::select! {
-                _ = interval.tick() => {},
-                _ = sighup.recv() => {
-                    info!("SIGHUP received — reloading config from {:?}", config_path);
-                }
-            }
-            #[cfg(not(unix))]
-            interval.tick().await;
+    let config_file = config_path.clone();
+    let tx_clone = notify_tx.clone();
 
-            let new_cfg = match ProxyConfig::load(&config_path) {
-                Ok(c) => c,
-                Err(e) => {
-                    error!("config reload: failed to parse {:?}: {}", config_path, e);
-                    continue;
-                }
-            };
+    let watcher_result = recommended_watcher(move |res: notify::Result<Event>| {
+        let Ok(event) = res else { return };
 
-            if let Err(e) = new_cfg.validate() {
-                error!("config reload: validation failed: {}; keeping old config", e);
-                continue;
-            }
-
-            let old_cfg = config_tx.borrow().clone();
-            let old_hot = HotFields::from_config(&old_cfg);
-            let new_hot = HotFields::from_config(&new_cfg);
-
-            if old_hot == new_hot {
-                // Nothing changed in hot fields — skip silent tick.
- continue; - } - - warn_non_hot_changes(&old_cfg, &new_cfg); - - // ── Detailed diff logging ───────────────────────────────────── - - // log_level - if old_hot.log_level != new_hot.log_level { - info!( - "config reload: log_level: '{}' → '{}'", - old_hot.log_level, new_hot.log_level - ); - log_tx.send(new_hot.log_level.clone()).ok(); - } - - // ad_tag - if old_hot.ad_tag != new_hot.ad_tag { - info!( - "config reload: ad_tag: {} → {}", - old_hot.ad_tag.as_deref().unwrap_or("none"), - new_hot.ad_tag.as_deref().unwrap_or("none"), - ); - } - - // middle_proxy_pool_size - if old_hot.middle_proxy_pool_size != new_hot.middle_proxy_pool_size { - info!( - "config reload: middle_proxy_pool_size: {} → {}", - old_hot.middle_proxy_pool_size, new_hot.middle_proxy_pool_size, - ); - } - - // me_keepalive - if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled - || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs - || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs - || old_hot.me_keepalive_payload_random != new_hot.me_keepalive_payload_random - { - info!( - "config reload: me_keepalive: enabled={} interval={}s jitter={}s random_payload={}", - new_hot.me_keepalive_enabled, - new_hot.me_keepalive_interval_secs, - new_hot.me_keepalive_jitter_secs, - new_hot.me_keepalive_payload_random, - ); - } - - // access.users — added / removed / changed - if old_hot.access.users != new_hot.access.users { - let added: Vec<&String> = new_hot.access.users.keys() - .filter(|u| !old_hot.access.users.contains_key(*u)) - .collect(); - let removed: Vec<&String> = old_hot.access.users.keys() - .filter(|u| !new_hot.access.users.contains_key(*u)) - .collect(); - let changed: Vec<&String> = new_hot.access.users.keys() - .filter(|u| { - old_hot.access.users.get(*u) - .map(|old_s| old_s != &new_hot.access.users[*u]) - .unwrap_or(false) - }) - .collect(); - - if !added.is_empty() { - let names: Vec<&str> = added.iter().map(|s| s.as_str()).collect(); - 
info!("config reload: users added: [{}]", names.join(", ")); - - // Print TG proxy links for each newly added user. - let host = new_cfg.general.links.public_host.as_deref() - .unwrap_or("YOUR_SERVER_IP"); - let port = new_cfg.general.links.public_port - .unwrap_or(new_cfg.server.port); - let tls_domain = &new_cfg.censorship.tls_domain; - let mut tls_domains = vec![tls_domain.clone()]; - for d in &new_cfg.censorship.tls_domains { - if !tls_domains.contains(d) { tls_domains.push(d.clone()); } - } - - for user in &added { - if let Some(secret) = new_hot.access.users.get(*user) { - info!(target: "telemt::links", "--- New user: {} ---", user); - if new_cfg.general.modes.classic { - info!( - target: "telemt::links", - " Classic: tg://proxy?server={}&port={}&secret={}", - host, port, secret - ); - } - if new_cfg.general.modes.secure { - info!( - target: "telemt::links", - " DD: tg://proxy?server={}&port={}&secret=dd{}", - host, port, secret - ); - } - if new_cfg.general.modes.tls { - for domain in &tls_domains { - let domain_hex = hex::encode(domain.as_bytes()); - info!( - target: "telemt::links", - " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", - host, port, secret, domain_hex - ); - } - } - info!(target: "telemt::links", "--------------------"); - } - } - } - if !removed.is_empty() { - let names: Vec<&str> = removed.iter().map(|s| s.as_str()).collect(); - info!("config reload: users removed: [{}]", names.join(", ")); - } - if !changed.is_empty() { - let names: Vec<&str> = changed.iter().map(|s| s.as_str()).collect(); - info!("config reload: users secret changed: [{}]", names.join(", ")); - } - } - - // access quotas / limits - if old_hot.access.user_max_tcp_conns != new_hot.access.user_max_tcp_conns { - info!("config reload: user_max_tcp_conns updated ({} entries)", - new_hot.access.user_max_tcp_conns.len()); - } - if old_hot.access.user_expirations != new_hot.access.user_expirations { - info!("config reload: user_expirations updated ({} entries)", - 
new_hot.access.user_expirations.len()); - } - if old_hot.access.user_data_quota != new_hot.access.user_data_quota { - info!("config reload: user_data_quota updated ({} entries)", - new_hot.access.user_data_quota.len()); - } - if old_hot.access.user_max_unique_ips != new_hot.access.user_max_unique_ips { - info!("config reload: user_max_unique_ips updated ({} entries)", - new_hot.access.user_max_unique_ips.len()); - } - - // Broadcast the new config snapshot. - config_tx.send(Arc::new(new_cfg)).ok(); + let is_our_file = event.paths.iter().any(|p| p == &config_file); + if !is_our_file { + return; + } + let relevant = matches!( + event.kind, + EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) + ); + if relevant { + let _ = tx_clone.try_send(()); } }); + match watcher_result { + Ok(mut watcher) => { + match watcher.watch(&watch_dir, RecursiveMode::NonRecursive) { + Ok(()) => info!("config watcher: watching {:?} via inotify", config_path), + Err(e) => warn!( + "config watcher: failed to watch {:?}: {}; use SIGHUP to reload", + watch_dir, e + ), + } + + tokio::spawn(async move { + let _watcher = watcher; // keep alive + + #[cfg(unix)] + let mut sighup = { + use tokio::signal::unix::{SignalKind, signal}; + signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler") + }; + + loop { + #[cfg(unix)] + tokio::select! { + msg = notify_rx.recv() => { + if msg.is_none() { break; } + } + _ = sighup.recv() => { + info!("SIGHUP received — reloading {:?}", config_path); + } + } + #[cfg(not(unix))] + if notify_rx.recv().await.is_none() { break; } + + // Debounce: drain extra events fired within 50ms. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + while notify_rx.try_recv().is_ok() {} + + reload_config( + &config_path, + &config_tx, + &log_tx, + detected_ip_v4, + detected_ip_v6, + ); + } + }); + } + Err(e) => { + warn!( + "config watcher: inotify unavailable ({}); only SIGHUP will trigger reload", + e + ); + // Fall back to SIGHUP-only. 
+ tokio::spawn(async move { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + let mut sighup = signal(SignalKind::hangup()) + .expect("Failed to register SIGHUP handler"); + loop { + sighup.recv().await; + info!("SIGHUP received — reloading {:?}", config_path); + reload_config( + &config_path, + &config_tx, + &log_tx, + detected_ip_v4, + detected_ip_v6, + ); + } + } + #[cfg(not(unix))] + let _ = (config_tx, log_tx, config_path); + }); + } + } + (config_rx, log_rx) } From ea88a40c8fd8bcf78fb70366999968e4fa80934b Mon Sep 17 00:00:00 2001 From: artemws <59208085+artemws@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:37:44 +0200 Subject: [PATCH 10/10] Add config path canonicalization Canonicalize the config path to match notify events. --- src/config/hot_reload.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 8278312..246f8a3 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -319,6 +319,13 @@ pub fn spawn_config_watcher( // Bridge: sync notify callback → async task via mpsc. let (notify_tx, mut notify_rx) = mpsc::channel::<()>(4); + // Canonicalize the config path so it matches what notify returns in events + // (notify always gives absolute paths, but config_path may be relative). + let config_path = match config_path.canonicalize() { + Ok(p) => p, + Err(_) => config_path.to_path_buf(), // file doesn't exist yet, use as-is + }; + // Watch the parent directory rather than the file itself, because many // editors (vim, nano, systemd-sysusers) write via rename, which would // cause inotify to lose track of the original inode.