User Management in API

This commit is contained in:
Alexey 2026-03-10 00:02:39 +03:00
parent 100ef0fa28
commit 161af51558
No known key found for this signature in database
5 changed files with 457 additions and 84 deletions

View File

@ -1,13 +1,39 @@
use std::collections::BTreeMap;
use std::io::Write; use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use hyper::header::IF_MATCH; use hyper::header::IF_MATCH;
use serde::Serialize;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use super::model::ApiFailure; use super::model::ApiFailure;
/// Named `access.*` TOML tables that the user-management API can rewrite
/// individually when persisting configuration changes to disk.
///
/// `Copy + PartialEq` lets callers pass plain slices of sections and lets
/// the persistence path deduplicate them cheaply.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum AccessSection {
    // `[access.users]` — username -> secret map.
    Users,
    // `[access.user_ad_tags]` — per-user ad-tag strings.
    UserAdTags,
    // `[access.user_max_tcp_conns]` — per-user TCP connection caps.
    UserMaxTcpConns,
    // `[access.user_expirations]` — per-user expiration timestamps (UTC).
    UserExpirations,
    // `[access.user_data_quota]` — per-user data quotas in bytes.
    UserDataQuota,
    // `[access.user_max_unique_ips]` — per-user unique-IP caps.
    UserMaxUniqueIps,
}

