Bound HTTP API+Metrics Connection Admission

This commit is contained in:
Alexey
2026-05-09 16:29:30 +03:00
parent dd3c5eff1c
commit b3510aa8b8
2 changed files with 73 additions and 10 deletions

View File

@@ -5,6 +5,7 @@ use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use http_body_util::Full; use http_body_util::Full;
use hyper::body::{Bytes, Incoming}; use hyper::body::{Bytes, Incoming};
@@ -13,7 +14,8 @@ use hyper::server::conn::http1;
use hyper::service::service_fn; use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode}; use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::{Mutex, RwLock, watch}; use tokio::sync::{Mutex, RwLock, Semaphore, watch};
use tokio::time::timeout;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::config::{ApiGrayAction, ProxyConfig}; use crate::config::{ApiGrayAction, ProxyConfig};
@@ -66,6 +68,9 @@ use runtime_zero::{
}; };
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
const API_MAX_CONTROL_CONNECTIONS: usize = 1024;
const API_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15);
pub(super) struct ApiRuntimeState { pub(super) struct ApiRuntimeState {
pub(super) process_started_at_epoch_secs: u64, pub(super) process_started_at_epoch_secs: u64,
pub(super) config_reload_count: AtomicU64, pub(super) config_reload_count: AtomicU64,
@@ -167,6 +172,8 @@ pub async fn serve(
shared.runtime_events.clone(), shared.runtime_events.clone(),
); );
let connection_permits = Arc::new(Semaphore::new(API_MAX_CONTROL_CONNECTIONS));
loop { loop {
let (stream, peer) = match listener.accept().await { let (stream, peer) = match listener.accept().await {
Ok(v) => v, Ok(v) => v,
@@ -176,22 +183,47 @@ pub async fn serve(
} }
}; };
let connection_permit = match connection_permits.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
debug!(
peer = %peer,
max_connections = API_MAX_CONTROL_CONNECTIONS,
"Dropping API connection: control-plane connection budget exhausted"
);
continue;
}
};
let shared_conn = shared.clone(); let shared_conn = shared.clone();
let config_rx_conn = config_rx.clone(); let config_rx_conn = config_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _connection_permit = connection_permit;
let svc = service_fn(move |req: Request<Incoming>| { let svc = service_fn(move |req: Request<Incoming>| {
let shared_req = shared_conn.clone(); let shared_req = shared_conn.clone();
let config_rx_req = config_rx_conn.clone(); let config_rx_req = config_rx_conn.clone();
async move { handle(req, peer, shared_req, config_rx_req).await } async move { handle(req, peer, shared_req, config_rx_req).await }
}); });
if let Err(error) = http1::Builder::new() match timeout(
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc) API_HTTP_CONNECTION_TIMEOUT,
http1::Builder::new().serve_connection(hyper_util::rt::TokioIo::new(stream), svc),
)
.await .await
{ {
Ok(Ok(())) => {}
Ok(Err(error)) => {
if !error.is_user() { if !error.is_user() {
debug!(error = %error, "API connection error"); debug!(error = %error, "API connection error");
} }
} }
Err(_) => {
debug!(
peer = %peer,
timeout_ms = API_HTTP_CONNECTION_TIMEOUT.as_millis() as u64,
"API connection timed out"
);
}
}
}); });
} }
} }

View File

@@ -11,6 +11,8 @@ use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode}; use hyper::{Request, Response, StatusCode};
use ipnetwork::IpNetwork; use ipnetwork::IpNetwork;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio::time::timeout;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
@@ -27,6 +29,8 @@ use crate::transport::{ListenOptions, create_listener};
const USER_LABELED_METRICS_MAX_USERS: usize = 4096; const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
// Keeps TLS-front per-domain health series bounded for large generated configs. // Keeps TLS-front per-domain health series bounded for large generated configs.
const TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS: usize = 256; const TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS: usize = 256;
const METRICS_MAX_CONTROL_CONNECTIONS: usize = 512;
const METRICS_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15);
pub async fn serve( pub async fn serve(
port: u16, port: u16,
@@ -184,6 +188,8 @@ async fn serve_listener(
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>, config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>, whitelist: Arc<Vec<IpNetwork>>,
) { ) {
let connection_permits = Arc::new(Semaphore::new(METRICS_MAX_CONTROL_CONNECTIONS));
loop { loop {
let (stream, peer) = match listener.accept().await { let (stream, peer) = match listener.accept().await {
Ok(v) => v, Ok(v) => v,
@@ -198,6 +204,18 @@ async fn serve_listener(
continue; continue;
} }
let connection_permit = match connection_permits.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
debug!(
peer = %peer,
max_connections = METRICS_MAX_CONTROL_CONNECTIONS,
"Dropping metrics connection: control-plane connection budget exhausted"
);
continue;
}
};
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone(); let beobachten = beobachten.clone();
let shared_state = shared_state.clone(); let shared_state = shared_state.clone();
@@ -205,6 +223,7 @@ async fn serve_listener(
let tls_cache = tls_cache.clone(); let tls_cache = tls_cache.clone();
let config_rx_conn = config_rx.clone(); let config_rx_conn = config_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _connection_permit = connection_permit;
let svc = service_fn(move |req| { let svc = service_fn(move |req| {
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone(); let beobachten = beobachten.clone();
@@ -225,12 +244,24 @@ async fn serve_listener(
.await .await
} }
}); });
if let Err(e) = http1::Builder::new() match timeout(
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc) METRICS_HTTP_CONNECTION_TIMEOUT,
http1::Builder::new().serve_connection(hyper_util::rt::TokioIo::new(stream), svc),
)
.await .await
{ {
Ok(Ok(())) => {}
Ok(Err(e)) => {
debug!(error = %e, "Metrics connection error"); debug!(error = %e, "Metrics connection error");
} }
Err(_) => {
debug!(
peer = %peer,
timeout_ms = METRICS_HTTP_CONNECTION_TIMEOUT.as_millis() as u64,
"Metrics connection timed out"
);
}
}
}); });
} }
} }