From b3510aa8b87b399b941e4e4a2d510578d3b93c7a Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 9 May 2026 16:29:30 +0300 Subject: [PATCH] Bound HTTP API+Metrics Connection Admission --- src/api/mod.rs | 44 ++++++++++++++++++++++++++++++++++++++------ src/metrics.rs | 39 +++++++++++++++++++++++++++++++++++---- 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 8087497..8f1571c 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -5,6 +5,7 @@ use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Duration; use http_body_util::Full; use hyper::body::{Bytes, Incoming}; @@ -13,7 +14,8 @@ use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use tokio::net::TcpListener; -use tokio::sync::{Mutex, RwLock, watch}; +use tokio::sync::{Mutex, RwLock, Semaphore, watch}; +use tokio::time::timeout; use tracing::{debug, info, warn}; use crate::config::{ApiGrayAction, ProxyConfig}; @@ -66,6 +68,9 @@ use runtime_zero::{ }; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; +const API_MAX_CONTROL_CONNECTIONS: usize = 1024; +const API_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); + pub(super) struct ApiRuntimeState { pub(super) process_started_at_epoch_secs: u64, pub(super) config_reload_count: AtomicU64, @@ -167,6 +172,8 @@ pub async fn serve( shared.runtime_events.clone(), ); + let connection_permits = Arc::new(Semaphore::new(API_MAX_CONTROL_CONNECTIONS)); + loop { let (stream, peer) = match listener.accept().await { Ok(v) => v, @@ -176,20 +183,45 @@ pub async fn serve( } }; + let connection_permit = match connection_permits.clone().try_acquire_owned() { + Ok(permit) => permit, + Err(_) => { + debug!( + peer = %peer, + max_connections = API_MAX_CONTROL_CONNECTIONS, + "Dropping API connection: control-plane connection budget exhausted" + ); + continue; + } + }; + let shared_conn = shared.clone(); let config_rx_conn = config_rx.clone(); tokio::spawn(async move { + let _connection_permit = connection_permit; let svc = service_fn(move |req: Request| { let shared_req = shared_conn.clone(); let config_rx_req = config_rx_conn.clone(); async move { handle(req, peer, shared_req, config_rx_req).await } }); - if let Err(error) = http1::Builder::new() - .serve_connection(hyper_util::rt::TokioIo::new(stream), svc) - .await + match timeout( + API_HTTP_CONNECTION_TIMEOUT, + http1::Builder::new().serve_connection(hyper_util::rt::TokioIo::new(stream), svc), + ) + .await { - if !error.is_user() { - debug!(error = %error, "API connection error"); + Ok(Ok(())) => {} + Ok(Err(error)) => { + if !error.is_user() { + debug!(error = %error, "API connection error"); + } + } + Err(_) => { + debug!( + peer = %peer, + timeout_ms = API_HTTP_CONNECTION_TIMEOUT.as_millis() as u64, + "API connection timed out" + ); } } }); diff --git a/src/metrics.rs b/src/metrics.rs index e100d2d..325f876 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -11,6 +11,8 @@ use hyper::service::service_fn; use hyper::{Request, Response, StatusCode}; use ipnetwork::IpNetwork; use tokio::net::TcpListener; +use tokio::sync::Semaphore; +use tokio::time::timeout; use tracing::{debug, info, warn}; use crate::config::ProxyConfig; @@ -27,6 +29,8 @@ use crate::transport::{ListenOptions, create_listener}; const USER_LABELED_METRICS_MAX_USERS: usize = 4096; // Keeps TLS-front per-domain health series bounded for large generated configs. const TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS: usize = 256; +const METRICS_MAX_CONTROL_CONNECTIONS: usize = 512; +const METRICS_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); pub async fn serve( port: u16, @@ -184,6 +188,8 @@ async fn serve_listener( config_rx: tokio::sync::watch::Receiver>, whitelist: Arc>, ) { + let connection_permits = Arc::new(Semaphore::new(METRICS_MAX_CONTROL_CONNECTIONS)); + loop { let (stream, peer) = match listener.accept().await { Ok(v) => v, @@ -198,6 +204,18 @@ async fn serve_listener( continue; } + let connection_permit = match connection_permits.clone().try_acquire_owned() { + Ok(permit) => permit, + Err(_) => { + debug!( + peer = %peer, + max_connections = METRICS_MAX_CONTROL_CONNECTIONS, + "Dropping metrics connection: control-plane connection budget exhausted" + ); + continue; + } + }; + let stats = stats.clone(); let beobachten = beobachten.clone(); let shared_state = shared_state.clone(); @@ -205,6 +223,7 @@ async fn serve_listener( let tls_cache = tls_cache.clone(); let config_rx_conn = config_rx.clone(); tokio::spawn(async move { + let _connection_permit = connection_permit; let svc = service_fn(move |req| { let stats = stats.clone(); let beobachten = beobachten.clone(); @@ -225,11 +244,23 @@ async fn serve_listener( .await } }); - if let Err(e) = http1::Builder::new() - .serve_connection(hyper_util::rt::TokioIo::new(stream), svc) - .await + match timeout( + METRICS_HTTP_CONNECTION_TIMEOUT, + http1::Builder::new().serve_connection(hyper_util::rt::TokioIo::new(stream), svc), + ) + .await { - debug!(error = %e, "Metrics connection error"); + Ok(Ok(())) => {} + Ok(Err(e)) => { + debug!(error = %e, "Metrics connection error"); + } + Err(_) => { + debug!( + peer = %peer, + timeout_ms = METRICS_HTTP_CONNECTION_TIMEOUT.as_millis() as u64, + "Metrics connection timed out" + ); + } } }); }