impl AccessSection {
    /// Fully-qualified TOML table header name (without the surrounding
    /// brackets) for this section, as it appears in the config file.
    fn table_name(self) -> &'static str {
        match self {
            Self::Users => "access.users",
            Self::UserAdTags => "access.user_ad_tags",
            Self::UserMaxTcpConns => "access.user_max_tcp_conns",
            Self::UserExpirations => "access.user_expirations",
            Self::UserDataQuota => "access.user_data_quota",
            Self::UserMaxUniqueIps => "access.user_max_unique_ips",
        }
    }
}
pub(super) fn parse_if_match(headers: &hyper::HeaderMap) -> Option<String> { pub(super) fn parse_if_match(headers: &hyper::HeaderMap) -> Option<String> {
headers headers
.get(IF_MATCH) .get(IF_MATCH)
@ -66,6 +92,142 @@ pub(super) async fn save_config_to_disk(
Ok(compute_revision(&serialized)) Ok(compute_revision(&serialized))
} }
/// Persists only the given `access.*` sections of `cfg` into the TOML config
/// file at `config_path`, leaving all other file content untouched.
///
/// Duplicate entries in `sections` are applied once, in first-seen order.
/// The file is replaced atomically and the revision hash of the new content
/// is returned.
///
/// # Errors
/// Returns an internal `ApiFailure` when the config file cannot be read,
/// a section fails to serialize, or the atomic write fails.
pub(super) async fn save_access_sections_to_disk(
    config_path: &Path,
    cfg: &ProxyConfig,
    sections: &[AccessSection],
) -> Result<String, ApiFailure> {
    let mut content = tokio::fs::read_to_string(config_path)
        .await
        .map_err(|e| ApiFailure::internal(format!("failed to read config: {}", e)))?;
    // Deduplicate while preserving order; the slice is tiny, so a linear
    // `contains` scan is fine.
    let mut applied = Vec::with_capacity(sections.len());
    for section in sections {
        if applied.contains(section) {
            continue;
        }
        let rendered = render_access_section(cfg, *section)?;
        content = upsert_toml_table(&content, section.table_name(), &rendered);
        applied.push(*section);
    }
    // Compute the revision before handing `content` to the writer so the
    // string can be moved instead of cloned (avoids copying the whole file).
    let revision = compute_revision(&content);
    write_atomic(config_path.to_path_buf(), content).await?;
    Ok(revision)
}
/// Renders one `access.*` section of `cfg` as a complete TOML table:
/// a `[section.name]` header followed by the serialized key/value body,
/// always terminated by a trailing newline.
///
/// Rows are routed through a `BTreeMap` so the output is sorted by key and
/// therefore deterministic across runs.
///
/// # Errors
/// Returns an internal `ApiFailure` when TOML serialization fails.
fn render_access_section(cfg: &ProxyConfig, section: AccessSection) -> Result<String, ApiFailure> {
    /// Collects borrowed `(key, value)` pairs into an owned, key-sorted map.
    fn sorted_rows<'a, V, I>(entries: I) -> BTreeMap<String, V>
    where
        V: Clone + 'a,
        I: IntoIterator<Item = (&'a String, &'a V)>,
    {
        entries
            .into_iter()
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect()
    }

    let body = match section {
        AccessSection::Users => serialize_table_body(&sorted_rows(cfg.access.users.iter()))?,
        AccessSection::UserAdTags => {
            serialize_table_body(&sorted_rows(cfg.access.user_ad_tags.iter()))?
        }
        AccessSection::UserMaxTcpConns => {
            serialize_table_body(&sorted_rows(cfg.access.user_max_tcp_conns.iter()))?
        }
        AccessSection::UserExpirations => {
            serialize_table_body(&sorted_rows(cfg.access.user_expirations.iter()))?
        }
        AccessSection::UserDataQuota => {
            serialize_table_body(&sorted_rows(cfg.access.user_data_quota.iter()))?
        }
        AccessSection::UserMaxUniqueIps => {
            serialize_table_body(&sorted_rows(cfg.access.user_max_unique_ips.iter()))?
        }
    };
    let mut out = format!("[{}]\n", section.table_name());
    if !body.is_empty() {
        out.push_str(&body);
    }
    // Guarantee the rendered table ends with a newline so it splices cleanly
    // into the surrounding file.
    if !out.ends_with('\n') {
        out.push('\n');
    }
    Ok(out)
}
/// Serializes `value` as the body of a TOML table, converting any
/// serializer failure into an internal `ApiFailure`.
fn serialize_table_body<T: Serialize>(value: &T) -> Result<String, ApiFailure> {
    match toml::to_string(value) {
        Ok(body) => Ok(body),
        Err(e) => Err(ApiFailure::internal(format!(
            "failed to serialize access section: {}",
            e
        ))),
    }
}
/// Replaces the named TOML table in `source` with `replacement`, or appends
/// `replacement` (separated by a blank line) when the table is absent.
/// `replacement` must include the `[table_name]` header line itself.
fn upsert_toml_table(source: &str, table_name: &str, replacement: &str) -> String {
    match find_toml_table_bounds(source, table_name) {
        Some((begin, finish)) => {
            // Splice the replacement over the existing table span.
            let mut rebuilt = String::with_capacity(source.len() + replacement.len());
            rebuilt.push_str(&source[..begin]);
            rebuilt.push_str(replacement);
            rebuilt.push_str(&source[finish..]);
            rebuilt
        }
        None => {
            // Append at the end, with a blank line separating the new table
            // from any existing content.
            let mut rebuilt = source.to_string();
            if !rebuilt.is_empty() {
                if !rebuilt.ends_with('\n') {
                    rebuilt.push('\n');
                }
                rebuilt.push('\n');
            }
            rebuilt.push_str(replacement);
            rebuilt
        }
    }
}

/// Locates the byte span `[start, end)` of the `[table_name]` table in
/// `source`: from its header line up to (not including) the next table
/// header, or to end-of-file. Returns `None` when the table is absent.
fn find_toml_table_bounds(source: &str, table_name: &str) -> Option<(usize, usize)> {
    let header = format!("[{}]", table_name);
    let mut cursor = 0usize;
    let mut begin: Option<usize> = None;
    for raw_line in source.split_inclusive('\n') {
        let stripped = raw_line.trim();
        match begin {
            // Inside the target table: any subsequent header line ends it.
            Some(found_at) if stripped.starts_with('[') => return Some((found_at, cursor)),
            Some(_) => {}
            None if stripped == header => begin = Some(cursor),
            None => {}
        }
        cursor = cursor.saturating_add(raw_line.len());
    }
    // Table (if found) runs to the end of the file.
    begin.map(|found_at| (found_at, source.len()))
}
async fn write_atomic(path: PathBuf, contents: String) -> Result<(), ApiFailure> { async fn write_atomic(path: PathBuf, contents: String) -> Result<(), ApiFailure> {
tokio::task::spawn_blocking(move || write_atomic_sync(&path, &contents)) tokio::task::spawn_blocking(move || write_atomic_sync(&path, &contents))
.await .await

View File

@ -8,7 +8,8 @@ use crate::stats::Stats;
use super::ApiShared; use super::ApiShared;
use super::config_store::{ use super::config_store::{
ensure_expected_revision, load_config_from_disk, save_config_to_disk, AccessSection, ensure_expected_revision, load_config_from_disk, save_access_sections_to_disk,
save_config_to_disk,
}; };
use super::model::{ use super::model::{
ApiFailure, CreateUserRequest, CreateUserResponse, PatchUserRequest, RotateSecretRequest, ApiFailure, CreateUserRequest, CreateUserResponse, PatchUserRequest, RotateSecretRequest,
@ -21,6 +22,12 @@ pub(super) async fn create_user(
expected_revision: Option<String>, expected_revision: Option<String>,
shared: &ApiShared, shared: &ApiShared,
) -> Result<(CreateUserResponse, String), ApiFailure> { ) -> Result<(CreateUserResponse, String), ApiFailure> {
let touches_user_ad_tags = body.user_ad_tag.is_some();
let touches_user_max_tcp_conns = body.max_tcp_conns.is_some();
let touches_user_expirations = body.expiration_rfc3339.is_some();
let touches_user_data_quota = body.data_quota_bytes.is_some();
let touches_user_max_unique_ips = body.max_unique_ips.is_some();
if !is_valid_username(&body.username) { if !is_valid_username(&body.username) {
return Err(ApiFailure::bad_request( return Err(ApiFailure::bad_request(
"username must match [A-Za-z0-9_.-] and be 1..64 chars", "username must match [A-Za-z0-9_.-] and be 1..64 chars",
@ -84,7 +91,24 @@ pub(super) async fn create_user(
cfg.validate() cfg.validate()
.map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?;
let revision = save_config_to_disk(&shared.config_path, &cfg).await?; let mut touched_sections = vec![AccessSection::Users];
if touches_user_ad_tags {
touched_sections.push(AccessSection::UserAdTags);
}
if touches_user_max_tcp_conns {
touched_sections.push(AccessSection::UserMaxTcpConns);
}
if touches_user_expirations {
touched_sections.push(AccessSection::UserExpirations);
}
if touches_user_data_quota {
touched_sections.push(AccessSection::UserDataQuota);
}
if touches_user_max_unique_ips {
touched_sections.push(AccessSection::UserMaxUniqueIps);
}
let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?;
drop(_guard); drop(_guard);
if let Some(limit) = updated_limit { if let Some(limit) = updated_limit {
@ -231,7 +255,15 @@ pub(super) async fn rotate_secret(
cfg.access.users.insert(user.to_string(), secret.clone()); cfg.access.users.insert(user.to_string(), secret.clone());
cfg.validate() cfg.validate()
.map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?;
let revision = save_config_to_disk(&shared.config_path, &cfg).await?; let touched_sections = [
AccessSection::Users,
AccessSection::UserAdTags,
AccessSection::UserMaxTcpConns,
AccessSection::UserExpirations,
AccessSection::UserDataQuota,
AccessSection::UserMaxUniqueIps,
];
let revision = save_access_sections_to_disk(&shared.config_path, &cfg, &touched_sections).await?;
drop(_guard); drop(_guard);
let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips(); let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips();

View File

@ -8,6 +8,9 @@ use crate::config::ProxyConfig;
use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController};
use crate::transport::middle_proxy::MePool; use crate::transport::middle_proxy::MePool;
const STARTUP_FALLBACK_AFTER: Duration = Duration::from_secs(80);
const RUNTIME_FALLBACK_AFTER: Duration = Duration::from_secs(6);
pub(crate) async fn configure_admission_gate( pub(crate) async fn configure_admission_gate(
config: &Arc<ProxyConfig>, config: &Arc<ProxyConfig>,
me_pool: Option<Arc<MePool>>, me_pool: Option<Arc<MePool>>,
@ -17,7 +20,6 @@ pub(crate) async fn configure_admission_gate(
) { ) {
if config.general.use_middle_proxy { if config.general.use_middle_proxy {
if let Some(pool) = me_pool.as_ref() { if let Some(pool) = me_pool.as_ref() {
let fallback_after = Duration::from_secs(6);
let initial_ready = pool.admission_ready_conditional_cast().await; let initial_ready = pool.admission_ready_conditional_cast().await;
admission_tx.send_replace(initial_ready); admission_tx.send_replace(initial_ready);
let _ = route_runtime.set_mode(RelayRouteMode::Middle); let _ = route_runtime.set_mode(RelayRouteMode::Middle);
@ -36,6 +38,7 @@ pub(crate) async fn configure_admission_gate(
tokio::spawn(async move { tokio::spawn(async move {
let mut gate_open = initial_ready; let mut gate_open = initial_ready;
let mut route_mode = RelayRouteMode::Middle; let mut route_mode = RelayRouteMode::Middle;
let mut ready_observed = initial_ready;
let mut not_ready_since = if initial_ready { let mut not_ready_since = if initial_ready {
None None
} else { } else {
@ -57,11 +60,17 @@ pub(crate) async fn configure_admission_gate(
let ready = pool_for_gate.admission_ready_conditional_cast().await; let ready = pool_for_gate.admission_ready_conditional_cast().await;
let now = Instant::now(); let now = Instant::now();
let (next_gate_open, next_route_mode, next_fallback_active) = if ready { let (next_gate_open, next_route_mode, next_fallback_active) = if ready {
ready_observed = true;
not_ready_since = None; not_ready_since = None;
(true, RelayRouteMode::Middle, false) (true, RelayRouteMode::Middle, false)
} else { } else {
let not_ready_started_at = *not_ready_since.get_or_insert(now); let not_ready_started_at = *not_ready_since.get_or_insert(now);
let not_ready_for = now.saturating_duration_since(not_ready_started_at); let not_ready_for = now.saturating_duration_since(not_ready_started_at);
let fallback_after = if ready_observed {
RUNTIME_FALLBACK_AFTER
} else {
STARTUP_FALLBACK_AFTER
};
if fallback_enabled && not_ready_for > fallback_after { if fallback_enabled && not_ready_for > fallback_after {
(true, RelayRouteMode::Direct, true) (true, RelayRouteMode::Direct, true)
} else { } else {
@ -79,6 +88,11 @@ pub(crate) async fn configure_admission_gate(
"Middle-End routing restored for new sessions" "Middle-End routing restored for new sessions"
); );
} else { } else {
let fallback_after = if ready_observed {
RUNTIME_FALLBACK_AFTER
} else {
STARTUP_FALLBACK_AFTER
};
warn!( warn!(
target_mode = route_mode.as_str(), target_mode = route_mode.as_str(),
cutover_generation = snapshot.generation, cutover_generation = snapshot.generation,

View File

@ -281,88 +281,178 @@ pub(crate) async fn initialize_me_pool(
.set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_INIT_STAGE1) .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_INIT_STAGE1)
.await; .await;
let mut init_attempt: u32 = 0; if me2dc_fallback {
loop { let pool_bg = pool.clone();
init_attempt = init_attempt.saturating_add(1); let rng_bg = rng.clone();
startup_tracker.set_me_init_attempt(init_attempt).await; let startup_tracker_bg = startup_tracker.clone();
match pool.init(pool_size, &rng).await { let retry_limit = if me_init_retry_attempts == 0 {
Ok(()) => { String::from("unlimited")
startup_tracker.set_me_last_error(None).await; } else {
startup_tracker me_init_retry_attempts.to_string()
.complete_component( };
COMPONENT_ME_POOL_INIT_STAGE1, std::thread::spawn(move || {
Some("ME pool initialized".to_string()), let runtime = match tokio::runtime::Builder::new_current_thread()
) .enable_all()
.await; .build()
startup_tracker {
.set_me_status(StartupMeStatus::Ready, "ready") Ok(runtime) => runtime,
.await; Err(error) => {
info!( error!(error = %error, "Failed to build background runtime for ME initialization");
attempt = init_attempt, return;
"Middle-End pool initialized successfully" }
); };
runtime.block_on(async move {
let mut init_attempt: u32 = 0;
loop {
init_attempt = init_attempt.saturating_add(1);
startup_tracker_bg.set_me_init_attempt(init_attempt).await;
match pool_bg.init(pool_size, &rng_bg).await {
Ok(()) => {
startup_tracker_bg.set_me_last_error(None).await;
startup_tracker_bg
.complete_component(
COMPONENT_ME_POOL_INIT_STAGE1,
Some("ME pool initialized".to_string()),
)
.await;
startup_tracker_bg
.set_me_status(StartupMeStatus::Ready, "ready")
.await;
info!(
attempt = init_attempt,
"Middle-End pool initialized successfully"
);
// Phase 4: Start health monitor let pool_health = pool_bg.clone();
let pool_clone = pool.clone(); let rng_health = rng_bg.clone();
let rng_clone = rng.clone(); let min_conns = pool_size;
let min_conns = pool_size; tokio::spawn(async move {
tokio::spawn(async move { crate::transport::middle_proxy::me_health_monitor(
crate::transport::middle_proxy::me_health_monitor( pool_health,
pool_clone, rng_clone, min_conns, rng_health,
) min_conns,
.await; )
}); .await;
});
break Some(pool); break;
} }
Err(e) => { Err(e) => {
startup_tracker.set_me_last_error(Some(e.to_string())).await; startup_tracker_bg.set_me_last_error(Some(e.to_string())).await;
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; if init_attempt >= me_init_warn_after_attempts {
if retries_limited && init_attempt >= me_init_retry_attempts { warn!(
error = %e,
attempt = init_attempt,
retry_limit = %retry_limit,
retry_in_secs = 2,
"ME pool is not ready yet; retrying background initialization"
);
} else {
info!(
error = %e,
attempt = init_attempt,
retry_limit = %retry_limit,
retry_in_secs = 2,
"ME pool startup warmup: retrying background initialization"
);
}
pool_bg.reset_stun_state();
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
});
});
startup_tracker
.set_me_status(StartupMeStatus::Initializing, "background_init")
.await;
info!(
startup_grace_secs = 80,
"ME pool initialization continues in background; startup continues with conditional Direct fallback"
);
Some(pool)
} else {
let mut init_attempt: u32 = 0;
loop {
init_attempt = init_attempt.saturating_add(1);
startup_tracker.set_me_init_attempt(init_attempt).await;
match pool.init(pool_size, &rng).await {
Ok(()) => {
startup_tracker.set_me_last_error(None).await;
startup_tracker startup_tracker
.fail_component( .complete_component(
COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_POOL_INIT_STAGE1,
Some("ME init retry budget exhausted".to_string()), Some("ME pool initialized".to_string()),
) )
.await; .await;
startup_tracker startup_tracker
.set_me_status(StartupMeStatus::Failed, "failed") .set_me_status(StartupMeStatus::Ready, "ready")
.await; .await;
error!(
error = %e,
attempt = init_attempt,
retry_limit = me_init_retry_attempts,
"ME pool init retries exhausted; falling back to direct mode"
);
break None;
}
let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
};
if init_attempt >= me_init_warn_after_attempts {
warn!(
error = %e,
attempt = init_attempt,
retry_limit = retry_limit,
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization"
);
} else {
info!( info!(
error = %e,
attempt = init_attempt, attempt = init_attempt,
retry_limit = retry_limit, "Middle-End pool initialized successfully"
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool startup warmup: retrying initialization"
); );
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
break Some(pool);
}
Err(e) => {
startup_tracker.set_me_last_error(Some(e.to_string())).await;
let retries_limited = me_init_retry_attempts > 0;
if retries_limited && init_attempt >= me_init_retry_attempts {
startup_tracker
.fail_component(
COMPONENT_ME_POOL_INIT_STAGE1,
Some("ME init retry budget exhausted".to_string()),
)
.await;
startup_tracker
.set_me_status(StartupMeStatus::Failed, "failed")
.await;
error!(
error = %e,
attempt = init_attempt,
retry_limit = me_init_retry_attempts,
"ME pool init retries exhausted; startup cannot continue in middle-proxy mode"
);
break None;
}
let retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
};
if init_attempt >= me_init_warn_after_attempts {
warn!(
error = %e,
attempt = init_attempt,
retry_limit = retry_limit,
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization"
);
} else {
info!(
error = %e,
attempt = init_attempt,
retry_limit = retry_limit,
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"ME pool startup warmup: retrying initialization"
);
}
pool.reset_stun_state();
tokio::time::sleep(Duration::from_secs(2)).await;
} }
pool.reset_stun_state();
tokio::time::sleep(Duration::from_secs(2)).await;
} }
} }
} }

View File

@ -17,6 +17,7 @@ use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::stats::beobachten::BeobachtenStore; use crate::stats::beobachten::BeobachtenStore;
use crate::stats::Stats; use crate::stats::Stats;
use crate::transport::{ListenOptions, create_listener};
pub async fn serve( pub async fn serve(
port: u16, port: u16,
@ -26,16 +27,90 @@ pub async fn serve(
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>, config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>, whitelist: Vec<IpNetwork>,
) { ) {
let addr = SocketAddr::from(([0, 0, 0, 0], port)); let whitelist = Arc::new(whitelist);
let listener = match TcpListener::bind(addr).await { let mut listener_v4 = None;
Ok(l) => l, let mut listener_v6 = None;
Err(e) => {
warn!(error = %e, "Failed to bind metrics on {}", addr);
return;
}
};
info!("Metrics endpoint: http://{}/metrics and /beobachten", addr);
let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port));
match bind_metrics_listener(addr_v4, false) {
Ok(listener) => {
info!("Metrics endpoint: http://{}/metrics and /beobachten", addr_v4);
listener_v4 = Some(listener);
}
Err(e) => {
warn!(error = %e, "Failed to bind metrics on {}", addr_v4);
}
}
let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], port));
match bind_metrics_listener(addr_v6, true) {
Ok(listener) => {
info!("Metrics endpoint: http://[::]:{}/metrics and /beobachten", port);
listener_v6 = Some(listener);
}
Err(e) => {
warn!(error = %e, "Failed to bind metrics on {}", addr_v6);
}
}
match (listener_v4, listener_v6) {
(None, None) => {
warn!("Metrics listener is unavailable on both IPv4 and IPv6");
}
(Some(listener), None) | (None, Some(listener)) => {
serve_listener(
listener, stats, beobachten, ip_tracker, config_rx, whitelist,
)
.await;
}
(Some(listener4), Some(listener6)) => {
let stats_v6 = stats.clone();
let beobachten_v6 = beobachten.clone();
let ip_tracker_v6 = ip_tracker.clone();
let config_rx_v6 = config_rx.clone();
let whitelist_v6 = whitelist.clone();
tokio::spawn(async move {
serve_listener(
listener6,
stats_v6,
beobachten_v6,
ip_tracker_v6,
config_rx_v6,
whitelist_v6,
)
.await;
});
serve_listener(
listener4,
stats,
beobachten,
ip_tracker,
config_rx,
whitelist,
)
.await;
}
}
}
/// Binds a tokio `TcpListener` for the metrics endpoint on `addr`.
///
/// Port reuse is disabled; `ipv6_only` controls whether an IPv6 socket is
/// restricted to IPv6 traffic (so a separate IPv4 socket can coexist).
fn bind_metrics_listener(addr: SocketAddr, ipv6_only: bool) -> std::io::Result<TcpListener> {
    let listen_options = ListenOptions {
        reuse_port: false,
        ipv6_only,
        ..Default::default()
    };
    let std_listener = create_listener(addr, &listen_options)?;
    // Convert the std listener into tokio's non-blocking wrapper.
    TcpListener::from_std(std_listener.into())
}
async fn serve_listener(
listener: TcpListener,
stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>,
ip_tracker: Arc<UserIpTracker>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>,
) {
loop { loop {
let (stream, peer) = match listener.accept().await { let (stream, peer) = match listener.accept().await {
Ok(v) => v, Ok(v) => v,