API UpstreamManager

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-04 11:41:41 +03:00
parent 5df2fe9f97
commit de2047adf2
No known key found for this signature in database
5 changed files with 270 additions and 28 deletions

View File

@ -20,6 +20,7 @@ use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::stats::Stats; use crate::stats::Stats;
use crate::transport::middle_proxy::MePool; use crate::transport::middle_proxy::MePool;
use crate::transport::UpstreamManager;
mod config_store; mod config_store;
mod model; mod model;
@ -33,7 +34,7 @@ use model::{
}; };
use runtime_stats::{ use runtime_stats::{
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data, MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
build_zero_all_data, build_upstreams_data, build_zero_all_data,
}; };
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
@ -42,6 +43,7 @@ pub(super) struct ApiShared {
pub(super) stats: Arc<Stats>, pub(super) stats: Arc<Stats>,
pub(super) ip_tracker: Arc<UserIpTracker>, pub(super) ip_tracker: Arc<UserIpTracker>,
pub(super) me_pool: Option<Arc<MePool>>, pub(super) me_pool: Option<Arc<MePool>>,
pub(super) upstream_manager: Arc<UpstreamManager>,
pub(super) config_path: PathBuf, pub(super) config_path: PathBuf,
pub(super) startup_detected_ip_v4: Option<IpAddr>, pub(super) startup_detected_ip_v4: Option<IpAddr>,
pub(super) startup_detected_ip_v6: Option<IpAddr>, pub(super) startup_detected_ip_v6: Option<IpAddr>,
@ -61,6 +63,7 @@ pub async fn serve(
stats: Arc<Stats>, stats: Arc<Stats>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
upstream_manager: Arc<UpstreamManager>,
config_rx: watch::Receiver<Arc<ProxyConfig>>, config_rx: watch::Receiver<Arc<ProxyConfig>>,
config_path: PathBuf, config_path: PathBuf,
startup_detected_ip_v4: Option<IpAddr>, startup_detected_ip_v4: Option<IpAddr>,
@ -84,6 +87,7 @@ pub async fn serve(
stats, stats,
ip_tracker, ip_tracker,
me_pool, me_pool,
upstream_manager,
config_path, config_path,
startup_detected_ip_v4, startup_detected_ip_v4,
startup_detected_ip_v6, startup_detected_ip_v6,
@ -201,6 +205,11 @@ async fn handle(
let data = build_zero_all_data(&shared.stats, cfg.access.users.len()); let data = build_zero_all_data(&shared.stats, cfg.access.users.len());
Ok(success_response(StatusCode::OK, data, revision)) Ok(success_response(StatusCode::OK, data, revision))
} }
("GET", "/v1/stats/upstreams") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_upstreams_data(shared.as_ref(), api_cfg);
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/minimal/all") => { ("GET", "/v1/stats/minimal/all") => {
let revision = current_revision(&shared.config_path).await?; let revision = current_revision(&shared.config_path).await?;
let data = build_minimal_all_data(shared.as_ref(), api_cfg).await; let data = build_minimal_all_data(shared.as_ref(), api_cfg).await;

View File

@ -103,6 +103,50 @@ pub(super) struct ZeroUpstreamData {
pub(super) connect_duration_fail_bucket_gt_1000ms: u64, pub(super) connect_duration_fail_bucket_gt_1000ms: u64,
} }
#[derive(Serialize, Clone)]
pub(super) struct UpstreamDcStatus {
pub(super) dc: i16,
pub(super) latency_ema_ms: Option<f64>,
pub(super) ip_preference: &'static str,
}
#[derive(Serialize, Clone)]
pub(super) struct UpstreamStatus {
pub(super) upstream_id: usize,
pub(super) route_kind: &'static str,
pub(super) address: String,
pub(super) weight: u16,
pub(super) scopes: String,
pub(super) healthy: bool,
pub(super) fails: u32,
pub(super) last_check_age_secs: u64,
pub(super) effective_latency_ms: Option<f64>,
pub(super) dc: Vec<UpstreamDcStatus>,
}
#[derive(Serialize, Clone)]
pub(super) struct UpstreamSummaryData {
pub(super) configured_total: usize,
pub(super) healthy_total: usize,
pub(super) unhealthy_total: usize,
pub(super) direct_total: usize,
pub(super) socks4_total: usize,
pub(super) socks5_total: usize,
}
#[derive(Serialize, Clone)]
pub(super) struct UpstreamsData {
pub(super) enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reason: Option<&'static str>,
pub(super) generated_at_epoch_secs: u64,
pub(super) zero: ZeroUpstreamData,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) summary: Option<UpstreamSummaryData>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) upstreams: Option<Vec<UpstreamStatus>>,
}
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]
pub(super) struct ZeroMiddleProxyData { pub(super) struct ZeroMiddleProxyData {
pub(super) keepalive_sent_total: u64, pub(super) keepalive_sent_total: u64,

View File

@ -2,12 +2,15 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::config::ApiConfig; use crate::config::ApiConfig;
use crate::stats::Stats; use crate::stats::Stats;
use crate::transport::upstream::IpPreference;
use crate::transport::UpstreamRouteKind;
use super::ApiShared; use super::ApiShared;
use super::model::{ use super::model::{
DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData,
MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData,
ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData,
ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
ZeroUpstreamData, ZeroUpstreamData,
}; };
@ -41,32 +44,7 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
telemetry_user_enabled: telemetry.user_enabled, telemetry_user_enabled: telemetry.user_enabled,
telemetry_me_level: telemetry.me_level.to_string(), telemetry_me_level: telemetry.me_level.to_string(),
}, },
upstream: ZeroUpstreamData { upstream: build_zero_upstream_data(stats),
connect_attempt_total: stats.get_upstream_connect_attempt_total(),
connect_success_total: stats.get_upstream_connect_success_total(),
connect_fail_total: stats.get_upstream_connect_fail_total(),
connect_failfast_hard_error_total: stats.get_upstream_connect_failfast_hard_error_total(),
connect_attempts_bucket_1: stats.get_upstream_connect_attempts_bucket_1(),
connect_attempts_bucket_2: stats.get_upstream_connect_attempts_bucket_2(),
connect_attempts_bucket_3_4: stats.get_upstream_connect_attempts_bucket_3_4(),
connect_attempts_bucket_gt_4: stats.get_upstream_connect_attempts_bucket_gt_4(),
connect_duration_success_bucket_le_100ms: stats
.get_upstream_connect_duration_success_bucket_le_100ms(),
connect_duration_success_bucket_101_500ms: stats
.get_upstream_connect_duration_success_bucket_101_500ms(),
connect_duration_success_bucket_501_1000ms: stats
.get_upstream_connect_duration_success_bucket_501_1000ms(),
connect_duration_success_bucket_gt_1000ms: stats
.get_upstream_connect_duration_success_bucket_gt_1000ms(),
connect_duration_fail_bucket_le_100ms: stats
.get_upstream_connect_duration_fail_bucket_le_100ms(),
connect_duration_fail_bucket_101_500ms: stats
.get_upstream_connect_duration_fail_bucket_101_500ms(),
connect_duration_fail_bucket_501_1000ms: stats
.get_upstream_connect_duration_fail_bucket_501_1000ms(),
connect_duration_fail_bucket_gt_1000ms: stats
.get_upstream_connect_duration_fail_bucket_gt_1000ms(),
},
middle_proxy: ZeroMiddleProxyData { middle_proxy: ZeroMiddleProxyData {
keepalive_sent_total: stats.get_me_keepalive_sent(), keepalive_sent_total: stats.get_me_keepalive_sent(),
keepalive_failed_total: stats.get_me_keepalive_failed(), keepalive_failed_total: stats.get_me_keepalive_failed(),
@ -140,6 +118,102 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
} }
} }
fn build_zero_upstream_data(stats: &Stats) -> ZeroUpstreamData {
ZeroUpstreamData {
connect_attempt_total: stats.get_upstream_connect_attempt_total(),
connect_success_total: stats.get_upstream_connect_success_total(),
connect_fail_total: stats.get_upstream_connect_fail_total(),
connect_failfast_hard_error_total: stats.get_upstream_connect_failfast_hard_error_total(),
connect_attempts_bucket_1: stats.get_upstream_connect_attempts_bucket_1(),
connect_attempts_bucket_2: stats.get_upstream_connect_attempts_bucket_2(),
connect_attempts_bucket_3_4: stats.get_upstream_connect_attempts_bucket_3_4(),
connect_attempts_bucket_gt_4: stats.get_upstream_connect_attempts_bucket_gt_4(),
connect_duration_success_bucket_le_100ms: stats
.get_upstream_connect_duration_success_bucket_le_100ms(),
connect_duration_success_bucket_101_500ms: stats
.get_upstream_connect_duration_success_bucket_101_500ms(),
connect_duration_success_bucket_501_1000ms: stats
.get_upstream_connect_duration_success_bucket_501_1000ms(),
connect_duration_success_bucket_gt_1000ms: stats
.get_upstream_connect_duration_success_bucket_gt_1000ms(),
connect_duration_fail_bucket_le_100ms: stats.get_upstream_connect_duration_fail_bucket_le_100ms(),
connect_duration_fail_bucket_101_500ms: stats
.get_upstream_connect_duration_fail_bucket_101_500ms(),
connect_duration_fail_bucket_501_1000ms: stats
.get_upstream_connect_duration_fail_bucket_501_1000ms(),
connect_duration_fail_bucket_gt_1000ms: stats
.get_upstream_connect_duration_fail_bucket_gt_1000ms(),
}
}
pub(super) fn build_upstreams_data(shared: &ApiShared, api_cfg: &ApiConfig) -> UpstreamsData {
let generated_at_epoch_secs = now_epoch_secs();
let zero = build_zero_upstream_data(&shared.stats);
if !api_cfg.minimal_runtime_enabled {
return UpstreamsData {
enabled: false,
reason: Some(FEATURE_DISABLED_REASON),
generated_at_epoch_secs,
zero,
summary: None,
upstreams: None,
};
}
let Some(snapshot) = shared.upstream_manager.try_api_snapshot() else {
return UpstreamsData {
enabled: true,
reason: Some(SOURCE_UNAVAILABLE_REASON),
generated_at_epoch_secs,
zero,
summary: None,
upstreams: None,
};
};
let summary = UpstreamSummaryData {
configured_total: snapshot.summary.configured_total,
healthy_total: snapshot.summary.healthy_total,
unhealthy_total: snapshot.summary.unhealthy_total,
direct_total: snapshot.summary.direct_total,
socks4_total: snapshot.summary.socks4_total,
socks5_total: snapshot.summary.socks5_total,
};
let upstreams = snapshot
.upstreams
.into_iter()
.map(|upstream| UpstreamStatus {
upstream_id: upstream.upstream_id,
route_kind: map_route_kind(upstream.route_kind),
address: upstream.address,
weight: upstream.weight,
scopes: upstream.scopes,
healthy: upstream.healthy,
fails: upstream.fails,
last_check_age_secs: upstream.last_check_age_secs,
effective_latency_ms: upstream.effective_latency_ms,
dc: upstream
.dc
.into_iter()
.map(|dc| UpstreamDcStatus {
dc: dc.dc,
latency_ema_ms: dc.latency_ema_ms,
ip_preference: map_ip_preference(dc.ip_preference),
})
.collect(),
})
.collect();
UpstreamsData {
enabled: true,
reason: None,
generated_at_epoch_secs,
zero,
summary: Some(summary),
upstreams: Some(upstreams),
}
}
pub(super) async fn build_minimal_all_data( pub(super) async fn build_minimal_all_data(
shared: &ApiShared, shared: &ApiShared,
api_cfg: &ApiConfig, api_cfg: &ApiConfig,
@ -384,6 +458,24 @@ fn disabled_dcs(now_epoch_secs: u64, reason: &'static str) -> DcStatusData {
} }
} }
fn map_route_kind(value: UpstreamRouteKind) -> &'static str {
match value {
UpstreamRouteKind::Direct => "direct",
UpstreamRouteKind::Socks4 => "socks4",
UpstreamRouteKind::Socks5 => "socks5",
}
}
fn map_ip_preference(value: IpPreference) -> &'static str {
match value {
IpPreference::Unknown => "unknown",
IpPreference::PreferV6 => "prefer_v6",
IpPreference::PreferV4 => "prefer_v4",
IpPreference::BothWork => "both_work",
IpPreference::Unavailable => "unavailable",
}
}
fn now_epoch_secs() -> u64 { fn now_epoch_secs() -> u64 {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)

View File

@ -1169,6 +1169,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let stats = stats.clone(); let stats = stats.clone();
let ip_tracker_api = ip_tracker.clone(); let ip_tracker_api = ip_tracker.clone();
let me_pool_api = me_pool.clone(); let me_pool_api = me_pool.clone();
let upstream_manager_api = upstream_manager.clone();
let config_rx_api = config_rx.clone(); let config_rx_api = config_rx.clone();
let config_path_api = std::path::PathBuf::from(&config_path); let config_path_api = std::path::PathBuf::from(&config_path);
let startup_detected_ip_v4 = detected_ip_v4; let startup_detected_ip_v4 = detected_ip_v4;
@ -1179,6 +1180,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
stats, stats,
ip_tracker_api, ip_tracker_api,
me_pool_api, me_pool_api,
upstream_manager_api,
config_rx_api, config_rx_api,
config_path_api, config_path_api,
startup_detected_ip_v4, startup_detected_ip_v4,

View File

@ -165,6 +165,43 @@ pub enum UpstreamRouteKind {
Socks5, Socks5,
} }
#[derive(Debug, Clone)]
pub struct UpstreamApiDcSnapshot {
pub dc: i16,
pub latency_ema_ms: Option<f64>,
pub ip_preference: IpPreference,
}
#[derive(Debug, Clone)]
pub struct UpstreamApiItemSnapshot {
pub upstream_id: usize,
pub route_kind: UpstreamRouteKind,
pub address: String,
pub weight: u16,
pub scopes: String,
pub healthy: bool,
pub fails: u32,
pub last_check_age_secs: u64,
pub effective_latency_ms: Option<f64>,
pub dc: Vec<UpstreamApiDcSnapshot>,
}
#[derive(Debug, Clone, Default)]
pub struct UpstreamApiSummarySnapshot {
pub configured_total: usize,
pub healthy_total: usize,
pub unhealthy_total: usize,
pub direct_total: usize,
pub socks4_total: usize,
pub socks5_total: usize,
}
#[derive(Debug, Clone)]
pub struct UpstreamApiSnapshot {
pub summary: UpstreamApiSummarySnapshot,
pub upstreams: Vec<UpstreamApiItemSnapshot>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UpstreamEgressInfo { pub struct UpstreamEgressInfo {
pub route_kind: UpstreamRouteKind, pub route_kind: UpstreamRouteKind,
@ -217,6 +254,64 @@ impl UpstreamManager {
} }
} }
pub fn try_api_snapshot(&self) -> Option<UpstreamApiSnapshot> {
let guard = self.upstreams.try_read().ok()?;
let now = std::time::Instant::now();
let mut summary = UpstreamApiSummarySnapshot {
configured_total: guard.len(),
..UpstreamApiSummarySnapshot::default()
};
let mut upstreams = Vec::with_capacity(guard.len());
for (idx, upstream) in guard.iter().enumerate() {
if upstream.healthy {
summary.healthy_total += 1;
} else {
summary.unhealthy_total += 1;
}
let (route_kind, address) = match &upstream.config.upstream_type {
UpstreamType::Direct { .. } => {
summary.direct_total += 1;
(UpstreamRouteKind::Direct, "direct".to_string())
}
UpstreamType::Socks4 { address, .. } => {
summary.socks4_total += 1;
(UpstreamRouteKind::Socks4, address.clone())
}
UpstreamType::Socks5 { address, .. } => {
summary.socks5_total += 1;
(UpstreamRouteKind::Socks5, address.clone())
}
};
let mut dc = Vec::with_capacity(NUM_DCS);
for dc_idx in 0..NUM_DCS {
dc.push(UpstreamApiDcSnapshot {
dc: (dc_idx + 1) as i16,
latency_ema_ms: upstream.dc_latency[dc_idx].get(),
ip_preference: upstream.dc_ip_pref[dc_idx],
});
}
upstreams.push(UpstreamApiItemSnapshot {
upstream_id: idx,
route_kind,
address,
weight: upstream.config.weight,
scopes: upstream.config.scopes.clone(),
healthy: upstream.healthy,
fails: upstream.fails,
last_check_age_secs: now.saturating_duration_since(upstream.last_check).as_secs(),
effective_latency_ms: upstream.effective_latency(None),
dc,
});
}
Some(UpstreamApiSnapshot { summary, upstreams })
}
#[cfg(unix)] #[cfg(unix)]
fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec<IpAddr> { fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec<IpAddr> {
use nix::ifaddrs::getifaddrs; use nix::ifaddrs::getifaddrs;