ReRoute state in API

This commit is contained in:
Alexey 2026-03-10 00:59:25 +03:00
parent 6fa01d4c36
commit 959d385015
No known key found for this signature in database
4 changed files with 50 additions and 1 deletions

View File

@ -16,6 +16,7 @@ use tracing::{debug, info, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::proxy::route_mode::RouteRuntimeController;
use crate::startup::StartupTracker; use crate::startup::StartupTracker;
use crate::stats::Stats; use crate::stats::Stats;
use crate::transport::middle_proxy::MePool; use crate::transport::middle_proxy::MePool;
@ -84,6 +85,7 @@ pub(super) struct ApiShared {
pub(super) request_id: Arc<AtomicU64>, pub(super) request_id: Arc<AtomicU64>,
pub(super) runtime_state: Arc<ApiRuntimeState>, pub(super) runtime_state: Arc<ApiRuntimeState>,
pub(super) startup_tracker: Arc<StartupTracker>, pub(super) startup_tracker: Arc<StartupTracker>,
pub(super) route_runtime: Arc<RouteRuntimeController>,
} }
impl ApiShared { impl ApiShared {
@ -101,6 +103,7 @@ pub async fn serve(
stats: Arc<Stats>, stats: Arc<Stats>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
me_pool: Arc<RwLock<Option<Arc<MePool>>>>, me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
route_runtime: Arc<RouteRuntimeController>,
upstream_manager: Arc<UpstreamManager>, upstream_manager: Arc<UpstreamManager>,
config_rx: watch::Receiver<Arc<ProxyConfig>>, config_rx: watch::Receiver<Arc<ProxyConfig>>,
admission_rx: watch::Receiver<bool>, admission_rx: watch::Receiver<bool>,
@ -147,6 +150,7 @@ pub async fn serve(
request_id: Arc::new(AtomicU64::new(1)), request_id: Arc::new(AtomicU64::new(1)),
runtime_state: runtime_state.clone(), runtime_state: runtime_state.clone(),
startup_tracker, startup_tracker,
route_runtime,
}); });
spawn_runtime_watchers( spawn_runtime_watchers(

View File

@ -3,6 +3,7 @@ use std::sync::atomic::Ordering;
use serde::Serialize; use serde::Serialize;
use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode}; use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode};
use crate::proxy::route_mode::RelayRouteMode;
use super::ApiShared; use super::ApiShared;
use super::runtime_init::build_runtime_startup_summary; use super::runtime_init::build_runtime_startup_summary;
@ -35,6 +36,10 @@ pub(super) struct RuntimeGatesData {
pub(super) me_runtime_ready: bool, pub(super) me_runtime_ready: bool,
pub(super) me2dc_fallback_enabled: bool, pub(super) me2dc_fallback_enabled: bool,
pub(super) use_middle_proxy: bool, pub(super) use_middle_proxy: bool,
pub(super) route_mode: &'static str,
pub(super) reroute_active: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reroute_to_direct_at_epoch_secs: Option<u64>,
pub(super) startup_status: &'static str, pub(super) startup_status: &'static str,
pub(super) startup_stage: String, pub(super) startup_stage: String,
pub(super) startup_progress_pct: f64, pub(super) startup_progress_pct: f64,
@ -157,6 +162,16 @@ pub(super) async fn build_runtime_gates_data(
cfg: &ProxyConfig, cfg: &ProxyConfig,
) -> RuntimeGatesData { ) -> RuntimeGatesData {
let startup_summary = build_runtime_startup_summary(shared).await; let startup_summary = build_runtime_startup_summary(shared).await;
let route_state = shared.route_runtime.snapshot();
let route_mode = route_state.mode.as_str();
let reroute_active = cfg.general.use_middle_proxy
&& cfg.general.me2dc_fallback
&& matches!(route_state.mode, RelayRouteMode::Direct);
let reroute_to_direct_at_epoch_secs = if reroute_active {
shared.route_runtime.direct_since_epoch_secs()
} else {
None
};
let me_runtime_ready = if !cfg.general.use_middle_proxy { let me_runtime_ready = if !cfg.general.use_middle_proxy {
true true
} else { } else {
@ -175,6 +190,9 @@ pub(super) async fn build_runtime_gates_data(
me_runtime_ready, me_runtime_ready,
me2dc_fallback_enabled: cfg.general.me2dc_fallback, me2dc_fallback_enabled: cfg.general.me2dc_fallback,
use_middle_proxy: cfg.general.use_middle_proxy, use_middle_proxy: cfg.general.use_middle_proxy,
route_mode,
reroute_active,
reroute_to_direct_at_epoch_secs,
startup_status: startup_summary.status, startup_status: startup_summary.status,
startup_stage: startup_summary.stage, startup_stage: startup_summary.stage,
startup_progress_pct: startup_summary.progress_pct, startup_progress_pct: startup_summary.progress_pct,

View File

@ -220,6 +220,7 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
let ip_tracker_api = ip_tracker.clone(); let ip_tracker_api = ip_tracker.clone();
let me_pool_api = api_me_pool.clone(); let me_pool_api = api_me_pool.clone();
let upstream_manager_api = upstream_manager.clone(); let upstream_manager_api = upstream_manager.clone();
let route_runtime_api = route_runtime.clone();
let config_rx_api = api_config_rx.clone(); let config_rx_api = api_config_rx.clone();
let admission_rx_api = admission_rx.clone(); let admission_rx_api = admission_rx.clone();
let config_path_api = std::path::PathBuf::from(&config_path); let config_path_api = std::path::PathBuf::from(&config_path);
@ -231,6 +232,7 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
stats_api, stats_api,
ip_tracker_api, ip_tracker_api,
me_pool_api, me_pool_api,
route_runtime_api,
upstream_manager_api, upstream_manager_api,
config_rx_api, config_rx_api,
admission_rx_api, admission_rx_api,

View File

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering}; use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::Duration; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch; use tokio::sync::watch;
@ -43,6 +43,7 @@ pub(crate) struct RouteCutoverState {
pub(crate) struct RouteRuntimeController { pub(crate) struct RouteRuntimeController {
mode: Arc<AtomicU8>, mode: Arc<AtomicU8>,
generation: Arc<AtomicU64>, generation: Arc<AtomicU64>,
direct_since_epoch_secs: Arc<AtomicU64>,
tx: watch::Sender<RouteCutoverState>, tx: watch::Sender<RouteCutoverState>,
} }
@ -53,9 +54,15 @@ impl RouteRuntimeController {
generation: 0, generation: 0,
}; };
let (tx, _rx) = watch::channel(initial); let (tx, _rx) = watch::channel(initial);
let direct_since_epoch_secs = if matches!(initial_mode, RelayRouteMode::Direct) {
now_epoch_secs()
} else {
0
};
Self { Self {
mode: Arc::new(AtomicU8::new(initial_mode.as_u8())), mode: Arc::new(AtomicU8::new(initial_mode.as_u8())),
generation: Arc::new(AtomicU64::new(0)), generation: Arc::new(AtomicU64::new(0)),
direct_since_epoch_secs: Arc::new(AtomicU64::new(direct_since_epoch_secs)),
tx, tx,
} }
} }
@ -71,11 +78,22 @@ impl RouteRuntimeController {
self.tx.subscribe() self.tx.subscribe()
} }
pub(crate) fn direct_since_epoch_secs(&self) -> Option<u64> {
let value = self.direct_since_epoch_secs.load(Ordering::Relaxed);
(value > 0).then_some(value)
}
pub(crate) fn set_mode(&self, mode: RelayRouteMode) -> Option<RouteCutoverState> { pub(crate) fn set_mode(&self, mode: RelayRouteMode) -> Option<RouteCutoverState> {
let previous = self.mode.swap(mode.as_u8(), Ordering::Relaxed); let previous = self.mode.swap(mode.as_u8(), Ordering::Relaxed);
if previous == mode.as_u8() { if previous == mode.as_u8() {
return None; return None;
} }
if matches!(mode, RelayRouteMode::Direct) {
self.direct_since_epoch_secs
.store(now_epoch_secs(), Ordering::Relaxed);
} else {
self.direct_since_epoch_secs.store(0, Ordering::Relaxed);
}
let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1; let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
let next = RouteCutoverState { mode, generation }; let next = RouteCutoverState { mode, generation };
self.tx.send_replace(next); self.tx.send_replace(next);
@ -83,6 +101,13 @@ impl RouteRuntimeController {
} }
} }
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|value| value.as_secs())
.unwrap_or(0)
}
pub(crate) fn is_session_affected_by_cutover( pub(crate) fn is_session_affected_by_cutover(
current: RouteCutoverState, current: RouteCutoverState,
_session_mode: RelayRouteMode, _session_mode: RelayRouteMode,