mirror of
https://github.com/telemt/telemt.git
synced 2026-05-13 15:21:44 +03:00
Hardened API & Management-plane Admission
- bound API and metrics connection handling - default metrics listener to localhost - reject untrusted PROXY protocol peers before parsing headers - cap API request body size and PROXY v2 payload allocation - validate route usernames and TLS domains consistently
This commit is contained in:
122
src/api/mod.rs
122
src/api/mod.rs
@@ -13,6 +13,7 @@ use hyper::header::AUTHORIZATION;
|
||||
use hyper::server::conn::http1;
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Method, Request, Response, StatusCode};
|
||||
use subtle::ConstantTimeEq;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{Mutex, RwLock, Semaphore, watch};
|
||||
use tokio::time::timeout;
|
||||
@@ -46,6 +47,7 @@ use http_utils::{error_response, read_json, read_optional_json, success_response
|
||||
use model::{
|
||||
ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
|
||||
PatchUserRequest, ResetUserQuotaResponse, RotateSecretRequest, SummaryData, UserActiveIps,
|
||||
is_valid_username,
|
||||
};
|
||||
use runtime_edge::{
|
||||
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
||||
@@ -70,6 +72,7 @@ use users::{create_user, delete_user, patch_user, rotate_secret, users_from_conf
|
||||
|
||||
const API_MAX_CONTROL_CONNECTIONS: usize = 1024;
|
||||
const API_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
const ROUTE_USERNAME_ERROR: &str = "username must match [A-Za-z0-9_.-] and be 1..64 chars";
|
||||
|
||||
pub(super) struct ApiRuntimeState {
|
||||
pub(super) process_started_at_epoch_secs: u64,
|
||||
@@ -108,6 +111,18 @@ impl ApiShared {
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_header_matches(actual: &str, expected: &str) -> bool {
|
||||
actual.as_bytes().ct_eq(expected.as_bytes()).into()
|
||||
}
|
||||
|
||||
fn parse_route_username(user: &str) -> Result<&str, ApiFailure> {
|
||||
if is_valid_username(user) {
|
||||
Ok(user)
|
||||
} else {
|
||||
Err(ApiFailure::bad_request(ROUTE_USERNAME_ERROR))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn serve(
|
||||
listen: SocketAddr,
|
||||
stats: Arc<Stats>,
|
||||
@@ -277,7 +292,7 @@ async fn handle(
|
||||
.headers()
|
||||
.get(AUTHORIZATION)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|v| v == api_cfg.auth_header)
|
||||
.map(|v| auth_header_matches(v, &api_cfg.auth_header))
|
||||
.unwrap_or(false);
|
||||
if !auth_ok {
|
||||
return Ok(error_response(
|
||||
@@ -533,6 +548,7 @@ async fn handle(
|
||||
&& !user.is_empty()
|
||||
&& !user.contains('/')
|
||||
{
|
||||
let user = parse_route_username(user)?;
|
||||
if api_cfg.read_only {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
@@ -576,10 +592,64 @@ async fn handle(
|
||||
revision,
|
||||
));
|
||||
}
|
||||
if method == Method::POST
|
||||
&& let Some(base_user) = normalized_path
|
||||
.strip_prefix("/v1/users/")
|
||||
.and_then(|path| path.strip_suffix("/rotate-secret"))
|
||||
&& !base_user.is_empty()
|
||||
&& !base_user.contains('/')
|
||||
{
|
||||
let base_user = parse_route_username(base_user)?;
|
||||
if api_cfg.read_only {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::new(
|
||||
StatusCode::FORBIDDEN,
|
||||
"read_only",
|
||||
"API runs in read-only mode",
|
||||
),
|
||||
));
|
||||
}
|
||||
let expected_revision = parse_if_match(req.headers());
|
||||
let body =
|
||||
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
|
||||
.await?;
|
||||
let result = rotate_secret(
|
||||
base_user,
|
||||
body.unwrap_or_default(),
|
||||
expected_revision,
|
||||
&shared,
|
||||
)
|
||||
.await;
|
||||
let (mut data, revision) = match result {
|
||||
Ok(ok) => ok,
|
||||
Err(error) => {
|
||||
shared.runtime_events.record(
|
||||
"api.user.rotate_secret.failed",
|
||||
format!("username={} code={}", base_user, error.code),
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
};
|
||||
let runtime_cfg = config_rx.borrow().clone();
|
||||
data.user.in_runtime =
|
||||
runtime_cfg.access.users.contains_key(&data.user.username);
|
||||
shared.runtime_events.record(
|
||||
"api.user.rotate_secret.ok",
|
||||
format!("username={}", base_user),
|
||||
);
|
||||
let status = if data.user.in_runtime {
|
||||
StatusCode::OK
|
||||
} else {
|
||||
StatusCode::ACCEPTED
|
||||
};
|
||||
return Ok(success_response(status, data, revision));
|
||||
}
|
||||
if let Some(user) = normalized_path.strip_prefix("/v1/users/")
|
||||
&& !user.is_empty()
|
||||
&& !user.contains('/')
|
||||
{
|
||||
let user = parse_route_username(user)?;
|
||||
if method == Method::GET {
|
||||
let revision = current_revision(&shared.config_path).await?;
|
||||
let disk_cfg = load_config_from_disk(&shared.config_path).await?;
|
||||
@@ -680,56 +750,6 @@ async fn handle(
|
||||
};
|
||||
return Ok(success_response(status, response, revision));
|
||||
}
|
||||
if method == Method::POST
|
||||
&& let Some(base_user) = user.strip_suffix("/rotate-secret")
|
||||
&& !base_user.is_empty()
|
||||
&& !base_user.contains('/')
|
||||
{
|
||||
if api_cfg.read_only {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
ApiFailure::new(
|
||||
StatusCode::FORBIDDEN,
|
||||
"read_only",
|
||||
"API runs in read-only mode",
|
||||
),
|
||||
));
|
||||
}
|
||||
let expected_revision = parse_if_match(req.headers());
|
||||
let body =
|
||||
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
|
||||
.await?;
|
||||
let result = rotate_secret(
|
||||
base_user,
|
||||
body.unwrap_or_default(),
|
||||
expected_revision,
|
||||
&shared,
|
||||
)
|
||||
.await;
|
||||
let (mut data, revision) = match result {
|
||||
Ok(ok) => ok,
|
||||
Err(error) => {
|
||||
shared.runtime_events.record(
|
||||
"api.user.rotate_secret.failed",
|
||||
format!("username={} code={}", base_user, error.code),
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
};
|
||||
let runtime_cfg = config_rx.borrow().clone();
|
||||
data.user.in_runtime =
|
||||
runtime_cfg.access.users.contains_key(&data.user.username);
|
||||
shared.runtime_events.record(
|
||||
"api.user.rotate_secret.ok",
|
||||
format!("username={}", base_user),
|
||||
);
|
||||
let status = if data.user.in_runtime {
|
||||
StatusCode::OK
|
||||
} else {
|
||||
StatusCode::ACCEPTED
|
||||
};
|
||||
return Ok(success_response(status, data, revision));
|
||||
}
|
||||
if method == Method::POST {
|
||||
return Ok(error_response(
|
||||
request_id,
|
||||
|
||||
@@ -465,12 +465,7 @@ pub(super) async fn users_from_config(
|
||||
.map(|secret| {
|
||||
build_user_links(cfg, secret, startup_detected_ip_v4, startup_detected_ip_v6)
|
||||
})
|
||||
.unwrap_or(UserLinks {
|
||||
classic: Vec::new(),
|
||||
secure: Vec::new(),
|
||||
tls: Vec::new(),
|
||||
tls_domains: Vec::new(),
|
||||
});
|
||||
.unwrap_or_else(empty_user_links);
|
||||
users.push(UserInfo {
|
||||
in_runtime: runtime_cfg
|
||||
.map(|runtime| runtime.access.users.contains_key(&username))
|
||||
@@ -511,6 +506,15 @@ pub(super) async fn users_from_config(
|
||||
users
|
||||
}
|
||||
|
||||
fn empty_user_links() -> UserLinks {
|
||||
UserLinks {
|
||||
classic: Vec::new(),
|
||||
secure: Vec::new(),
|
||||
tls: Vec::new(),
|
||||
tls_domains: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_user_links(
|
||||
cfg: &ProxyConfig,
|
||||
secret: &str,
|
||||
|
||||
@@ -22,6 +22,14 @@ const MAX_ME_ROUTE_CHANNEL_CAPACITY: usize = 8_192;
|
||||
const MAX_ME_C2ME_CHANNEL_CAPACITY: usize = 8_192;
|
||||
const MIN_MAX_CLIENT_FRAME_BYTES: usize = 4 * 1024;
|
||||
const MAX_MAX_CLIENT_FRAME_BYTES: usize = 16 * 1024 * 1024;
|
||||
const MAX_API_REQUEST_BODY_LIMIT_BYTES: usize = 1024 * 1024;
|
||||
|
||||
fn is_valid_tls_domain_name(domain: &str) -> bool {
|
||||
!domain.is_empty()
|
||||
&& !domain
|
||||
.chars()
|
||||
.any(|ch| ch.is_whitespace() || matches!(ch, '/' | '\\'))
|
||||
}
|
||||
|
||||
const TOP_LEVEL_CONFIG_KEYS: &[&str] = &[
|
||||
"general",
|
||||
@@ -1773,9 +1781,11 @@ impl ProxyConfig {
|
||||
));
|
||||
}
|
||||
|
||||
if config.server.api.request_body_limit_bytes == 0 {
|
||||
if !(1..=MAX_API_REQUEST_BODY_LIMIT_BYTES)
|
||||
.contains(&config.server.api.request_body_limit_bytes)
|
||||
{
|
||||
return Err(ProxyError::Config(
|
||||
"server.api.request_body_limit_bytes must be > 0".to_string(),
|
||||
"server.api.request_body_limit_bytes must be within [1, 1048576]".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -2103,13 +2113,22 @@ impl ProxyConfig {
|
||||
return Err(ProxyError::Config("No modes enabled".to_string()));
|
||||
}
|
||||
|
||||
if self.censorship.tls_domain.contains(' ') || self.censorship.tls_domain.contains('/') {
|
||||
if !is_valid_tls_domain_name(&self.censorship.tls_domain) {
|
||||
return Err(ProxyError::Config(format!(
|
||||
"Invalid tls_domain: '{}'. Must be a valid domain name",
|
||||
self.censorship.tls_domain
|
||||
)));
|
||||
}
|
||||
|
||||
for domain in &self.censorship.tls_domains {
|
||||
if !is_valid_tls_domain_name(domain) {
|
||||
return Err(ProxyError::Config(format!(
|
||||
"Invalid tls_domains entry: '{}'. Must be a valid domain name",
|
||||
domain
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
for (user, tag) in &self.access.user_ad_tags {
|
||||
let zeros = "00000000000000000000000000000000";
|
||||
if !is_valid_ad_tag(tag) {
|
||||
|
||||
@@ -78,11 +78,11 @@ pub async fn serve(
|
||||
return;
|
||||
}
|
||||
|
||||
// Fallback: bind on 0.0.0.0 and [::] using metrics_port.
|
||||
// Fallback: keep metrics local unless an explicit metrics_listen is configured.
|
||||
let mut listener_v4 = None;
|
||||
let mut listener_v6 = None;
|
||||
|
||||
let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port));
|
||||
let addr_v4 = SocketAddr::from(([127, 0, 0, 1], port));
|
||||
match bind_metrics_listener(addr_v4, false, listen_backlog) {
|
||||
Ok(listener) => {
|
||||
info!(
|
||||
@@ -96,11 +96,11 @@ pub async fn serve(
|
||||
}
|
||||
}
|
||||
|
||||
let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], port));
|
||||
let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 1], port));
|
||||
match bind_metrics_listener(addr_v6, true, listen_backlog) {
|
||||
Ok(listener) => {
|
||||
info!(
|
||||
"Metrics endpoint: http://[::]:{}/metrics and /beobachten",
|
||||
"Metrics endpoint: http://[::1]:{}/metrics and /beobachten",
|
||||
port
|
||||
);
|
||||
listener_v6 = Some(listener);
|
||||
|
||||
@@ -466,17 +466,7 @@ where
|
||||
let mut local_addr = synthetic_local_addr(config.server.port);
|
||||
|
||||
if proxy_protocol_enabled {
|
||||
let proxy_header_timeout =
|
||||
Duration::from_millis(config.server.proxy_protocol_header_timeout_ms.max(1));
|
||||
match timeout(
|
||||
proxy_header_timeout,
|
||||
parse_proxy_protocol(&mut stream, peer),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(info)) => {
|
||||
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs)
|
||||
{
|
||||
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs) {
|
||||
stats.increment_connects_bad_with_class("proxy_protocol_untrusted");
|
||||
warn!(
|
||||
peer = %peer,
|
||||
@@ -486,6 +476,16 @@ where
|
||||
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
|
||||
return Err(ProxyError::InvalidProxyProtocol);
|
||||
}
|
||||
|
||||
let proxy_header_timeout =
|
||||
Duration::from_millis(config.server.proxy_protocol_header_timeout_ms.max(1));
|
||||
match timeout(
|
||||
proxy_header_timeout,
|
||||
parse_proxy_protocol(&mut stream, peer),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(info)) => {
|
||||
debug!(
|
||||
peer = %peer,
|
||||
client = %info.src_addr,
|
||||
@@ -978,15 +978,6 @@ impl RunningClientHandler {
|
||||
let mut local_addr = self.stream.local_addr().map_err(ProxyError::Io)?;
|
||||
|
||||
if self.proxy_protocol_enabled {
|
||||
let proxy_header_timeout =
|
||||
Duration::from_millis(self.config.server.proxy_protocol_header_timeout_ms.max(1));
|
||||
match timeout(
|
||||
proxy_header_timeout,
|
||||
parse_proxy_protocol(&mut self.stream, self.peer),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(info)) => {
|
||||
if !is_trusted_proxy_source(
|
||||
self.peer.ip(),
|
||||
&self.config.server.proxy_protocol_trusted_cidrs,
|
||||
@@ -1006,6 +997,16 @@ impl RunningClientHandler {
|
||||
);
|
||||
return Err(ProxyError::InvalidProxyProtocol);
|
||||
}
|
||||
|
||||
let proxy_header_timeout =
|
||||
Duration::from_millis(self.config.server.proxy_protocol_header_timeout_ms.max(1));
|
||||
match timeout(
|
||||
proxy_header_timeout,
|
||||
parse_proxy_protocol(&mut self.stream, self.peer),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(info)) => {
|
||||
debug!(
|
||||
peer = %self.peer,
|
||||
client = %info.src_addr,
|
||||
|
||||
@@ -18,6 +18,9 @@ const PROXY_V1_MIN_LEN: usize = 6;
|
||||
/// Minimum length for v2 header
|
||||
const PROXY_V2_MIN_LEN: usize = 16;
|
||||
|
||||
/// Maximum accepted PROXY v2 address and TLV payload.
|
||||
const PROXY_V2_MAX_ADDR_LEN: usize = 216;
|
||||
|
||||
/// Address families for v2
|
||||
mod address_family {
|
||||
pub const UNSPEC: u8 = 0x0;
|
||||
@@ -169,6 +172,9 @@ async fn parse_v2<R: AsyncRead + Unpin>(
|
||||
|
||||
let family_protocol = header[13];
|
||||
let addr_len = u16::from_be_bytes([header[14], header[15]]) as usize;
|
||||
if addr_len > PROXY_V2_MAX_ADDR_LEN {
|
||||
return Err(ProxyError::InvalidProxyProtocol);
|
||||
}
|
||||
|
||||
// Read address data
|
||||
let mut addr_data = vec![0u8; addr_len];
|
||||
|
||||
Reference in New Issue
Block a user