API ME Pool Status

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-04 02:45:32 +03:00
parent 4c2bc2f41f
commit 7ea6387278
No known key found for this signature in database
1 changed files with 187 additions and 0 deletions

View File

@ -1,8 +1,11 @@
use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Instant;
use super::pool::{MePool, WriterContour}; use super::pool::{MePool, WriterContour};
use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy};
use crate::transport::upstream::IpPreference;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct MeApiWriterStatusSnapshot { pub(crate) struct MeApiWriterStatusSnapshot {
@ -45,6 +48,57 @@ pub(crate) struct MeApiStatusSnapshot {
pub dcs: Vec<MeApiDcStatusSnapshot>, pub dcs: Vec<MeApiDcStatusSnapshot>,
} }
#[derive(Clone, Debug)]
pub(crate) struct MeApiQuarantinedEndpointSnapshot {
pub endpoint: SocketAddr,
pub remaining_ms: u64,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiDcPathSnapshot {
pub dc: i16,
pub ip_preference: Option<&'static str>,
pub selected_addr_v4: Option<SocketAddr>,
pub selected_addr_v6: Option<SocketAddr>,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiRuntimeSnapshot {
pub active_generation: u64,
pub warm_generation: u64,
pub pending_hardswap_generation: u64,
pub pending_hardswap_age_secs: Option<u64>,
pub hardswap_enabled: bool,
pub floor_mode: &'static str,
pub adaptive_floor_idle_secs: u64,
pub adaptive_floor_min_writers_single_endpoint: u8,
pub adaptive_floor_recover_grace_secs: u64,
pub me_keepalive_enabled: bool,
pub me_keepalive_interval_secs: u64,
pub me_keepalive_jitter_secs: u64,
pub me_keepalive_payload_random: bool,
pub rpc_proxy_req_every_secs: u64,
pub me_reconnect_max_concurrent_per_dc: u32,
pub me_reconnect_backoff_base_ms: u64,
pub me_reconnect_backoff_cap_ms: u64,
pub me_reconnect_fast_retry_count: u32,
pub me_pool_drain_ttl_secs: u64,
pub me_pool_force_close_secs: u64,
pub me_pool_min_fresh_ratio: f32,
pub me_bind_stale_mode: &'static str,
pub me_bind_stale_ttl_secs: u64,
pub me_single_endpoint_shadow_writers: u8,
pub me_single_endpoint_outage_mode_enabled: bool,
pub me_single_endpoint_outage_disable_quarantine: bool,
pub me_single_endpoint_outage_backoff_min_ms: u64,
pub me_single_endpoint_outage_backoff_max_ms: u64,
pub me_single_endpoint_shadow_rotate_every_secs: u64,
pub me_deterministic_writer_sort: bool,
pub me_socks_kdf_policy: &'static str,
pub quarantined_endpoints: Vec<MeApiQuarantinedEndpointSnapshot>,
pub network_path: Vec<MeApiDcPathSnapshot>,
}
impl MePool { impl MePool {
pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot {
let now_epoch_secs = Self::now_epoch_secs(); let now_epoch_secs = Self::now_epoch_secs();
@ -206,6 +260,107 @@ impl MePool {
dcs, dcs,
} }
} }
pub(crate) async fn api_runtime_snapshot(&self) -> MeApiRuntimeSnapshot {
let now = Instant::now();
let now_epoch_secs = Self::now_epoch_secs();
let pending_started_at = self
.pending_hardswap_started_at_epoch_secs
.load(Ordering::Relaxed);
let pending_hardswap_age_secs = (pending_started_at > 0)
.then_some(now_epoch_secs.saturating_sub(pending_started_at));
let mut quarantined_endpoints = Vec::<MeApiQuarantinedEndpointSnapshot>::new();
{
let guard = self.endpoint_quarantine.lock().await;
for (endpoint, expires_at) in guard.iter() {
if *expires_at <= now {
continue;
}
let remaining_ms = expires_at.duration_since(now).as_millis() as u64;
quarantined_endpoints.push(MeApiQuarantinedEndpointSnapshot {
endpoint: *endpoint,
remaining_ms,
});
}
}
quarantined_endpoints.sort_by_key(|entry| entry.endpoint);
let mut network_path = Vec::<MeApiDcPathSnapshot>::new();
if let Some(upstream) = &self.upstream {
for dc in 1..=5 {
let dc_idx = dc as i16;
let ip_preference = upstream
.get_dc_ip_preference(dc_idx)
.await
.map(ip_preference_label);
let selected_addr_v4 = upstream.get_dc_addr(dc_idx, false).await;
let selected_addr_v6 = upstream.get_dc_addr(dc_idx, true).await;
network_path.push(MeApiDcPathSnapshot {
dc: dc_idx,
ip_preference,
selected_addr_v4,
selected_addr_v6,
});
}
}
MeApiRuntimeSnapshot {
active_generation: self.active_generation.load(Ordering::Relaxed),
warm_generation: self.warm_generation.load(Ordering::Relaxed),
pending_hardswap_generation: self.pending_hardswap_generation.load(Ordering::Relaxed),
pending_hardswap_age_secs,
hardswap_enabled: self.hardswap.load(Ordering::Relaxed),
floor_mode: floor_mode_label(self.floor_mode()),
adaptive_floor_idle_secs: self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed),
adaptive_floor_min_writers_single_endpoint: self
.me_adaptive_floor_min_writers_single_endpoint
.load(Ordering::Relaxed),
adaptive_floor_recover_grace_secs: self
.me_adaptive_floor_recover_grace_secs
.load(Ordering::Relaxed),
me_keepalive_enabled: self.me_keepalive_enabled,
me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(),
me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(),
me_keepalive_payload_random: self.me_keepalive_payload_random,
rpc_proxy_req_every_secs: self.rpc_proxy_req_every_secs.load(Ordering::Relaxed),
me_reconnect_max_concurrent_per_dc: self.me_reconnect_max_concurrent_per_dc,
me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64,
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed),
me_pool_min_fresh_ratio: Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed),
),
me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()),
me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed),
me_single_endpoint_shadow_writers: self
.me_single_endpoint_shadow_writers
.load(Ordering::Relaxed),
me_single_endpoint_outage_mode_enabled: self
.me_single_endpoint_outage_mode_enabled
.load(Ordering::Relaxed),
me_single_endpoint_outage_disable_quarantine: self
.me_single_endpoint_outage_disable_quarantine
.load(Ordering::Relaxed),
me_single_endpoint_outage_backoff_min_ms: self
.me_single_endpoint_outage_backoff_min_ms
.load(Ordering::Relaxed),
me_single_endpoint_outage_backoff_max_ms: self
.me_single_endpoint_outage_backoff_max_ms
.load(Ordering::Relaxed),
me_single_endpoint_shadow_rotate_every_secs: self
.me_single_endpoint_shadow_rotate_every_secs
.load(Ordering::Relaxed),
me_deterministic_writer_sort: self
.me_deterministic_writer_sort
.load(Ordering::Relaxed),
me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()),
quarantined_endpoints,
network_path,
}
}
} }
fn ratio_pct(part: usize, total: usize) -> f64 { fn ratio_pct(part: usize, total: usize) -> f64 {
@ -216,6 +371,38 @@ fn ratio_pct(part: usize, total: usize) -> f64 {
pct.clamp(0.0, 100.0) pct.clamp(0.0, 100.0)
} }
fn floor_mode_label(mode: MeFloorMode) -> &'static str {
match mode {
MeFloorMode::Static => "static",
MeFloorMode::Adaptive => "adaptive",
}
}
fn bind_stale_mode_label(mode: MeBindStaleMode) -> &'static str {
match mode {
MeBindStaleMode::Never => "never",
MeBindStaleMode::Ttl => "ttl",
MeBindStaleMode::Always => "always",
}
}
fn socks_kdf_policy_label(policy: MeSocksKdfPolicy) -> &'static str {
match policy {
MeSocksKdfPolicy::Strict => "strict",
MeSocksKdfPolicy::Compat => "compat",
}
}
fn ip_preference_label(preference: IpPreference) -> &'static str {
match preference {
IpPreference::Unknown => "unknown",
IpPreference::PreferV6 => "prefer_v6",
IpPreference::PreferV4 => "prefer_v4",
IpPreference::BothWork => "both",
IpPreference::Unavailable => "unavailable",
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::ratio_pct; use super::ratio_pct;