From 161af515587010322b5ebdc98203122ed871e2bc Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Tue, 10 Mar 2026 00:02:39 +0300 Subject: [PATCH] User Management in API --- src/api/config_store.rs | 162 ++++++++++++++++++++++++++ src/api/users.rs | 38 ++++++- src/maestro/admission.rs | 16 ++- src/maestro/me_startup.rs | 232 ++++++++++++++++++++++++++------------ src/metrics.rs | 93 +++++++++++++-- 5 files changed, 457 insertions(+), 84 deletions(-) diff --git a/src/api/config_store.rs b/src/api/config_store.rs index e7fbbca..f0da554 100644 --- a/src/api/config_store.rs +++ b/src/api/config_store.rs @@ -1,13 +1,39 @@ +use std::collections::BTreeMap; use std::io::Write; use std::path::{Path, PathBuf}; +use chrono::{DateTime, Utc}; use hyper::header::IF_MATCH; +use serde::Serialize; use sha2::{Digest, Sha256}; use crate::config::ProxyConfig; use super::model::ApiFailure; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum AccessSection { + Users, + UserAdTags, + UserMaxTcpConns, + UserExpirations, + UserDataQuota, + UserMaxUniqueIps, +} + +impl AccessSection { + fn table_name(self) -> &'static str { + match self { + Self::Users => "access.users", + Self::UserAdTags => "access.user_ad_tags", + Self::UserMaxTcpConns => "access.user_max_tcp_conns", + Self::UserExpirations => "access.user_expirations", + Self::UserDataQuota => "access.user_data_quota", + Self::UserMaxUniqueIps => "access.user_max_unique_ips", + } + } +} + pub(super) fn parse_if_match(headers: &hyper::HeaderMap) -> Option { headers .get(IF_MATCH) @@ -66,6 +92,142 @@ pub(super) async fn save_config_to_disk( Ok(compute_revision(&serialized)) } +pub(super) async fn save_access_sections_to_disk( + config_path: &Path, + cfg: &ProxyConfig, + sections: &[AccessSection], +) -> Result { + let mut content = tokio::fs::read_to_string(config_path) + .await + .map_err(|e| ApiFailure::internal(format!("failed to read config: {}", e)))?; + + let mut applied = Vec::new(); + for section in sections { + if applied.contains(section) { + continue; + } + let rendered = render_access_section(cfg, *section)?; + content = upsert_toml_table(&content, section.table_name(), &rendered); + applied.push(*section); + } + + write_atomic(config_path.to_path_buf(), content.clone()).await?; + Ok(compute_revision(&content)) +} + +fn render_access_section(cfg: &ProxyConfig, section: AccessSection) -> Result { + let body = match section { + AccessSection::Users => { + let rows: BTreeMap = cfg + .access + .users + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserAdTags => { + let rows: BTreeMap = cfg + .access + .user_ad_tags + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserMaxTcpConns => { + let rows: BTreeMap = cfg + .access + .user_max_tcp_conns + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserExpirations => { + let rows: BTreeMap> = cfg + .access + .user_expirations + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserDataQuota => { + let rows: BTreeMap = cfg + .access + .user_data_quota + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserMaxUniqueIps => { + let rows: BTreeMap = cfg + .access + .user_max_unique_ips + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + }; + + let mut out = format!("[{}]\n", section.table_name()); + if !body.is_empty() { + out.push_str(&body); + } + if !out.ends_with('\n') { + out.push('\n'); + } + Ok(out) +} + +fn serialize_table_body(value: &T) -> Result { + toml::to_string(value) + .map_err(|e| ApiFailure::internal(format!("failed to serialize access section: {}", e))) +} + +fn upsert_toml_table(source: &str, table_name: &str, replacement: &str) -> String { + if let Some((start, end)) = find_toml_table_bounds(source, table_name) { + let mut out = String::with_capacity(source.len() + replacement.len()); + out.push_str(&source[..start]); + out.push_str(replacement); + out.push_str(&source[end..]); + return out; + } + + let mut out = source.to_string(); + if !out.is_empty() && !out.ends_with('\n') { + out.push('\n'); + } + if !out.is_empty() { + out.push('\n'); + } + out.push_str(replacement); + out +} + +fn find_toml_table_bounds(source: &str, table_name: &str) -> Option<(usize, usize)> { + let target = format!("[{}]", table_name); + let mut offset = 0usize; + let mut start = None; + + for line in source.split_inclusive('\n') { + let trimmed = line.trim(); + if let Some(start_offset) = start { + if trimmed.starts_with('[') { + return Some((start_offset, offset)); + } + } else if trimmed == target { + start = Some(offset); + } + offset = offset.saturating_add(line.len()); + } + + start.map(|start_offset| (start_offset, source.len())) +} + async fn write_atomic(path: PathBuf, contents: String) -> Result<(), ApiFailure> { tokio::task::spawn_blocking(move || write_atomic_sync(&path, &contents)) .await diff --git a/src/api/users.rs b/src/api/users.rs index 7265044..d98790d 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -8,7 +8,8 @@ use crate::stats::Stats; use super::ApiShared; use super::config_store::{ - ensure_expected_revision, load_config_from_disk, save_config_to_disk, + AccessSection, ensure_expected_revision, load_config_from_disk, save_access_sections_to_disk, + save_config_to_disk, }; use super::model::{ ApiFailure, CreateUserRequest, CreateUserResponse, PatchUserRequest, RotateSecretRequest, @@ -21,6 +22,12 @@ pub(super) async fn create_user( expected_revision: Option, shared: &ApiShared, ) -> Result<(CreateUserResponse, String), ApiFailure> { + let touches_user_ad_tags = body.user_ad_tag.is_some(); + let touches_user_max_tcp_conns = body.max_tcp_conns.is_some(); + let touches_user_expirations = body.expiration_rfc3339.is_some(); + let touches_user_data_quota = body.data_quota_bytes.is_some(); + let touches_user_max_unique_ips = body.max_unique_ips.is_some(); + if !is_valid_username(&body.username) { return Err(ApiFailure::bad_request( "username must match [A-Za-z0-9_.-] and be 1..64 chars", @@ -84,7 +91,24 @@ pub(super) async fn create_user( cfg.validate() .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; - let revision = save_config_to_disk(&shared.config_path, &cfg).await?; + let mut touched_sections = vec![AccessSection::Users]; + if touches_user_ad_tags { + touched_sections.push(AccessSection::UserAdTags); + } + if touches_user_max_tcp_conns { + touched_sections.push(AccessSection::UserMaxTcpConns); + } + if touches_user_expirations { + touched_sections.push(AccessSection::UserExpirations); + } + if touches_user_data_quota { + touched_sections.push(AccessSection::UserDataQuota); + } + if touches_user_max_unique_ips { + touched_sections.push(AccessSection::UserMaxUniqueIps); + } + + let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?; drop(_guard); if let Some(limit) = updated_limit { @@ -231,7 +255,15 @@ pub(super) async fn rotate_secret( cfg.access.users.insert(user.to_string(), secret.clone()); cfg.validate() .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; - let revision = save_config_to_disk(&shared.config_path, &cfg).await?; + let touched_sections = [ + AccessSection::Users, + AccessSection::UserAdTags, + AccessSection::UserMaxTcpConns, + AccessSection::UserExpirations, + AccessSection::UserDataQuota, + AccessSection::UserMaxUniqueIps, + ]; + let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?; drop(_guard); let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); diff --git a/src/maestro/admission.rs b/src/maestro/admission.rs index bacd4d4..69a9c9f 100644 --- a/src/maestro/admission.rs +++ b/src/maestro/admission.rs @@ -8,6 +8,9 @@ use crate::config::ProxyConfig; use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; use crate::transport::middle_proxy::MePool; +const STARTUP_FALLBACK_AFTER: Duration = Duration::from_secs(80); +const RUNTIME_FALLBACK_AFTER: Duration = Duration::from_secs(6); + pub(crate) async fn configure_admission_gate( config: &Arc, me_pool: Option>, @@ -17,7 +20,6 @@ pub(crate) async fn configure_admission_gate( ) { if config.general.use_middle_proxy { if let Some(pool) = me_pool.as_ref() { - let fallback_after = Duration::from_secs(6); let initial_ready = pool.admission_ready_conditional_cast().await; admission_tx.send_replace(initial_ready); let _ = route_runtime.set_mode(RelayRouteMode::Middle); @@ -36,6 +38,7 @@ pub(crate) async fn configure_admission_gate( tokio::spawn(async move { let mut gate_open = initial_ready; let mut route_mode = RelayRouteMode::Middle; + let mut ready_observed = initial_ready; let mut not_ready_since = if initial_ready { None } else { @@ -57,11 +60,17 @@ pub(crate) async fn configure_admission_gate( let ready = pool_for_gate.admission_ready_conditional_cast().await; let now = Instant::now(); let (next_gate_open, next_route_mode, next_fallback_active) = if ready { + ready_observed = true; not_ready_since = None; (true, RelayRouteMode::Middle, false) } else { let not_ready_started_at = *not_ready_since.get_or_insert(now); let not_ready_for = now.saturating_duration_since(not_ready_started_at); + let fallback_after = if ready_observed { + RUNTIME_FALLBACK_AFTER + } else { + STARTUP_FALLBACK_AFTER + }; if fallback_enabled && not_ready_for > fallback_after { (true, RelayRouteMode::Direct, true) } else { @@ -79,6 +88,11 @@ pub(crate) async fn configure_admission_gate( "Middle-End routing restored for new sessions" ); } else { + let fallback_after = if ready_observed { + RUNTIME_FALLBACK_AFTER + } else { + STARTUP_FALLBACK_AFTER + }; warn!( target_mode = route_mode.as_str(), cutover_generation = snapshot.generation, diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 9c674cd..72fdd40 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -281,88 +281,178 @@ pub(crate) async fn initialize_me_pool( .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_INIT_STAGE1) .await; - let mut init_attempt: u32 = 0; - loop { - init_attempt = init_attempt.saturating_add(1); - startup_tracker.set_me_init_attempt(init_attempt).await; - match pool.init(pool_size, &rng).await { - Ok(()) => { - startup_tracker.set_me_last_error(None).await; - startup_tracker - .complete_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME pool initialized".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Ready, "ready") - .await; - info!( - attempt = init_attempt, - "Middle-End pool initialized successfully" - ); + if me2dc_fallback { + let pool_bg = pool.clone(); + let rng_bg = rng.clone(); + let startup_tracker_bg = startup_tracker.clone(); + let retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }; + std::thread::spawn(move || { + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => runtime, + Err(error) => { + error!(error = %error, "Failed to build background runtime for ME initialization"); + return; + } + }; + runtime.block_on(async move { + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); + startup_tracker_bg.set_me_init_attempt(init_attempt).await; + match pool_bg.init(pool_size, &rng_bg).await { + Ok(()) => { + startup_tracker_bg.set_me_last_error(None).await; + startup_tracker_bg + .complete_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME pool initialized".to_string()), + ) + .await; + startup_tracker_bg + .set_me_status(StartupMeStatus::Ready, "ready") + .await; + info!( + attempt = init_attempt, + "Middle-End pool initialized successfully" + ); - // Phase 4: Start health monitor - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - - break Some(pool); - } - Err(e) => { - startup_tracker.set_me_last_error(Some(e.to_string())).await; - let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; - if retries_limited && init_attempt >= me_init_retry_attempts { + let pool_health = pool_bg.clone(); + let rng_health = rng_bg.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_health, + rng_health, + min_conns, + ) + .await; + }); + break; + } + Err(e) => { + startup_tracker_bg.set_me_last_error(Some(e.to_string())).await; + if init_attempt >= me_init_warn_after_attempts { + warn!( + error = %e, + attempt = init_attempt, + retry_limit = %retry_limit, + retry_in_secs = 2, + "ME pool is not ready yet; retrying background initialization" + ); + } else { + info!( + error = %e, + attempt = init_attempt, + retry_limit = %retry_limit, + retry_in_secs = 2, + "ME pool startup warmup: retrying background initialization" + ); + } + pool_bg.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + }); + }); + startup_tracker + .set_me_status(StartupMeStatus::Initializing, "background_init") + .await; + info!( + startup_grace_secs = 80, + "ME pool initialization continues in background; startup continues with conditional Direct fallback" + ); + Some(pool) + } else { + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); + startup_tracker.set_me_init_attempt(init_attempt).await; + match pool.init(pool_size, &rng).await { + Ok(()) => { + startup_tracker.set_me_last_error(None).await; startup_tracker - .fail_component( + .complete_component( COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME init retry budget exhausted".to_string()), + Some("ME pool initialized".to_string()), ) .await; startup_tracker - .set_me_status(StartupMeStatus::Failed, "failed") + .set_me_status(StartupMeStatus::Ready, "ready") .await; - error!( - error = %e, - attempt = init_attempt, - retry_limit = me_init_retry_attempts, - "ME pool init retries exhausted; falling back to direct mode" - ); - break None; - } - - let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 { - String::from("unlimited") - } else { - me_init_retry_attempts.to_string() - }; - if init_attempt >= me_init_warn_after_attempts { - warn!( - error = %e, - attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); - } else { info!( - error = %e, attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool startup warmup: retrying initialization" + "Middle-End pool initialized successfully" ); + + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, min_conns, + ) + .await; + }); + + break Some(pool); + } + Err(e) => { + startup_tracker.set_me_last_error(Some(e.to_string())).await; + let retries_limited = me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME init retry budget exhausted".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME pool init retries exhausted; startup cannot continue in middle-proxy mode" + ); + break None; + } + + let retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }; + if init_attempt >= me_init_warn_after_attempts { + warn!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + } else { + info!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool startup warmup: retrying initialization" + ); + } + pool.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; } - pool.reset_stun_state(); - tokio::time::sleep(Duration::from_secs(2)).await; } } } diff --git a/src/metrics.rs b/src/metrics.rs index 917c9b3..c24dc54 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -17,6 +17,7 @@ use crate::config::ProxyConfig; use crate::ip_tracker::UserIpTracker; use crate::stats::beobachten::BeobachtenStore; use crate::stats::Stats; +use crate::transport::{ListenOptions, create_listener}; pub async fn serve( port: u16, @@ -26,16 +27,90 @@ pub async fn serve( config_rx: tokio::sync::watch::Receiver>, whitelist: Vec, ) { - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - let listener = match TcpListener::bind(addr).await { - Ok(l) => l, - Err(e) => { - warn!(error = %e, "Failed to bind metrics on {}", addr); - return; - } - }; - info!("Metrics endpoint: http://{}/metrics and /beobachten", addr); + let whitelist = Arc::new(whitelist); + let mut listener_v4 = None; + let mut listener_v6 = None; + let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port)); + match bind_metrics_listener(addr_v4, false) { + Ok(listener) => { + info!("Metrics endpoint: http://{}/metrics and /beobachten", addr_v4); + listener_v4 = Some(listener); + } + Err(e) => { + warn!(error = %e, "Failed to bind metrics on {}", addr_v4); + } + } + + let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], port)); + match bind_metrics_listener(addr_v6, true) { + Ok(listener) => { + info!("Metrics endpoint: http://[::]:{}/metrics and /beobachten", port); + listener_v6 = Some(listener); + } + Err(e) => { + warn!(error = %e, "Failed to bind metrics on {}", addr_v6); + } + } + + match (listener_v4, listener_v6) { + (None, None) => { + warn!("Metrics listener is unavailable on both IPv4 and IPv6"); + } + (Some(listener), None) | (None, Some(listener)) => { + serve_listener( + listener, stats, beobachten, ip_tracker, config_rx, whitelist, + ) + .await; + } + (Some(listener4), Some(listener6)) => { + let stats_v6 = stats.clone(); + let beobachten_v6 = beobachten.clone(); + let ip_tracker_v6 = ip_tracker.clone(); + let config_rx_v6 = config_rx.clone(); + let whitelist_v6 = whitelist.clone(); + tokio::spawn(async move { + serve_listener( + listener6, + stats_v6, + beobachten_v6, + ip_tracker_v6, + config_rx_v6, + whitelist_v6, + ) + .await; + }); + serve_listener( + listener4, + stats, + beobachten, + ip_tracker, + config_rx, + whitelist, + ) + .await; + } + } +} + +fn bind_metrics_listener(addr: SocketAddr, ipv6_only: bool) -> std::io::Result { + let options = ListenOptions { + reuse_port: false, + ipv6_only, + ..Default::default() + }; + let socket = create_listener(addr, &options)?; + TcpListener::from_std(socket.into()) +} + +async fn serve_listener( + listener: TcpListener, + stats: Arc, + beobachten: Arc, + ip_tracker: Arc, + config_rx: tokio::sync::watch::Receiver>, + whitelist: Arc>, +) { loop { let (stream, peer) = match listener.accept().await { Ok(v) => v,