diff --git a/docs/API.md b/docs/API.md index c70b625..9296aff 100644 --- a/docs/API.md +++ b/docs/API.md @@ -99,6 +99,7 @@ Notes: | `GET` | `/v1/runtime/me_quality` | none | `200` | `RuntimeMeQualityData` | | `GET` | `/v1/runtime/upstream_quality` | none | `200` | `RuntimeUpstreamQualityData` | | `GET` | `/v1/runtime/nat_stun` | none | `200` | `RuntimeNatStunData` | +| `GET` | `/v1/runtime/me-selftest` | none | `200` | `RuntimeMeSelftestData` | | `GET` | `/v1/runtime/connections/summary` | none | `200` | `RuntimeEdgeConnectionsSummaryData` | | `GET` | `/v1/runtime/events/recent` | none | `200` | `RuntimeEdgeEventsData` | | `GET` | `/v1/stats/users` | none | `200` | `UserInfo[]` | @@ -560,6 +561,67 @@ Note: the request contract is defined, but the corresponding route currently ret | `addr` | `string` | Reflected public endpoint (`ip:port`). | | `age_secs` | `u64` | Reflection value age in seconds. | +### `RuntimeMeSelftestData` +| Field | Type | Description | +| --- | --- | --- | +| `enabled` | `bool` | Runtime payload availability. | +| `reason` | `string?` | `source_unavailable` when ME pool is unavailable. | +| `generated_at_epoch_secs` | `u64` | Snapshot generation timestamp. | +| `data` | `RuntimeMeSelftestPayload?` | Null when unavailable. | + +#### `RuntimeMeSelftestPayload` +| Field | Type | Description | +| --- | --- | --- | +| `kdf` | `RuntimeMeSelftestKdfData` | KDF EWMA health state. | +| `timeskew` | `RuntimeMeSelftestTimeskewData` | Date-header skew health state. | +| `ip` | `RuntimeMeSelftestIpData` | Interface IP family classification. | +| `pid` | `RuntimeMeSelftestPidData` | Process PID marker (`one` or `non-one`). | +| `bnd` | `RuntimeMeSelftestBndData` | SOCKS BND.ADDR/BND.PORT health state. | + +#### `RuntimeMeSelftestKdfData` +| Field | Type | Description | +| --- | --- | --- | +| `state` | `string` | `ok` or `error` based on EWMA threshold. | +| `ewma_errors_per_min` | `f64` | EWMA KDF error rate per minute. 
| +| `threshold_errors_per_min` | `f64` | Threshold used for `error` decision. | +| `errors_total` | `u64` | Total source errors (`kdf_drift + socks_kdf_strict_reject`). | + +#### `RuntimeMeSelftestTimeskewData` +| Field | Type | Description | +| --- | --- | --- | +| `state` | `string` | `ok` or `error` (`max_skew_secs_15m > 60` => `error`). | +| `max_skew_secs_15m` | `u64?` | Maximum observed skew in the last 15 minutes. | +| `samples_15m` | `usize` | Number of skew samples in the last 15 minutes. | +| `last_skew_secs` | `u64?` | Latest observed skew value. | +| `last_source` | `string?` | Latest skew source marker. | +| `last_seen_age_secs` | `u64?` | Age of the latest skew sample. | + +#### `RuntimeMeSelftestIpData` +| Field | Type | Description | +| --- | --- | --- | +| `v4` | `RuntimeMeSelftestIpFamilyData?` | IPv4 interface probe result; absent when unknown. | +| `v6` | `RuntimeMeSelftestIpFamilyData?` | IPv6 interface probe result; absent when unknown. | + +#### `RuntimeMeSelftestIpFamilyData` +| Field | Type | Description | +| --- | --- | --- | +| `addr` | `string` | Detected interface IP. | +| `state` | `string` | `good`, `bogon`, or `loopback`. | + +#### `RuntimeMeSelftestPidData` +| Field | Type | Description | +| --- | --- | --- | +| `pid` | `u32` | Current process PID. | +| `state` | `string` | `one` when PID=1, otherwise `non-one`. | + +#### `RuntimeMeSelftestBndData` +| Field | Type | Description | +| --- | --- | --- | +| `addr_state` | `string` | `ok`, `bogon`, or `error`. | +| `port_state` | `string` | `ok`, `zero`, or `error`. | +| `last_addr` | `string?` | Latest observed SOCKS BND address. | +| `last_seen_age_secs` | `u64?` | Age of latest BND sample. | + ### `RuntimeEdgeConnectionsSummaryData` | Field | Type | Description | | --- | --- | --- | @@ -971,11 +1033,15 @@ Note: the request contract is defined, but the corresponding route currently ret | `tls` | `string[]` | Active `tg://proxy` links for EE-TLS mode (for each host+TLS domain). 
| Link generation uses active config and enabled modes: -- `[general.links].public_host/public_port` have priority. -- If `public_host` is not set, startup-detected public IPs are used when they are present in API runtime context. -- Fallback host sources: listener `announce`, `announce_ip`, explicit listener `ip`. -- Legacy fallback: `listen_addr_ipv4` and `listen_addr_ipv6` when routable. -- Startup-detected IP values are process-static after API task bootstrap. +- Link port is `general.links.public_port` when configured; otherwise `server.port`. +- If `general.links.public_host` is non-empty, it is used as the single link host override. +- If `public_host` is not set, hosts are resolved from `server.listeners` in order: + `announce` -> `announce_ip` -> listener bind `ip`. +- For wildcard listener IPs (`0.0.0.0` / `::`), startup-detected external IP of the same family is used when available. +- Listener-derived hosts are de-duplicated while preserving first-seen order. +- If multiple hosts are resolved, API returns links for all resolved hosts in every enabled mode. +- If no host can be resolved from listeners, fallback is startup-detected `IPv4 -> IPv6`. +- Final compatibility fallback uses `listen_addr_ipv4`/`listen_addr_ipv6` when routable, otherwise `"UNKNOWN"`. - User rows are sorted by `username` in ascending lexical order. ### `CreateUserResponse` @@ -988,10 +1054,10 @@ Link generation uses active config and enabled modes: | Endpoint | Notes | | --- | --- | -| `POST /v1/users` | Creates user and validates resulting config before atomic save. | -| `PATCH /v1/users/{username}` | Partial update of provided fields only. Missing fields remain unchanged. | +| `POST /v1/users` | Creates user, validates config, then atomically updates only affected `access.*` TOML tables (`access.users` always, plus optional per-user tables present in request). | +| `PATCH /v1/users/{username}` | Partial update of provided fields only. Missing fields remain unchanged. 
Current implementation persists full config document on success. | | `POST /v1/users/{username}/rotate-secret` | Currently returns `404` in runtime route matcher; request schema is reserved for intended behavior. | -| `DELETE /v1/users/{username}` | Deletes user and related optional settings. Last user deletion is blocked. | +| `DELETE /v1/users/{username}` | Deletes only specified user, removes this user from related optional `access.user_*` maps, blocks last-user deletion, and atomically updates only related `access.*` TOML tables. | All mutating endpoints: - Respect `read_only` mode. @@ -999,6 +1065,10 @@ All mutating endpoints: - Return new `revision` after successful write. - Use process-local mutation lock + atomic write (`tmp + rename`) for config persistence. +Delete path cleanup guarantees: +- Config cleanup removes only the requested username keys. +- Runtime unique-IP cleanup removes only this user's limiter and tracked IP state. + ## Runtime State Matrix | Endpoint | `minimal_runtime_enabled=false` | `minimal_runtime_enabled=true` + source unavailable | `minimal_runtime_enabled=true` + source available | @@ -1020,9 +1090,20 @@ Additional runtime endpoint behavior: | `/v1/runtime/me_quality` | No | ME pool snapshot unavailable | `enabled=true`, full payload | | `/v1/runtime/upstream_quality` | No | Upstream runtime snapshot unavailable | `enabled=true`, full payload | | `/v1/runtime/nat_stun` | No | STUN shared state unavailable | `enabled=true`, full payload | +| `/v1/runtime/me-selftest` | No | ME pool unavailable => `enabled=false`, `reason=source_unavailable` | `enabled=true`, full payload | | `/v1/runtime/connections/summary` | `runtime_edge_enabled=false` => `enabled=false`, `reason=feature_disabled` | Recompute lock contention with no cache entry => `enabled=true`, `reason=source_unavailable` | `enabled=true`, full payload | | `/v1/runtime/events/recent` | `runtime_edge_enabled=false` => `enabled=false`, `reason=feature_disabled` | Not used in 
current implementation | `enabled=true`, full payload | +## ME Fallback Behavior Exposed Via API + +When `general.use_middle_proxy=true` and `general.me2dc_fallback=true`: +- Startup does not block on full ME pool readiness; initialization can continue in background. +- Runtime initialization payload can expose ME stage `background_init` until pool becomes ready. +- Admission/routing decision uses two readiness grace windows for "ME not ready" periods: + `80s` before first-ever readiness is observed (startup grace), + `6s` after readiness has been observed at least once (runtime failover timeout). +- Once ME has been not ready for longer than the applicable grace window, new sessions are routed via Direct-DC; when ME becomes ready, routing returns to Middle mode for new sessions. + ## Serialization Rules - Success responses always include `revision`. @@ -1046,7 +1127,7 @@ Additional runtime endpoint behavior: | Runtime apply path | Successful writes are picked up by existing config watcher/hot-reload path. | | Exposure | Built-in TLS/mTLS is not provided. Use loopback bind + reverse proxy if needed. | | Pagination | User list currently has no pagination/filtering. | -| Serialization side effect | Config comments/manual formatting are not preserved on write. | +| Serialization side effect | Updated TOML table bodies are re-serialized on write. Endpoints that persist full config can still rewrite broader formatting/comments. 
| ## Known Limitations (Current Release) diff --git a/src/api/config_store.rs b/src/api/config_store.rs index e7fbbca..f0da554 100644 --- a/src/api/config_store.rs +++ b/src/api/config_store.rs @@ -1,13 +1,39 @@ +use std::collections::BTreeMap; use std::io::Write; use std::path::{Path, PathBuf}; +use chrono::{DateTime, Utc}; use hyper::header::IF_MATCH; +use serde::Serialize; use sha2::{Digest, Sha256}; use crate::config::ProxyConfig; use super::model::ApiFailure; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum AccessSection { + Users, + UserAdTags, + UserMaxTcpConns, + UserExpirations, + UserDataQuota, + UserMaxUniqueIps, +} + +impl AccessSection { + fn table_name(self) -> &'static str { + match self { + Self::Users => "access.users", + Self::UserAdTags => "access.user_ad_tags", + Self::UserMaxTcpConns => "access.user_max_tcp_conns", + Self::UserExpirations => "access.user_expirations", + Self::UserDataQuota => "access.user_data_quota", + Self::UserMaxUniqueIps => "access.user_max_unique_ips", + } + } +} + pub(super) fn parse_if_match(headers: &hyper::HeaderMap) -> Option { headers .get(IF_MATCH) @@ -66,6 +92,142 @@ pub(super) async fn save_config_to_disk( Ok(compute_revision(&serialized)) } +pub(super) async fn save_access_sections_to_disk( + config_path: &Path, + cfg: &ProxyConfig, + sections: &[AccessSection], +) -> Result { + let mut content = tokio::fs::read_to_string(config_path) + .await + .map_err(|e| ApiFailure::internal(format!("failed to read config: {}", e)))?; + + let mut applied = Vec::new(); + for section in sections { + if applied.contains(section) { + continue; + } + let rendered = render_access_section(cfg, *section)?; + content = upsert_toml_table(&content, section.table_name(), &rendered); + applied.push(*section); + } + + write_atomic(config_path.to_path_buf(), content.clone()).await?; + Ok(compute_revision(&content)) +} + +fn render_access_section(cfg: &ProxyConfig, section: AccessSection) -> Result { + let body = match section 
{ + AccessSection::Users => { + let rows: BTreeMap = cfg + .access + .users + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserAdTags => { + let rows: BTreeMap = cfg + .access + .user_ad_tags + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserMaxTcpConns => { + let rows: BTreeMap = cfg + .access + .user_max_tcp_conns + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserExpirations => { + let rows: BTreeMap> = cfg + .access + .user_expirations + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserDataQuota => { + let rows: BTreeMap = cfg + .access + .user_data_quota + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? + } + AccessSection::UserMaxUniqueIps => { + let rows: BTreeMap = cfg + .access + .user_max_unique_ips + .iter() + .map(|(key, value)| (key.clone(), *value)) + .collect(); + serialize_table_body(&rows)? 
+ } + }; + + let mut out = format!("[{}]\n", section.table_name()); + if !body.is_empty() { + out.push_str(&body); + } + if !out.ends_with('\n') { + out.push('\n'); + } + Ok(out) +} + +fn serialize_table_body(value: &T) -> Result { + toml::to_string(value) + .map_err(|e| ApiFailure::internal(format!("failed to serialize access section: {}", e))) +} + +fn upsert_toml_table(source: &str, table_name: &str, replacement: &str) -> String { + if let Some((start, end)) = find_toml_table_bounds(source, table_name) { + let mut out = String::with_capacity(source.len() + replacement.len()); + out.push_str(&source[..start]); + out.push_str(replacement); + out.push_str(&source[end..]); + return out; + } + + let mut out = source.to_string(); + if !out.is_empty() && !out.ends_with('\n') { + out.push('\n'); + } + if !out.is_empty() { + out.push('\n'); + } + out.push_str(replacement); + out +} + +fn find_toml_table_bounds(source: &str, table_name: &str) -> Option<(usize, usize)> { + let target = format!("[{}]", table_name); + let mut offset = 0usize; + let mut start = None; + + for line in source.split_inclusive('\n') { + let trimmed = line.trim(); + if let Some(start_offset) = start { + if trimmed.starts_with('[') { + return Some((start_offset, offset)); + } + } else if trimmed == target { + start = Some(offset); + } + offset = offset.saturating_add(line.len()); + } + + start.map(|start_offset| (start_offset, source.len())) +} + async fn write_atomic(path: PathBuf, contents: String) -> Result<(), ApiFailure> { tokio::task::spawn_blocking(move || write_atomic_sync(&path, &contents)) .await diff --git a/src/api/mod.rs b/src/api/mod.rs index 0a51231..ff9d2f9 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -75,8 +75,7 @@ pub(super) struct ApiShared { pub(super) me_pool: Arc>>>, pub(super) upstream_manager: Arc, pub(super) config_path: PathBuf, - pub(super) startup_detected_ip_v4: Option, - pub(super) startup_detected_ip_v6: Option, + pub(super) detected_ips_rx: 
watch::Receiver<(Option, Option)>, pub(super) mutation_lock: Arc>, pub(super) minimal_cache: Arc>>, pub(super) runtime_edge_connections_cache: Arc>>, @@ -91,6 +90,10 @@ impl ApiShared { fn next_request_id(&self) -> u64 { self.request_id.fetch_add(1, Ordering::Relaxed) } + + fn detected_link_ips(&self) -> (Option, Option) { + *self.detected_ips_rx.borrow() + } } pub async fn serve( @@ -102,8 +105,7 @@ pub async fn serve( config_rx: watch::Receiver>, admission_rx: watch::Receiver, config_path: PathBuf, - startup_detected_ip_v4: Option, - startup_detected_ip_v6: Option, + detected_ips_rx: watch::Receiver<(Option, Option)>, process_started_at_epoch_secs: u64, startup_tracker: Arc, ) { @@ -134,8 +136,7 @@ pub async fn serve( me_pool, upstream_manager, config_path, - startup_detected_ip_v4, - startup_detected_ip_v6, + detected_ips_rx, mutation_lock: Arc::new(Mutex::new(())), minimal_cache: Arc::new(Mutex::new(None)), runtime_edge_connections_cache: Arc::new(Mutex::new(None)), @@ -356,12 +357,13 @@ async fn handle( } ("GET", "/v1/stats/users") | ("GET", "/v1/users") => { let revision = current_revision(&shared.config_path).await?; + let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); let users = users_from_config( &cfg, &shared.stats, &shared.ip_tracker, - shared.startup_detected_ip_v4, - shared.startup_detected_ip_v6, + detected_ip_v4, + detected_ip_v6, ) .await; Ok(success_response(StatusCode::OK, users, revision)) @@ -399,12 +401,13 @@ async fn handle( { if method == Method::GET { let revision = current_revision(&shared.config_path).await?; + let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); let users = users_from_config( &cfg, &shared.stats, &shared.ip_tracker, - shared.startup_detected_ip_v4, - shared.startup_detected_ip_v6, + detected_ip_v4, + detected_ip_v6, ) .await; if let Some(user_info) = users.into_iter().find(|entry| entry.username == user) diff --git a/src/api/users.rs b/src/api/users.rs index d156896..da360c7 100644 --- 
a/src/api/users.rs +++ b/src/api/users.rs @@ -8,7 +8,8 @@ use crate::stats::Stats; use super::ApiShared; use super::config_store::{ - ensure_expected_revision, load_config_from_disk, save_config_to_disk, + AccessSection, ensure_expected_revision, load_config_from_disk, save_access_sections_to_disk, + save_config_to_disk, }; use super::model::{ ApiFailure, CreateUserRequest, CreateUserResponse, PatchUserRequest, RotateSecretRequest, @@ -21,6 +22,12 @@ pub(super) async fn create_user( expected_revision: Option, shared: &ApiShared, ) -> Result<(CreateUserResponse, String), ApiFailure> { + let touches_user_ad_tags = body.user_ad_tag.is_some(); + let touches_user_max_tcp_conns = body.max_tcp_conns.is_some(); + let touches_user_expirations = body.expiration_rfc3339.is_some(); + let touches_user_data_quota = body.data_quota_bytes.is_some(); + let touches_user_max_unique_ips = body.max_unique_ips.is_some(); + if !is_valid_username(&body.username) { return Err(ApiFailure::bad_request( "username must match [A-Za-z0-9_.-] and be 1..64 chars", @@ -84,19 +91,37 @@ pub(super) async fn create_user( cfg.validate() .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; - let revision = save_config_to_disk(&shared.config_path, &cfg).await?; + let mut touched_sections = vec![AccessSection::Users]; + if touches_user_ad_tags { + touched_sections.push(AccessSection::UserAdTags); + } + if touches_user_max_tcp_conns { + touched_sections.push(AccessSection::UserMaxTcpConns); + } + if touches_user_expirations { + touched_sections.push(AccessSection::UserExpirations); + } + if touches_user_data_quota { + touched_sections.push(AccessSection::UserDataQuota); + } + if touches_user_max_unique_ips { + touched_sections.push(AccessSection::UserMaxUniqueIps); + } + + let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?; drop(_guard); if let Some(limit) = updated_limit { shared.ip_tracker.set_user_limit(&body.username, 
limit).await; } + let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); let users = users_from_config( &cfg, &shared.stats, &shared.ip_tracker, - shared.startup_detected_ip_v4, - shared.startup_detected_ip_v6, + detected_ip_v4, + detected_ip_v6, ) .await; let user = users @@ -118,8 +143,8 @@ pub(super) async fn create_user( links: build_user_links( &cfg, &secret, - shared.startup_detected_ip_v4, - shared.startup_detected_ip_v6, + detected_ip_v4, + detected_ip_v6, ), }); @@ -185,12 +210,13 @@ pub(super) async fn patch_user( if let Some(limit) = updated_limit { shared.ip_tracker.set_user_limit(user, limit).await; } + let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); let users = users_from_config( &cfg, &shared.stats, &shared.ip_tracker, - shared.startup_detected_ip_v4, - shared.startup_detected_ip_v6, + detected_ip_v4, + detected_ip_v6, ) .await; let user_info = users @@ -229,15 +255,24 @@ pub(super) async fn rotate_secret( cfg.access.users.insert(user.to_string(), secret.clone()); cfg.validate() .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; - let revision = save_config_to_disk(&shared.config_path, &cfg).await?; + let touched_sections = [ + AccessSection::Users, + AccessSection::UserAdTags, + AccessSection::UserMaxTcpConns, + AccessSection::UserExpirations, + AccessSection::UserDataQuota, + AccessSection::UserMaxUniqueIps, + ]; + let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?; drop(_guard); + let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); let users = users_from_config( &cfg, &shared.stats, &shared.ip_tracker, - shared.startup_detected_ip_v4, - shared.startup_detected_ip_v6, + detected_ip_v4, + detected_ip_v6, ) .await; let user_info = users @@ -287,7 +322,15 @@ pub(super) async fn delete_user( cfg.validate() .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; - let revision = 
save_config_to_disk(&shared.config_path, &cfg).await?; + let touched_sections = [ + AccessSection::Users, + AccessSection::UserAdTags, + AccessSection::UserMaxTcpConns, + AccessSection::UserExpirations, + AccessSection::UserDataQuota, + AccessSection::UserMaxUniqueIps, + ]; + let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?; drop(_guard); shared.ip_tracker.remove_user_limit(user).await; shared.ip_tracker.clear_user_ips(user).await; @@ -418,17 +461,6 @@ fn resolve_link_hosts( return vec![host.to_string()]; } - let mut startup_hosts = Vec::new(); - if let Some(ip) = startup_detected_ip_v4 { - push_unique_host(&mut startup_hosts, &ip.to_string()); - } - if let Some(ip) = startup_detected_ip_v6 { - push_unique_host(&mut startup_hosts, &ip.to_string()); - } - if !startup_hosts.is_empty() { - return startup_hosts; - } - let mut hosts = Vec::new(); for listener in &cfg.server.listeners { if let Some(host) = listener @@ -443,24 +475,44 @@ fn resolve_link_hosts( if let Some(ip) = listener.announce_ip { if !ip.is_unspecified() { push_unique_host(&mut hosts, &ip.to_string()); + continue; + } + } + if listener.ip.is_unspecified() { + let detected_ip = if listener.ip.is_ipv4() { + startup_detected_ip_v4 + } else { + startup_detected_ip_v6 + }; + if let Some(ip) = detected_ip { + push_unique_host(&mut hosts, &ip.to_string()); + } else { + push_unique_host(&mut hosts, &listener.ip.to_string()); } continue; } - if !listener.ip.is_unspecified() { - push_unique_host(&mut hosts, &listener.ip.to_string()); - } + push_unique_host(&mut hosts, &listener.ip.to_string()); } - if hosts.is_empty() { - if let Some(host) = cfg.server.listen_addr_ipv4.as_deref() { - push_host_from_legacy_listen(&mut hosts, host); - } - if let Some(host) = cfg.server.listen_addr_ipv6.as_deref() { - push_host_from_legacy_listen(&mut hosts, host); - } + if !hosts.is_empty() { + return hosts; } - hosts + if let Some(ip) = 
startup_detected_ip_v4.or(startup_detected_ip_v6) { + return vec![ip.to_string()]; + } + + if let Some(host) = cfg.server.listen_addr_ipv4.as_deref() { + push_host_from_legacy_listen(&mut hosts, host); + } + if let Some(host) = cfg.server.listen_addr_ipv6.as_deref() { + push_host_from_legacy_listen(&mut hosts, host); + } + if !hosts.is_empty() { + return hosts; + } + + vec!["UNKNOWN".to_string()] } fn push_host_from_legacy_listen(hosts: &mut Vec, raw: &str) { diff --git a/src/maestro/admission.rs b/src/maestro/admission.rs index bacd4d4..69a9c9f 100644 --- a/src/maestro/admission.rs +++ b/src/maestro/admission.rs @@ -8,6 +8,9 @@ use crate::config::ProxyConfig; use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; use crate::transport::middle_proxy::MePool; +const STARTUP_FALLBACK_AFTER: Duration = Duration::from_secs(80); +const RUNTIME_FALLBACK_AFTER: Duration = Duration::from_secs(6); + pub(crate) async fn configure_admission_gate( config: &Arc, me_pool: Option>, @@ -17,7 +20,6 @@ pub(crate) async fn configure_admission_gate( ) { if config.general.use_middle_proxy { if let Some(pool) = me_pool.as_ref() { - let fallback_after = Duration::from_secs(6); let initial_ready = pool.admission_ready_conditional_cast().await; admission_tx.send_replace(initial_ready); let _ = route_runtime.set_mode(RelayRouteMode::Middle); @@ -36,6 +38,7 @@ pub(crate) async fn configure_admission_gate( tokio::spawn(async move { let mut gate_open = initial_ready; let mut route_mode = RelayRouteMode::Middle; + let mut ready_observed = initial_ready; let mut not_ready_since = if initial_ready { None } else { @@ -57,11 +60,17 @@ pub(crate) async fn configure_admission_gate( let ready = pool_for_gate.admission_ready_conditional_cast().await; let now = Instant::now(); let (next_gate_open, next_route_mode, next_fallback_active) = if ready { + ready_observed = true; not_ready_since = None; (true, RelayRouteMode::Middle, false) } else { let not_ready_started_at = 
*not_ready_since.get_or_insert(now); let not_ready_for = now.saturating_duration_since(not_ready_started_at); + let fallback_after = if ready_observed { + RUNTIME_FALLBACK_AFTER + } else { + STARTUP_FALLBACK_AFTER + }; if fallback_enabled && not_ready_for > fallback_after { (true, RelayRouteMode::Direct, true) } else { @@ -79,6 +88,11 @@ pub(crate) async fn configure_admission_gate( "Middle-End routing restored for new sessions" ); } else { + let fallback_after = if ready_observed { + RUNTIME_FALLBACK_AFTER + } else { + STARTUP_FALLBACK_AFTER + }; warn!( target_mode = route_mode.as_str(), cutover_generation = snapshot.generation, diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 9c674cd..72fdd40 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -281,88 +281,178 @@ pub(crate) async fn initialize_me_pool( .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_INIT_STAGE1) .await; - let mut init_attempt: u32 = 0; - loop { - init_attempt = init_attempt.saturating_add(1); - startup_tracker.set_me_init_attempt(init_attempt).await; - match pool.init(pool_size, &rng).await { - Ok(()) => { - startup_tracker.set_me_last_error(None).await; - startup_tracker - .complete_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME pool initialized".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Ready, "ready") - .await; - info!( - attempt = init_attempt, - "Middle-End pool initialized successfully" - ); + if me2dc_fallback { + let pool_bg = pool.clone(); + let rng_bg = rng.clone(); + let startup_tracker_bg = startup_tracker.clone(); + let retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }; + std::thread::spawn(move || { + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => runtime, + Err(error) => { + error!(error = %error, "Failed to build background runtime 
for ME initialization"); + return; + } + }; + runtime.block_on(async move { + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); + startup_tracker_bg.set_me_init_attempt(init_attempt).await; + match pool_bg.init(pool_size, &rng_bg).await { + Ok(()) => { + startup_tracker_bg.set_me_last_error(None).await; + startup_tracker_bg + .complete_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME pool initialized".to_string()), + ) + .await; + startup_tracker_bg + .set_me_status(StartupMeStatus::Ready, "ready") + .await; + info!( + attempt = init_attempt, + "Middle-End pool initialized successfully" + ); - // Phase 4: Start health monitor - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - - break Some(pool); - } - Err(e) => { - startup_tracker.set_me_last_error(Some(e.to_string())).await; - let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; - if retries_limited && init_attempt >= me_init_retry_attempts { + let pool_health = pool_bg.clone(); + let rng_health = rng_bg.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_health, + rng_health, + min_conns, + ) + .await; + }); + break; + } + Err(e) => { + startup_tracker_bg.set_me_last_error(Some(e.to_string())).await; + if init_attempt >= me_init_warn_after_attempts { + warn!( + error = %e, + attempt = init_attempt, + retry_limit = %retry_limit, + retry_in_secs = 2, + "ME pool is not ready yet; retrying background initialization" + ); + } else { + info!( + error = %e, + attempt = init_attempt, + retry_limit = %retry_limit, + retry_in_secs = 2, + "ME pool startup warmup: retrying background initialization" + ); + } + pool_bg.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + }); + }); + 
startup_tracker + .set_me_status(StartupMeStatus::Initializing, "background_init") + .await; + info!( + startup_grace_secs = 80, + "ME pool initialization continues in background; startup continues with conditional Direct fallback" + ); + Some(pool) + } else { + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); + startup_tracker.set_me_init_attempt(init_attempt).await; + match pool.init(pool_size, &rng).await { + Ok(()) => { + startup_tracker.set_me_last_error(None).await; startup_tracker - .fail_component( + .complete_component( COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME init retry budget exhausted".to_string()), + Some("ME pool initialized".to_string()), ) .await; startup_tracker - .set_me_status(StartupMeStatus::Failed, "failed") + .set_me_status(StartupMeStatus::Ready, "ready") .await; - error!( - error = %e, - attempt = init_attempt, - retry_limit = me_init_retry_attempts, - "ME pool init retries exhausted; falling back to direct mode" - ); - break None; - } - - let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 { - String::from("unlimited") - } else { - me_init_retry_attempts.to_string() - }; - if init_attempt >= me_init_warn_after_attempts { - warn!( - error = %e, - attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); - } else { info!( - error = %e, attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool startup warmup: retrying initialization" + "Middle-End pool initialized successfully" ); + + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, min_conns, + ) + .await; + }); + + break Some(pool); + } + Err(e) => { + startup_tracker.set_me_last_error(Some(e.to_string())).await; 
+ let retries_limited = me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME init retry budget exhausted".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME pool init retries exhausted; startup cannot continue in middle-proxy mode" + ); + break None; + } + + let retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }; + if init_attempt >= me_init_warn_after_attempts { + warn!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + } else { + info!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool startup warmup: retrying initialization" + ); + } + pool.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; } - pool.reset_stun_state(); - tokio::time::sleep(Duration::from_secs(2)).await; } } } diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 92b42e3..fcb4d3f 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -20,7 +20,7 @@ mod runtime_tasks; mod shutdown; mod tls_bootstrap; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{RwLock, Semaphore, watch}; @@ -189,6 +189,7 @@ pub async fn run() -> std::result::Result<(), Box> { } let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone())); + let (detected_ips_tx, detected_ips_rx) = watch::channel((None::, None::)); let initial_admission_open = !config.general.use_middle_proxy; let (admission_tx, 
admission_rx) = watch::channel(initial_admission_open); let initial_route_mode = if config.general.use_middle_proxy { @@ -223,6 +224,7 @@ pub async fn run() -> std::result::Result<(), Box> { let admission_rx_api = admission_rx.clone(); let config_path_api = std::path::PathBuf::from(&config_path); let startup_tracker_api = startup_tracker.clone(); + let detected_ips_rx_api = detected_ips_rx.clone(); tokio::spawn(async move { api::serve( listen, @@ -233,8 +235,7 @@ pub async fn run() -> std::result::Result<(), Box> { config_rx_api, admission_rx_api, config_path_api, - None, - None, + detected_ips_rx_api, process_started_at_epoch_secs, startup_tracker_api, ) @@ -288,6 +289,10 @@ pub async fn run() -> std::result::Result<(), Box> { config.general.stun_nat_probe_concurrency, ) .await?; + detected_ips_tx.send_replace(( + probe.detected_ipv4.map(IpAddr::V4), + probe.detected_ipv6.map(IpAddr::V6), + )); let decision = decide_network_capabilities(&config.network, &probe); log_probe_result(&probe, &decision); startup_tracker diff --git a/src/metrics.rs b/src/metrics.rs index 917c9b3..c24dc54 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -17,6 +17,7 @@ use crate::config::ProxyConfig; use crate::ip_tracker::UserIpTracker; use crate::stats::beobachten::BeobachtenStore; use crate::stats::Stats; +use crate::transport::{ListenOptions, create_listener}; pub async fn serve( port: u16, @@ -26,16 +27,90 @@ pub async fn serve( config_rx: tokio::sync::watch::Receiver>, whitelist: Vec, ) { - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - let listener = match TcpListener::bind(addr).await { - Ok(l) => l, - Err(e) => { - warn!(error = %e, "Failed to bind metrics on {}", addr); - return; - } - }; - info!("Metrics endpoint: http://{}/metrics and /beobachten", addr); + let whitelist = Arc::new(whitelist); + let mut listener_v4 = None; + let mut listener_v6 = None; + let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port)); + match bind_metrics_listener(addr_v4, false) { + Ok(listener) 
=> { + info!("Metrics endpoint: http://{}/metrics and /beobachten", addr_v4); + listener_v4 = Some(listener); + } + Err(e) => { + warn!(error = %e, "Failed to bind metrics on {}", addr_v4); + } + } + + let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], port)); + match bind_metrics_listener(addr_v6, true) { + Ok(listener) => { + info!("Metrics endpoint: http://[::]:{}/metrics and /beobachten", port); + listener_v6 = Some(listener); + } + Err(e) => { + warn!(error = %e, "Failed to bind metrics on {}", addr_v6); + } + } + + match (listener_v4, listener_v6) { + (None, None) => { + warn!("Metrics listener is unavailable on both IPv4 and IPv6"); + } + (Some(listener), None) | (None, Some(listener)) => { + serve_listener( + listener, stats, beobachten, ip_tracker, config_rx, whitelist, + ) + .await; + } + (Some(listener4), Some(listener6)) => { + let stats_v6 = stats.clone(); + let beobachten_v6 = beobachten.clone(); + let ip_tracker_v6 = ip_tracker.clone(); + let config_rx_v6 = config_rx.clone(); + let whitelist_v6 = whitelist.clone(); + tokio::spawn(async move { + serve_listener( + listener6, + stats_v6, + beobachten_v6, + ip_tracker_v6, + config_rx_v6, + whitelist_v6, + ) + .await; + }); + serve_listener( + listener4, + stats, + beobachten, + ip_tracker, + config_rx, + whitelist, + ) + .await; + } + } +} + +fn bind_metrics_listener(addr: SocketAddr, ipv6_only: bool) -> std::io::Result { + let options = ListenOptions { + reuse_port: false, + ipv6_only, + ..Default::default() + }; + let socket = create_listener(addr, &options)?; + TcpListener::from_std(socket.into()) +} + +async fn serve_listener( + listener: TcpListener, + stats: Arc, + beobachten: Arc, + ip_tracker: Arc, + config_rx: tokio::sync::watch::Receiver>, + whitelist: Arc>, +) { loop { let (stream, peer) = match listener.accept().await { Ok(v) => v,