Merge pull request #338 from telemt/flow-api

API Zero + API Docs
This commit is contained in:
Alexey 2026-03-06 13:08:12 +03:00 committed by GitHub
commit dd12997744
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 410 additions and 2 deletions

View File

@ -76,6 +76,10 @@ Notes:
| Method | Path | Body | Success | `data` contract |
| --- | --- | --- | --- | --- |
| `GET` | `/v1/health` | none | `200` | `HealthData` |
| `GET` | `/v1/system/info` | none | `200` | `SystemInfoData` |
| `GET` | `/v1/runtime/gates` | none | `200` | `RuntimeGatesData` |
| `GET` | `/v1/limits/effective` | none | `200` | `EffectiveLimitsData` |
| `GET` | `/v1/security/posture` | none | `200` | `SecurityPostureData` |
| `GET` | `/v1/stats/summary` | none | `200` | `SummaryData` |
| `GET` | `/v1/stats/zero/all` | none | `200` | `ZeroAllData` |
| `GET` | `/v1/stats/upstreams` | none | `200` | `UpstreamsData` |
@ -176,6 +180,94 @@ Note: the request contract is defined, but the corresponding route currently ret
| `handshake_timeouts_total` | `u64` | Handshake timeout count. |
| `configured_users` | `usize` | Number of configured users in config. |
### `SystemInfoData`
| Field | Type | Description |
| --- | --- | --- |
| `version` | `string` | Binary version (`CARGO_PKG_VERSION`). |
| `target_arch` | `string` | Target architecture (`std::env::consts::ARCH`). |
| `target_os` | `string` | Target OS (`std::env::consts::OS`). |
| `build_profile` | `string` | Build profile (`PROFILE` env when available). |
| `git_commit` | `string?` | Optional commit hash from build env metadata. |
| `build_time_utc` | `string?` | Optional build timestamp from build env metadata. |
| `rustc_version` | `string?` | Optional compiler version from build env metadata. |
| `process_started_at_epoch_secs` | `u64` | Process start time as Unix epoch seconds. |
| `uptime_seconds` | `f64` | Process uptime in seconds. |
| `config_path` | `string` | Active config file path used by runtime. |
| `config_hash` | `string` | SHA-256 hash of current config content (same value as envelope `revision`). |
| `config_reload_count` | `u64` | Number of successfully observed config updates since process start. |
| `last_config_reload_epoch_secs` | `u64?` | Unix epoch seconds of the latest observed config reload; null/absent before first reload. |
### `RuntimeGatesData`
| Field | Type | Description |
| --- | --- | --- |
| `accepting_new_connections` | `bool` | Current admission-gate state for new listener accepts. |
| `conditional_cast_enabled` | `bool` | Whether conditional ME admission logic is enabled (`general.use_middle_proxy`). |
| `me_runtime_ready` | `bool` | Current ME runtime readiness status used for conditional gate decisions. |
| `me2dc_fallback_enabled` | `bool` | Whether ME -> direct fallback is enabled. |
| `use_middle_proxy` | `bool` | Current transport mode preference. |
### `EffectiveLimitsData`
| Field | Type | Description |
| --- | --- | --- |
| `update_every_secs` | `u64` | Effective unified updater interval. |
| `me_reinit_every_secs` | `u64` | Effective ME periodic reinit interval. |
| `me_pool_force_close_secs` | `u64` | Effective stale-writer force-close timeout. |
| `timeouts` | `EffectiveTimeoutLimits` | Effective timeout policy snapshot. |
| `upstream` | `EffectiveUpstreamLimits` | Effective upstream connect/retry limits. |
| `middle_proxy` | `EffectiveMiddleProxyLimits` | Effective ME pool/floor/reconnect limits. |
| `user_ip_policy` | `EffectiveUserIpPolicyLimits` | Effective unique-IP policy mode/window. |
#### `EffectiveTimeoutLimits`
| Field | Type | Description |
| --- | --- | --- |
| `client_handshake_secs` | `u64` | Client handshake timeout. |
| `tg_connect_secs` | `u64` | Upstream Telegram connect timeout. |
| `client_keepalive_secs` | `u64` | Client keepalive interval. |
| `client_ack_secs` | `u64` | ACK timeout. |
| `me_one_retry` | `u8` | Fast retry count for single-endpoint ME DC. |
| `me_one_timeout_ms` | `u64` | Fast retry timeout per attempt for single-endpoint ME DC. |
#### `EffectiveUpstreamLimits`
| Field | Type | Description |
| --- | --- | --- |
| `connect_retry_attempts` | `u32` | Upstream connect retry attempts. |
| `connect_retry_backoff_ms` | `u64` | Upstream retry backoff delay. |
| `connect_budget_ms` | `u64` | Total connect wall-clock budget across retries. |
| `unhealthy_fail_threshold` | `u32` | Consecutive fail threshold for unhealthy marking. |
| `connect_failfast_hard_errors` | `bool` | Whether hard errors skip additional retries. |
#### `EffectiveMiddleProxyLimits`
| Field | Type | Description |
| --- | --- | --- |
| `floor_mode` | `string` | Effective floor mode (`static` or `adaptive`). |
| `adaptive_floor_idle_secs` | `u64` | Adaptive floor idle threshold. |
| `adaptive_floor_min_writers_single_endpoint` | `u8` | Adaptive floor minimum for single-endpoint DCs. |
| `adaptive_floor_recover_grace_secs` | `u64` | Adaptive floor recovery grace period. |
| `reconnect_max_concurrent_per_dc` | `u32` | Max concurrent reconnects per DC. |
| `reconnect_backoff_base_ms` | `u64` | Reconnect base backoff. |
| `reconnect_backoff_cap_ms` | `u64` | Reconnect backoff cap. |
| `reconnect_fast_retry_count` | `u32` | Number of fast retries before standard backoff strategy. |
| `me2dc_fallback` | `bool` | Effective ME -> direct fallback flag. |
#### `EffectiveUserIpPolicyLimits`
| Field | Type | Description |
| --- | --- | --- |
| `mode` | `string` | Unique-IP policy mode (`active_window`, `time_window`, `combined`). |
| `window_secs` | `u64` | Time window length used by unique-IP policy. |
### `SecurityPostureData`
| Field | Type | Description |
| --- | --- | --- |
| `api_read_only` | `bool` | Current API read-only state. |
| `api_whitelist_enabled` | `bool` | Whether whitelist filtering is active. |
| `api_whitelist_entries` | `usize` | Number of configured whitelist CIDRs. |
| `api_auth_header_enabled` | `bool` | Whether `Authorization` header validation is active. |
| `proxy_protocol_enabled` | `bool` | Global PROXY protocol accept setting. |
| `log_level` | `string` | Effective log level (`debug`, `verbose`, `normal`, `silent`). |
| `telemetry_core_enabled` | `bool` | Core telemetry toggle. |
| `telemetry_user_enabled` | `bool` | Per-user telemetry toggle. |
| `telemetry_me_level` | `string` | ME telemetry level (`silent`, `normal`, `debug`). |
### `ZeroAllData`
| Field | Type | Description |
| --- | --- | --- |

