From e1f3efb619e49a42c4b95443fd2046b0a87cfaf0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 7 Mar 2026 15:37:49 +0300 Subject: [PATCH] API from main --- src/api/mod.rs | 8 +-- src/api/runtime_min.rs | 6 +- src/api/runtime_stats.rs | 2 +- src/api/runtime_zero.rs | 7 +- src/main.rs | 140 +++++++++++++++++++++------------------ 5 files changed, 91 insertions(+), 72 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index a705a46..e63de29 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -11,7 +11,7 @@ use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use tokio::net::TcpListener; -use tokio::sync::{Mutex, watch}; +use tokio::sync::{Mutex, RwLock, watch}; use tracing::{debug, info, warn}; use crate::config::ProxyConfig; @@ -67,7 +67,7 @@ pub(super) struct ApiRuntimeState { pub(super) struct ApiShared { pub(super) stats: Arc, pub(super) ip_tracker: Arc, - pub(super) me_pool: Option>, + pub(super) me_pool: Arc>>>, pub(super) upstream_manager: Arc, pub(super) config_path: PathBuf, pub(super) startup_detected_ip_v4: Option, @@ -91,7 +91,7 @@ pub async fn serve( listen: SocketAddr, stats: Arc, ip_tracker: Arc, - me_pool: Option>, + me_pool: Arc>>>, upstream_manager: Arc, config_rx: watch::Receiver>, admission_rx: watch::Receiver, @@ -248,7 +248,7 @@ async fn handle( } ("GET", "/v1/runtime/gates") => { let revision = current_revision(&shared.config_path).await?; - let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref()); + let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } ("GET", "/v1/limits/effective") => { diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs index 96270df..d3066a3 100644 --- a/src/api/runtime_min.rs +++ b/src/api/runtime_min.rs @@ -260,7 +260,7 @@ pub(super) fn build_security_whitelist_data(cfg: &ProxyConfig) -> SecurityWhitel pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> RuntimeMePoolStateData { let now_epoch_secs = now_epoch_secs(); - let Some(pool) = &shared.me_pool else { + let Some(pool) = shared.me_pool.read().await.clone() else { return RuntimeMePoolStateData { enabled: false, reason: Some(SOURCE_UNAVAILABLE_REASON), @@ -350,7 +350,7 @@ pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> Runt pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> RuntimeMeQualityData { let now_epoch_secs = now_epoch_secs(); - let Some(pool) = &shared.me_pool else { + let Some(pool) = shared.me_pool.read().await.clone() else { return RuntimeMeQualityData { enabled: false, reason: Some(SOURCE_UNAVAILABLE_REASON), @@ -486,7 +486,7 @@ pub(super) async fn build_runtime_upstream_quality_data( pub(super) async fn build_runtime_nat_stun_data(shared: &ApiShared) -> RuntimeNatStunData { let now_epoch_secs = now_epoch_secs(); - let Some(pool) = &shared.me_pool else { + let Some(pool) = shared.me_pool.read().await.clone() else { return RuntimeNatStunData { enabled: false, reason: Some(SOURCE_UNAVAILABLE_REASON), diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index c69f817..7fae31d 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -297,7 +297,7 @@ async fn get_minimal_payload_cached( } } - let pool = shared.me_pool.as_ref()?; + let pool = shared.me_pool.read().await.clone()?; let status = pool.api_status_snapshot().await; let runtime = pool.api_runtime_snapshot().await; let generated_at_epoch_secs = status.generated_at_epoch_secs; diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 61b6844..e184671 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -142,12 +142,17 @@ pub(super) fn build_system_info_data( } } -pub(super) fn build_runtime_gates_data(shared: &ApiShared, cfg: &ProxyConfig) -> RuntimeGatesData { +pub(super) async fn build_runtime_gates_data( + shared: &ApiShared, + cfg: &ProxyConfig, +) -> RuntimeGatesData { let me_runtime_ready = if !cfg.general.use_middle_proxy { true } else { shared .me_pool + .read() + .await .as_ref() .map(|pool| pool.is_runtime_ready()) .unwrap_or(false) diff --git a/src/main.rs b/src/main.rs index ca24d0e..a46be25 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use rand::Rng; use tokio::net::TcpListener; use tokio::signal; -use tokio::sync::{Semaphore, mpsc, watch}; +use tokio::sync::{RwLock, Semaphore, mpsc, watch}; use tracing::{debug, error, info, warn}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; #[cfg(unix)] @@ -473,6 +473,70 @@ async fn main() -> std::result::Result<(), Box> { config.general.upstream_connect_failfast_hard_errors, stats.clone(), )); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + ip_tracker + .set_limit_policy( + config.access.user_max_unique_ips_mode, + config.access.user_max_unique_ips_window_secs, + ) + .await; + if !config.access.user_max_unique_ips.is_empty() { + info!( + "IP limits configured for {} users", + config.access.user_max_unique_ips.len() + ); + } + if !config.network.dns_overrides.is_empty() { + info!( + "Runtime DNS overrides configured: {} entries", + config.network.dns_overrides.len() + ); + } + + let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone())); + let initial_admission_open = !config.general.use_middle_proxy; + let (admission_tx, admission_rx) = watch::channel(initial_admission_open); + let api_me_pool = Arc::new(RwLock::new(None::>)); + + if config.server.api.enabled { + let listen = match config.server.api.listen.parse::() { + Ok(listen) => listen, + Err(error) => { + warn!( + error = %error, + listen = %config.server.api.listen, + "Invalid server.api.listen; API is disabled" + ); + SocketAddr::from(([127, 0, 0, 1], 0)) + } + }; + if listen.port() != 0 { + let stats_api = stats.clone(); + let ip_tracker_api = ip_tracker.clone(); + let me_pool_api = api_me_pool.clone(); + let upstream_manager_api = upstream_manager.clone(); + let config_rx_api = api_config_rx.clone(); + let admission_rx_api = admission_rx.clone(); + let config_path_api = std::path::PathBuf::from(&config_path); + tokio::spawn(async move { + api::serve( + listen, + stats_api, + ip_tracker_api, + me_pool_api, + upstream_manager_api, + config_rx_api, + admission_rx_api, + config_path_api, + None, + None, + process_started_at_epoch_secs, + ) + .await; + }); + } + } let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); tls_domains.push(config.censorship.tls_domain.clone()); @@ -620,26 +684,6 @@ async fn main() -> std::result::Result<(), Box> { let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); - // IP Tracker initialization - let ip_tracker = Arc::new(UserIpTracker::new()); - ip_tracker.load_limits(&config.access.user_max_unique_ips).await; - ip_tracker - .set_limit_policy( - config.access.user_max_unique_ips_mode, - config.access.user_max_unique_ips_window_secs, - ) - .await; - - if !config.access.user_max_unique_ips.is_empty() { - info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); - } - if !config.network.dns_overrides.is_empty() { - info!( - "Runtime DNS overrides configured: {} entries", - config.network.dns_overrides.len() - ); - } - // Connection concurrency limit let max_connections = Arc::new(Semaphore::new(10_000)); @@ -1074,6 +1118,7 @@ async fn main() -> std::result::Result<(), Box> { if let Some(ref pool) = me_pool { pool.set_runtime_ready(true); } + *api_me_pool.write().await = me_pool.clone(); // Background tasks let um_clone = upstream_manager.clone(); @@ -1114,6 +1159,17 @@ async fn main() -> std::result::Result<(), Box> { detected_ip_v4, detected_ip_v6, ); + let mut config_rx_api_bridge = config_rx.clone(); + let api_config_tx_bridge = api_config_tx.clone(); + tokio::spawn(async move { + loop { + if config_rx_api_bridge.changed().await.is_err() { + break; + } + let cfg = config_rx_api_bridge.borrow_and_update().clone(); + api_config_tx_bridge.send_replace(cfg); + } + }); let stats_policy = stats.clone(); let mut config_rx_policy = config_rx.clone(); @@ -1345,7 +1401,6 @@ async fn main() -> std::result::Result<(), Box> { print_proxy_links(&host, port, &config); } - let (admission_tx, admission_rx) = watch::channel(true); if config.general.use_middle_proxy { if let Some(pool) = me_pool.as_ref() { let initial_open = pool.admission_ready_conditional_cast().await; @@ -1546,47 +1601,6 @@ async fn main() -> std::result::Result<(), Box> { }); } - if config.server.api.enabled { - let listen = match config.server.api.listen.parse::() { - Ok(listen) => listen, - Err(error) => { - warn!( - error = %error, - listen = %config.server.api.listen, - "Invalid server.api.listen; API is disabled" - ); - SocketAddr::from(([127, 0, 0, 1], 0)) - } - }; - if listen.port() != 0 { - let stats = stats.clone(); - let ip_tracker_api = ip_tracker.clone(); - let me_pool_api = me_pool.clone(); - let upstream_manager_api = upstream_manager.clone(); - let config_rx_api = config_rx.clone(); - let admission_rx_api = admission_rx.clone(); - let config_path_api = std::path::PathBuf::from(&config_path); - let startup_detected_ip_v4 = detected_ip_v4; - let startup_detected_ip_v6 = detected_ip_v6; - tokio::spawn(async move { - api::serve( - listen, - stats, - ip_tracker_api, - me_pool_api, - upstream_manager_api, - config_rx_api, - admission_rx_api, - config_path_api, - startup_detected_ip_v4, - startup_detected_ip_v6, - process_started_at_epoch_secs, - ) - .await; - }); - } - } - for (listener, listener_proxy_protocol) in listeners { let mut config_rx: tokio::sync::watch::Receiver> = config_rx.clone(); let mut admission_rx_tcp = admission_rx.clone();