View File

@ -2,7 +2,8 @@ use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
@ -25,6 +26,7 @@ use crate::transport::UpstreamManager;
mod config_store;
mod model;
mod runtime_stats;
mod runtime_zero;
mod users;
use config_store::{current_revision, parse_if_match};
@ -36,8 +38,19 @@ use runtime_stats::{
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
build_upstreams_data, build_zero_all_data,
};
use runtime_zero::{
build_limits_effective_data, build_runtime_gates_data, build_security_posture_data,
build_system_info_data,
};
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
pub(super) struct ApiRuntimeState {
pub(super) process_started_at_epoch_secs: u64,
pub(super) config_reload_count: AtomicU64,
pub(super) last_config_reload_epoch_secs: AtomicU64,
pub(super) admission_open: AtomicBool,
}
#[derive(Clone)]
pub(super) struct ApiShared {
pub(super) stats: Arc<Stats>,
@ -50,6 +63,7 @@ pub(super) struct ApiShared {
pub(super) mutation_lock: Arc<Mutex<()>>,
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
pub(super) request_id: Arc<AtomicU64>,
pub(super) runtime_state: Arc<ApiRuntimeState>,
}
impl ApiShared {
@ -65,9 +79,11 @@ pub async fn serve(
me_pool: Option<Arc<MePool>>,
upstream_manager: Arc<UpstreamManager>,
config_rx: watch::Receiver<Arc<ProxyConfig>>,
admission_rx: watch::Receiver<bool>,
config_path: PathBuf,
startup_detected_ip_v4: Option<IpAddr>,
startup_detected_ip_v6: Option<IpAddr>,
process_started_at_epoch_secs: u64,
) {
let listener = match TcpListener::bind(listen).await {
Ok(listener) => listener,
@ -83,6 +99,13 @@ pub async fn serve(
info!("API endpoint: http://{}/v1/*", listen);
let runtime_state = Arc::new(ApiRuntimeState {
process_started_at_epoch_secs,
config_reload_count: AtomicU64::new(0),
last_config_reload_epoch_secs: AtomicU64::new(0),
admission_open: AtomicBool::new(*admission_rx.borrow()),
});
let shared = Arc::new(ApiShared {
stats,
ip_tracker,
@ -94,6 +117,38 @@ pub async fn serve(
mutation_lock: Arc::new(Mutex::new(())),
minimal_cache: Arc::new(Mutex::new(None)),
request_id: Arc::new(AtomicU64::new(1)),
runtime_state: runtime_state.clone(),
});
let mut config_rx_reload = config_rx.clone();
let runtime_state_reload = runtime_state.clone();
tokio::spawn(async move {
loop {
if config_rx_reload.changed().await.is_err() {
break;
}
runtime_state_reload
.config_reload_count
.fetch_add(1, Ordering::Relaxed);
runtime_state_reload
.last_config_reload_epoch_secs
.store(now_epoch_secs(), Ordering::Relaxed);
}
});
let mut admission_rx_watch = admission_rx.clone();
tokio::spawn(async move {
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
loop {
if admission_rx_watch.changed().await.is_err() {
break;
}
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
}
});
loop {
@ -189,6 +244,26 @@ async fn handle(
};
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/system/info") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_system_info_data(shared.as_ref(), cfg.as_ref(), &revision);
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/gates") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/limits/effective") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_limits_effective_data(cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/security/posture") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_security_posture_data(cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/summary") => {
let revision = current_revision(&shared.config_path).await?;
let data = SummaryData {
@ -441,3 +516,10 @@ async fn read_body_with_limit(body: Incoming, limit: usize) -> Result<Vec<u8>, A
}
Ok(collected)
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

227
src/api/runtime_zero.rs Normal file
View File

@ -0,0 +1,227 @@
use std::sync::atomic::Ordering;
use serde::Serialize;
use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode};
use super::ApiShared;
#[derive(Serialize)]
pub(super) struct SystemInfoData {
pub(super) version: String,
pub(super) target_arch: String,
pub(super) target_os: String,
pub(super) build_profile: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) git_commit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) build_time_utc: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) rustc_version: Option<String>,
pub(super) process_started_at_epoch_secs: u64,
pub(super) uptime_seconds: f64,
pub(super) config_path: String,
pub(super) config_hash: String,
pub(super) config_reload_count: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) last_config_reload_epoch_secs: Option<u64>,
}
#[derive(Serialize)]
pub(super) struct RuntimeGatesData {
pub(super) accepting_new_connections: bool,
pub(super) conditional_cast_enabled: bool,
pub(super) me_runtime_ready: bool,
pub(super) me2dc_fallback_enabled: bool,
pub(super) use_middle_proxy: bool,
}
#[derive(Serialize)]
pub(super) struct EffectiveTimeoutLimits {
pub(super) client_handshake_secs: u64,
pub(super) tg_connect_secs: u64,
pub(super) client_keepalive_secs: u64,
pub(super) client_ack_secs: u64,
pub(super) me_one_retry: u8,
pub(super) me_one_timeout_ms: u64,
}
#[derive(Serialize)]
pub(super) struct EffectiveUpstreamLimits {
pub(super) connect_retry_attempts: u32,
pub(super) connect_retry_backoff_ms: u64,
pub(super) connect_budget_ms: u64,
pub(super) unhealthy_fail_threshold: u32,
pub(super) connect_failfast_hard_errors: bool,
}
#[derive(Serialize)]
pub(super) struct EffectiveMiddleProxyLimits {
pub(super) floor_mode: &'static str,
pub(super) adaptive_floor_idle_secs: u64,
pub(super) adaptive_floor_min_writers_single_endpoint: u8,
pub(super) adaptive_floor_recover_grace_secs: u64,
pub(super) reconnect_max_concurrent_per_dc: u32,
pub(super) reconnect_backoff_base_ms: u64,
pub(super) reconnect_backoff_cap_ms: u64,
pub(super) reconnect_fast_retry_count: u32,
pub(super) me2dc_fallback: bool,
}
#[derive(Serialize)]
pub(super) struct EffectiveUserIpPolicyLimits {
pub(super) mode: &'static str,
pub(super) window_secs: u64,
}
#[derive(Serialize)]
pub(super) struct EffectiveLimitsData {
pub(super) update_every_secs: u64,
pub(super) me_reinit_every_secs: u64,
pub(super) me_pool_force_close_secs: u64,
pub(super) timeouts: EffectiveTimeoutLimits,
pub(super) upstream: EffectiveUpstreamLimits,
pub(super) middle_proxy: EffectiveMiddleProxyLimits,
pub(super) user_ip_policy: EffectiveUserIpPolicyLimits,
}
#[derive(Serialize)]
pub(super) struct SecurityPostureData {
pub(super) api_read_only: bool,
pub(super) api_whitelist_enabled: bool,
pub(super) api_whitelist_entries: usize,
pub(super) api_auth_header_enabled: bool,
pub(super) proxy_protocol_enabled: bool,
pub(super) log_level: String,
pub(super) telemetry_core_enabled: bool,
pub(super) telemetry_user_enabled: bool,
pub(super) telemetry_me_level: String,
}
pub(super) fn build_system_info_data(
shared: &ApiShared,
_cfg: &ProxyConfig,
revision: &str,
) -> SystemInfoData {
let last_reload_epoch_secs = shared
.runtime_state
.last_config_reload_epoch_secs
.load(Ordering::Relaxed);
let last_config_reload_epoch_secs = (last_reload_epoch_secs > 0).then_some(last_reload_epoch_secs);
let git_commit = option_env!("TELEMT_GIT_COMMIT")
.or(option_env!("VERGEN_GIT_SHA"))
.or(option_env!("GIT_COMMIT"))
.map(ToString::to_string);
let build_time_utc = option_env!("BUILD_TIME_UTC")
.or(option_env!("VERGEN_BUILD_TIMESTAMP"))
.map(ToString::to_string);
let rustc_version = option_env!("RUSTC_VERSION")
.or(option_env!("VERGEN_RUSTC_SEMVER"))
.map(ToString::to_string);
SystemInfoData {
version: env!("CARGO_PKG_VERSION").to_string(),
target_arch: std::env::consts::ARCH.to_string(),
target_os: std::env::consts::OS.to_string(),
build_profile: option_env!("PROFILE").unwrap_or("unknown").to_string(),
git_commit,
build_time_utc,
rustc_version,
process_started_at_epoch_secs: shared.runtime_state.process_started_at_epoch_secs,
uptime_seconds: shared.stats.uptime_secs(),
config_path: shared.config_path.display().to_string(),
config_hash: revision.to_string(),
config_reload_count: shared.runtime_state.config_reload_count.load(Ordering::Relaxed),
last_config_reload_epoch_secs,
}
}
pub(super) fn build_runtime_gates_data(shared: &ApiShared, cfg: &ProxyConfig) -> RuntimeGatesData {
let me_runtime_ready = if !cfg.general.use_middle_proxy {
true
} else {
shared
.me_pool
.as_ref()
.map(|pool| pool.is_runtime_ready())
.unwrap_or(false)
};
RuntimeGatesData {
accepting_new_connections: shared.runtime_state.admission_open.load(Ordering::Relaxed),
conditional_cast_enabled: cfg.general.use_middle_proxy,
me_runtime_ready,
me2dc_fallback_enabled: cfg.general.me2dc_fallback,
use_middle_proxy: cfg.general.use_middle_proxy,
}
}
pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsData {
EffectiveLimitsData {
update_every_secs: cfg.general.effective_update_every_secs(),
me_reinit_every_secs: cfg.general.effective_me_reinit_every_secs(),
me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(),
timeouts: EffectiveTimeoutLimits {
client_handshake_secs: cfg.timeouts.client_handshake,
tg_connect_secs: cfg.timeouts.tg_connect,
client_keepalive_secs: cfg.timeouts.client_keepalive,
client_ack_secs: cfg.timeouts.client_ack,
me_one_retry: cfg.timeouts.me_one_retry,
me_one_timeout_ms: cfg.timeouts.me_one_timeout_ms,
},
upstream: EffectiveUpstreamLimits {
connect_retry_attempts: cfg.general.upstream_connect_retry_attempts,
connect_retry_backoff_ms: cfg.general.upstream_connect_retry_backoff_ms,
connect_budget_ms: cfg.general.upstream_connect_budget_ms,
unhealthy_fail_threshold: cfg.general.upstream_unhealthy_fail_threshold,
connect_failfast_hard_errors: cfg.general.upstream_connect_failfast_hard_errors,
},
middle_proxy: EffectiveMiddleProxyLimits {
floor_mode: me_floor_mode_label(cfg.general.me_floor_mode),
adaptive_floor_idle_secs: cfg.general.me_adaptive_floor_idle_secs,
adaptive_floor_min_writers_single_endpoint: cfg
.general
.me_adaptive_floor_min_writers_single_endpoint,
adaptive_floor_recover_grace_secs: cfg.general.me_adaptive_floor_recover_grace_secs,
reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc,
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count,
me2dc_fallback: cfg.general.me2dc_fallback,
},
user_ip_policy: EffectiveUserIpPolicyLimits {
mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode),
window_secs: cfg.access.user_max_unique_ips_window_secs,
},
}
}
pub(super) fn build_security_posture_data(cfg: &ProxyConfig) -> SecurityPostureData {
SecurityPostureData {
api_read_only: cfg.server.api.read_only,
api_whitelist_enabled: !cfg.server.api.whitelist.is_empty(),
api_whitelist_entries: cfg.server.api.whitelist.len(),
api_auth_header_enabled: !cfg.server.api.auth_header.is_empty(),
proxy_protocol_enabled: cfg.server.proxy_protocol,
log_level: cfg.general.log_level.to_string(),
telemetry_core_enabled: cfg.general.telemetry.core_enabled,
telemetry_user_enabled: cfg.general.telemetry.user_enabled,
telemetry_me_level: cfg.general.telemetry.me_level.to_string(),
}
}
fn user_max_unique_ips_mode_label(mode: UserMaxUniqueIpsMode) -> &'static str {
match mode {
UserMaxUniqueIpsMode::ActiveWindow => "active_window",
UserMaxUniqueIpsMode::TimeWindow => "time_window",
UserMaxUniqueIpsMode::Combined => "combined",
}
}
fn me_floor_mode_label(mode: MeFloorMode) -> &'static str {
match mode {
MeFloorMode::Static => "static",
MeFloorMode::Adaptive => "adaptive",
}
}

View File

@ -4,7 +4,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use rand::Rng;
use tokio::net::TcpListener;
use tokio::signal;
@ -369,6 +369,10 @@ async fn load_startup_proxy_config_snapshot(
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let process_started_at = Instant::now();
let process_started_at_epoch_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let (config_path, cli_silent, cli_log_level) = parse_cli();
let mut config = match ProxyConfig::load(&config_path) {
@ -1556,6 +1560,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool_api = me_pool.clone();
let upstream_manager_api = upstream_manager.clone();
let config_rx_api = config_rx.clone();
let admission_rx_api = admission_rx.clone();
let config_path_api = std::path::PathBuf::from(&config_path);
let startup_detected_ip_v4 = detected_ip_v4;
let startup_detected_ip_v6 = detected_ip_v6;
@ -1567,9 +1572,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
me_pool_api,
upstream_manager_api,
config_rx_api,
admission_rx_api,
config_path_api,
startup_detected_ip_v4,
startup_detected_ip_v6,
process_started_at_epoch_secs,
)
.await;
});