Compare commits

..

17 Commits

Author SHA1 Message Date
Alexey 3bd5637e47 ME + Admission + Cleanup Correctness: merge pull request #779 from telemt/flow
ME + Admission + Cleanup Correctness
2026-05-10 14:23:09 +03:00
Alexey 57b2aa0453 Rustfmt 2026-05-10 14:14:52 +03:00
Alexey 10c7cb2e0c Middle Relay Cancellation Errors 2026-05-10 14:12:15 +03:00
Alexey 900b574fb8 Harden ME Writer Cancellation paths 2026-05-10 14:09:10 +03:00
Alexey beed6b4679 Middle Wait Deadlines + Tighten Session Release State 2026-05-10 13:58:02 +03:00
Alexey eef2a38c75 Type Route Cutovers + Reduce IP Tracker cleanup pressure 2026-05-10 13:55:01 +03:00
Alexey 6cb72b3b6c Explicit Reasons of Session Fallback Cleanup + ME Close 2026-05-10 13:50:36 +03:00
Alexey 090b2ca636 Stats and Cleanup-process beyond Hot-path 2026-05-10 13:43:41 +03:00
Alexey e10c070dc1 Observability + Cancellation for Middle Quota + Traffic Waits 2026-05-10 13:38:11 +03:00
Alexey 3f9ac87daf Bounded Rate Bursts + Cancel ME Waits 2026-05-10 13:33:54 +03:00
Alexey 36de807096 Merge branch 'flow' of https://github.com/telemt/telemt into flow 2026-05-10 13:31:11 +03:00
Alexey 844a912b38 Expose Quota Contention + Cleanup fallback metrics 2026-05-10 13:30:59 +03:00
Alexey 5c5a3fae06 Update AGENTS.md 2026-05-10 13:29:02 +03:00
Alexey ba1d9be5d4 Hardened Relays and API Security paths 2026-05-10 13:22:54 +03:00
Alexey b2aa9b8c9e Hardened API & Management-plane Admission
- bound API and metrics connection handling
- default metrics listener to localhost
- reject untrusted PROXY protocol peers before parsing headers
- cap API request body size and PROXY v2 payload allocation
- validate route usernames and TLS domains consistently
2026-05-09 20:50:23 +03:00
Alexey 73c82bda7a Update AGENTS.md 2026-05-09 16:34:54 +03:00
Alexey b3510aa8b8 Bound HTTP API+Metrics Connection Admission 2026-05-09 16:29:30 +03:00
26 changed files with 1332 additions and 374 deletions
+8 -5
View File
@@ -191,6 +191,11 @@ When facing a non-trivial modification, follow this sequence:
4. **Implement**: Make the minimal, isolated change.
5. **Verify**: Explain why the change preserves existing behavior and architectural integrity.
When the repository contains a `PLAN.md` for the current task, maintain it as
a working checkbox plan while implementing changes. Mark completed and partial
items in `PLAN.md` as the code changes land, so the remaining work stays
explicit and future passes do not waste time rediscovering status.
---
### 9. Context Awareness
@@ -222,10 +227,9 @@ Your response MUST consist of two sections:
**Section 2: `## Changes`**
- For each modified or created file: the filename on a separate line in backticks, followed by the code block.
- For files **under 200 lines**: return the full file with all changes applied.
- For files **over 200 lines**: return only the changed functions/blocks with at least 3 lines of surrounding context above and below. If the user requests the full file, provide it.
- New files: full file content.
- For each modified or created file: the filename on a separate line in backticks, followed by a concise description of what changed.
- Do not include full file contents or long code blocks in `## Changes` unless the user explicitly asks for code text.
- If code snippets are necessary, include only the minimal relevant excerpt.
- End with a suggested git commit message in English.
#### Reporting Out-of-Scope Issues
@@ -429,4 +433,3 @@ Every patch must be **atomic and production-safe**.
* **No transitional states** — no placeholders, incomplete refactors, or temporary inconsistencies.
**Invariant:** After any single patch, the repository remains fully functional and buildable.
Generated
+3 -3
View File
@@ -2404,9 +2404,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
[[package]]
name = "rustls-webpki"
version = "0.103.12"
version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06"
checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [
"aws-lc-rs",
"ring",
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "telemt"
version = "3.4.10"
version = "3.4.11"
dependencies = [
"aes",
"anyhow",
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.4.10"
version = "3.4.11"
edition = "2024"
[features]
+109 -57
View File
@@ -5,6 +5,7 @@ use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
@@ -12,8 +13,10 @@ use hyper::header::AUTHORIZATION;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use subtle::ConstantTimeEq;
use tokio::net::TcpListener;
use tokio::sync::{Mutex, RwLock, watch};
use tokio::sync::{Mutex, RwLock, Semaphore, watch};
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::config::{ApiGrayAction, ProxyConfig};
@@ -44,6 +47,7 @@ use http_utils::{error_response, read_json, read_optional_json, success_response
use model::{
ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
PatchUserRequest, ResetUserQuotaResponse, RotateSecretRequest, SummaryData, UserActiveIps,
is_valid_username,
};
use runtime_edge::{
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
@@ -66,6 +70,10 @@ use runtime_zero::{
};
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
// Upper bound on concurrently served API connections: the accept loop sizes its
// semaphore with this value and drops a new connection when no permit is free.
const API_MAX_CONTROL_CONNECTIONS: usize = 1024;
// Deadline applied to serving one API HTTP/1 connection; when it elapses the
// connection future is dropped and a debug line is logged.
const API_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15);
// Message of the bad-request failure returned when a username taken from the
// request path does not pass `is_valid_username`.
const ROUTE_USERNAME_ERROR: &str = "username must match [A-Za-z0-9_.-] and be 1..64 chars";
pub(super) struct ApiRuntimeState {
pub(super) process_started_at_epoch_secs: u64,
pub(super) config_reload_count: AtomicU64,
@@ -103,6 +111,18 @@ impl ApiShared {
}
}
/// Returns `true` when the presented `Authorization` value equals the configured one.
///
/// The byte comparison goes through `subtle::ConstantTimeEq` instead of `==`,
/// so the result is produced by the crate's constant-time equality routine.
// NOTE(review): per the `subtle` docs, slices of different length compare
// unequal immediately, so the length itself is not hidden — confirm acceptable.
fn auth_header_matches(actual: &str, expected: &str) -> bool {
    let presented = actual.as_bytes();
    let configured = expected.as_bytes();
    bool::from(presented.ct_eq(configured))
}
/// Validates a username taken from the request path.
///
/// Returns the same `&str` unchanged when `is_valid_username` accepts it.
///
/// # Errors
///
/// Returns `ApiFailure::bad_request(ROUTE_USERNAME_ERROR)` when validation fails.
fn parse_route_username(user: &str) -> Result<&str, ApiFailure> {
    if !is_valid_username(user) {
        return Err(ApiFailure::bad_request(ROUTE_USERNAME_ERROR));
    }
    Ok(user)
}
pub async fn serve(
listen: SocketAddr,
stats: Arc<Stats>,
@@ -167,6 +187,8 @@ pub async fn serve(
shared.runtime_events.clone(),
);
let connection_permits = Arc::new(Semaphore::new(API_MAX_CONTROL_CONNECTIONS));
loop {
let (stream, peer) = match listener.accept().await {
Ok(v) => v,
@@ -176,20 +198,45 @@ pub async fn serve(
}
};
let connection_permit = match connection_permits.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
debug!(
peer = %peer,
max_connections = API_MAX_CONTROL_CONNECTIONS,
"Dropping API connection: control-plane connection budget exhausted"
);
continue;
}
};
let shared_conn = shared.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move {
let _connection_permit = connection_permit;
let svc = service_fn(move |req: Request<Incoming>| {
let shared_req = shared_conn.clone();
let config_rx_req = config_rx_conn.clone();
async move { handle(req, peer, shared_req, config_rx_req).await }
});
if let Err(error) = http1::Builder::new()
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
.await
match timeout(
API_HTTP_CONNECTION_TIMEOUT,
http1::Builder::new().serve_connection(hyper_util::rt::TokioIo::new(stream), svc),
)
.await
{
if !error.is_user() {
debug!(error = %error, "API connection error");
Ok(Ok(())) => {}
Ok(Err(error)) => {
if !error.is_user() {
debug!(error = %error, "API connection error");
}
}
Err(_) => {
debug!(
peer = %peer,
timeout_ms = API_HTTP_CONNECTION_TIMEOUT.as_millis() as u64,
"API connection timed out"
);
}
}
});
@@ -245,7 +292,7 @@ async fn handle(
.headers()
.get(AUTHORIZATION)
.and_then(|v| v.to_str().ok())
.map(|v| v == api_cfg.auth_header)
.map(|v| auth_header_matches(v, &api_cfg.auth_header))
.unwrap_or(false);
if !auth_ok {
return Ok(error_response(
@@ -501,6 +548,7 @@ async fn handle(
&& !user.is_empty()
&& !user.contains('/')
{
let user = parse_route_username(user)?;
if api_cfg.read_only {
return Ok(error_response(
request_id,
@@ -544,10 +592,64 @@ async fn handle(
revision,
));
}
if method == Method::POST
&& let Some(base_user) = normalized_path
.strip_prefix("/v1/users/")
.and_then(|path| path.strip_suffix("/rotate-secret"))
&& !base_user.is_empty()
&& !base_user.contains('/')
{
let base_user = parse_route_username(base_user)?;
if api_cfg.read_only {
return Ok(error_response(
request_id,
ApiFailure::new(
StatusCode::FORBIDDEN,
"read_only",
"API runs in read-only mode",
),
));
}
let expected_revision = parse_if_match(req.headers());
let body =
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
.await?;
let result = rotate_secret(
base_user,
body.unwrap_or_default(),
expected_revision,
&shared,
)
.await;
let (mut data, revision) = match result {
Ok(ok) => ok,
Err(error) => {
shared.runtime_events.record(
"api.user.rotate_secret.failed",
format!("username={} code={}", base_user, error.code),
);
return Err(error);
}
};
let runtime_cfg = config_rx.borrow().clone();
data.user.in_runtime =
runtime_cfg.access.users.contains_key(&data.user.username);
shared.runtime_events.record(
"api.user.rotate_secret.ok",
format!("username={}", base_user),
);
let status = if data.user.in_runtime {
StatusCode::OK
} else {
StatusCode::ACCEPTED
};
return Ok(success_response(status, data, revision));
}
if let Some(user) = normalized_path.strip_prefix("/v1/users/")
&& !user.is_empty()
&& !user.contains('/')
{
let user = parse_route_username(user)?;
if method == Method::GET {
let revision = current_revision(&shared.config_path).await?;
let disk_cfg = load_config_from_disk(&shared.config_path).await?;
@@ -648,56 +750,6 @@ async fn handle(
};
return Ok(success_response(status, response, revision));
}
if method == Method::POST
&& let Some(base_user) = user.strip_suffix("/rotate-secret")
&& !base_user.is_empty()
&& !base_user.contains('/')
{
if api_cfg.read_only {
return Ok(error_response(
request_id,
ApiFailure::new(
StatusCode::FORBIDDEN,
"read_only",
"API runs in read-only mode",
),
));
}
let expected_revision = parse_if_match(req.headers());
let body =
read_optional_json::<RotateSecretRequest>(req.into_body(), body_limit)
.await?;
let result = rotate_secret(
base_user,
body.unwrap_or_default(),
expected_revision,
&shared,
)
.await;
let (mut data, revision) = match result {
Ok(ok) => ok,
Err(error) => {
shared.runtime_events.record(
"api.user.rotate_secret.failed",
format!("username={} code={}", base_user, error.code),
);
return Err(error);
}
};
let runtime_cfg = config_rx.borrow().clone();
data.user.in_runtime =
runtime_cfg.access.users.contains_key(&data.user.username);
shared.runtime_events.record(
"api.user.rotate_secret.ok",
format!("username={}", base_user),
);
let status = if data.user.in_runtime {
StatusCode::OK
} else {
StatusCode::ACCEPTED
};
return Ok(success_response(status, data, revision));
}
if method == Method::POST {
return Ok(error_response(
request_id,
+10 -6
View File
@@ -465,12 +465,7 @@ pub(super) async fn users_from_config(
.map(|secret| {
build_user_links(cfg, secret, startup_detected_ip_v4, startup_detected_ip_v6)
})
.unwrap_or(UserLinks {
classic: Vec::new(),
secure: Vec::new(),
tls: Vec::new(),
tls_domains: Vec::new(),
});
.unwrap_or_else(empty_user_links);
users.push(UserInfo {
in_runtime: runtime_cfg
.map(|runtime| runtime.access.users.contains_key(&username))
@@ -511,6 +506,15 @@ pub(super) async fn users_from_config(
users
}
/// Builds a `UserLinks` value in which every link list is empty.
///
/// Used as the fallback when no links are built for a user.
fn empty_user_links() -> UserLinks {
    let classic = Vec::default();
    let secure = Vec::default();
    let tls = Vec::default();
    let tls_domains = Vec::default();
    UserLinks {
        classic,
        secure,
        tls,
        tls_domains,
    }
}
fn build_user_links(
cfg: &ProxyConfig,
secret: &str,
+23 -7
View File
@@ -22,6 +22,14 @@ const MAX_ME_ROUTE_CHANNEL_CAPACITY: usize = 8_192;
const MAX_ME_C2ME_CHANNEL_CAPACITY: usize = 8_192;
const MIN_MAX_CLIENT_FRAME_BYTES: usize = 4 * 1024;
const MAX_MAX_CLIENT_FRAME_BYTES: usize = 16 * 1024 * 1024;
// Largest accepted value for `server.api.request_body_limit_bytes` (1 MiB);
// config validation rejects values outside `1..=` this bound.
const MAX_API_REQUEST_BODY_LIMIT_BYTES: usize = 1024 * 1024;
/// Performs a minimal sanity check on a configured TLS domain name.
///
/// Returns `false` for an empty string, or for a string containing any
/// whitespace character, `/`, or `\`; returns `true` otherwise. This is not
/// a full DNS-name validation.
fn is_valid_tls_domain_name(domain: &str) -> bool {
    if domain.is_empty() {
        return false;
    }
    domain
        .chars()
        .all(|ch| !ch.is_whitespace() && ch != '/' && ch != '\\')
}
const TOP_LEVEL_CONFIG_KEYS: &[&str] = &[
"general",
@@ -613,10 +621,7 @@ fn collect_unknown_config_keys(parsed_toml: &toml::Value) -> Vec<UnknownConfigKe
}
}
if let Some(upstreams) = parsed_toml
.get("upstreams")
.and_then(toml::Value::as_array)
{
if let Some(upstreams) = parsed_toml.get("upstreams").and_then(toml::Value::as_array) {
for (idx, upstream) in upstreams.iter().enumerate() {
check_nested_table_value(
&mut unknown,
@@ -1773,9 +1778,11 @@ impl ProxyConfig {
));
}
if config.server.api.request_body_limit_bytes == 0 {
if !(1..=MAX_API_REQUEST_BODY_LIMIT_BYTES)
.contains(&config.server.api.request_body_limit_bytes)
{
return Err(ProxyError::Config(
"server.api.request_body_limit_bytes must be > 0".to_string(),
"server.api.request_body_limit_bytes must be within [1, 1048576]".to_string(),
));
}
@@ -2103,13 +2110,22 @@ impl ProxyConfig {
return Err(ProxyError::Config("No modes enabled".to_string()));
}
if self.censorship.tls_domain.contains(' ') || self.censorship.tls_domain.contains('/') {
if !is_valid_tls_domain_name(&self.censorship.tls_domain) {
return Err(ProxyError::Config(format!(
"Invalid tls_domain: '{}'. Must be a valid domain name",
self.censorship.tls_domain
)));
}
for domain in &self.censorship.tls_domains {
if !is_valid_tls_domain_name(domain) {
return Err(ProxyError::Config(format!(
"Invalid tls_domains entry: '{}'. Must be a valid domain name",
domain
)));
}
}
for (user, tag) in &self.access.user_ad_tags {
let zeros = "00000000000000000000000000000000";
if !is_valid_ad_tag(tag) {
+12 -2
View File
@@ -543,10 +543,17 @@ pub struct GeneralConfig {
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
/// Copy buffer size for client->DC direction in direct relay.
///
/// This is also the upper bound for one amortized upload rate-limit burst:
/// upload debt is settled before the next relay read instead of blocking
/// inside the completed read path.
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
pub direct_relay_copy_buf_c2s_bytes: usize,
/// Copy buffer size for DC->client direction in direct relay.
///
/// This bounds one direct download rate-limit grant because writes are
/// clipped to the currently available shaper budget.
#[serde(default = "default_direct_relay_copy_buf_s2c_bytes")]
pub direct_relay_copy_buf_s2c_bytes: usize,
@@ -1891,14 +1898,17 @@ pub struct AccessConfig {
///
/// Each entry supports independent upload (`up_bps`) and download
/// (`down_bps`) ceilings. A value of `0` in one direction means
/// "unlimited" for that direction.
/// "unlimited" for that direction. Limits are amortized: a relay quantum
/// may pass as a bounded burst, and the limiter applies the resulting wait
/// before later traffic in the same direction proceeds.
#[serde(default)]
pub user_rate_limits: HashMap<String, RateLimitBps>,
/// Per-CIDR aggregate transport rate limits in bits-per-second.
///
/// Matching uses longest-prefix-wins semantics. A value of `0` in one
/// direction means "unlimited" for that direction.
/// direction means "unlimited" for that direction. Limits are amortized
/// with the same bounded-burst contract as per-user rate limits.
#[serde(default)]
pub cidr_rate_limits: HashMap<IpNetwork, RateLimitBps>,
+15
View File
@@ -222,6 +222,21 @@ pub enum ProxyError {
#[error("Proxy error: {0}")]
Proxy(String),
#[error("ME connection lost")]
MiddleConnectionLost,
#[error("Session terminated")]
RouteSwitched,
#[error("Traffic budget wait cancelled")]
TrafficBudgetWaitCancelled,
#[error("Traffic budget wait deadline exceeded")]
TrafficBudgetWaitDeadlineExceeded,
#[error("ME client writer cancelled")]
MiddleClientWriterCancelled,
// ============= Config Errors =============
#[error("Config error: {0}")]
Config(String),
+23 -5
View File
@@ -32,6 +32,7 @@ pub struct UserIpTracker {
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
limit_window: Arc<RwLock<Duration>>,
last_compact_epoch_secs: Arc<AtomicU64>,
cleanup_queue_len: Arc<AtomicU64>,
cleanup_queue: Arc<Mutex<HashMap<(String, IpAddr), usize>>>,
cleanup_drain_lock: Arc<AsyncMutex<()>>,
}
@@ -72,6 +73,7 @@ impl UserIpTracker {
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
limit_window: Arc::new(RwLock::new(Duration::from_secs(30))),
last_compact_epoch_secs: Arc::new(AtomicU64::new(0)),
cleanup_queue_len: Arc::new(AtomicU64::new(0)),
cleanup_queue: Arc::new(Mutex::new(HashMap::new())),
cleanup_drain_lock: Arc::new(AsyncMutex::new(())),
}
@@ -120,6 +122,9 @@ impl UserIpTracker {
match self.cleanup_queue.lock() {
Ok(mut queue) => {
let count = queue.entry((user, ip)).or_insert(0);
if *count == 0 {
self.cleanup_queue_len.fetch_add(1, Ordering::Relaxed);
}
*count = count.saturating_add(1);
self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed);
@@ -127,6 +132,9 @@ impl UserIpTracker {
Err(poisoned) => {
let mut queue = poisoned.into_inner();
let count = queue.entry((user.clone(), ip)).or_insert(0);
if *count == 0 {
self.cleanup_queue_len.fetch_add(1, Ordering::Relaxed);
}
*count = count.saturating_add(1);
self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed);
@@ -156,6 +164,9 @@ impl UserIpTracker {
}
pub(crate) async fn drain_cleanup_queue(&self) {
if self.cleanup_queue_len.load(Ordering::Relaxed) == 0 {
return;
}
let Ok(_drain_guard) = self.cleanup_drain_lock.try_lock() else {
return;
};
@@ -173,6 +184,7 @@ impl UserIpTracker {
break;
};
if let Some(count) = queue.remove(&key) {
self.cleanup_queue_len.fetch_sub(1, Ordering::Relaxed);
drained.insert(key, count);
}
}
@@ -191,6 +203,7 @@ impl UserIpTracker {
break;
};
if let Some(count) = queue.remove(&key) {
self.cleanup_queue_len.fetch_sub(1, Ordering::Relaxed);
drained.insert(key, count);
}
}
@@ -294,12 +307,17 @@ impl UserIpTracker {
}
}
/// Runs tracker maintenance forever on a one-second cadence.
///
/// Each pass drains the deferred cleanup queue and then gives the
/// empty-user compaction a chance to run. The future never completes, so
/// it is meant to be driven by a dedicated spawned task.
pub async fn run_periodic_maintenance(self: Arc<Self>) {
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    // The default policy replays every missed tick immediately; after a slow
    // pass or a stalled runtime that would run maintenance back-to-back.
    // Delay instead, so passes stay at least one period apart.
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        self.drain_cleanup_queue().await;
        self.maybe_compact_empty_users().await;
    }
}
pub async fn memory_stats(&self) -> UserIpTrackerMemoryStats {
let cleanup_queue_len = self
.cleanup_queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.len();
let cleanup_queue_len = self.cleanup_queue_len.load(Ordering::Relaxed) as usize;
let active_ips = self.active_ips.read().await;
let recent_ips = self.recent_ips.read().await;
let active_entries = active_ips.values().map(HashMap::len).sum();
+5 -9
View File
@@ -13,7 +13,7 @@ use crate::config::{ProxyConfig, RstOnCloseMode};
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::proxy::ClientHandler;
use crate::proxy::route_mode::{ROUTE_SWITCH_ERROR_MSG, RouteRuntimeController};
use crate::proxy::route_mode::RouteRuntimeController;
use crate::proxy::shared_state::ProxySharedState;
use crate::startup::{COMPONENT_LISTENERS_BIND, StartupTracker};
use crate::stats::beobachten::BeobachtenStore;
@@ -492,14 +492,10 @@ pub(crate) fn spawn_tcp_accept_loops(
let handshake_close_reason =
expected_handshake_close_description(&e);
let me_closed = matches!(
&e,
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
);
let route_switched = matches!(
&e,
crate::error::ProxyError::Proxy(msg) if msg == ROUTE_SWITCH_ERROR_MSG
);
let me_closed =
matches!(&e, crate::error::ProxyError::MiddleConnectionLost);
let route_switched =
matches!(&e, crate::error::ProxyError::RouteSwitched);
match (peer_close_reason, me_closed) {
(Some(reason), _) => {
+12
View File
@@ -73,6 +73,18 @@ pub(crate) async fn spawn_runtime_tasks(
rc_clone.run_periodic_cleanup().await;
});
let stats_maintenance = stats.clone();
tokio::spawn(async move {
stats_maintenance
.run_periodic_user_stats_maintenance()
.await;
});
let ip_tracker_maintenance = ip_tracker.clone();
tokio::spawn(async move {
ip_tracker_maintenance.run_periodic_maintenance().await;
});
let detected_ip_v4: Option<IpAddr> = probe.detected_ipv4.map(IpAddr::V4);
let detected_ip_v6: Option<IpAddr> = probe.detected_ipv6.map(IpAddr::V6);
debug!(
+9 -2
View File
@@ -8,8 +8,8 @@
//!
//! SIGHUP is handled separately in config/hot_reload.rs for config reload.
use std::sync::Arc;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
#[cfg(not(unix))]
@@ -52,7 +52,14 @@ pub(crate) async fn wait_for_shutdown(
quota_state_path: PathBuf,
) {
let signal = wait_for_shutdown_signal().await;
perform_shutdown(signal, process_started_at, me_pool, &stats, quota_state_path).await;
perform_shutdown(
signal,
process_started_at,
me_pool,
&stats,
quota_state_path,
)
.await;
}
/// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT).
+216 -33
View File
@@ -11,6 +11,8 @@ use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use ipnetwork::IpNetwork;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::config::ProxyConfig;
@@ -27,6 +29,8 @@ use crate::transport::{ListenOptions, create_listener};
const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
// Keeps TLS-front per-domain health series bounded for large generated configs.
const TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS: usize = 256;
// Upper bound on concurrently served metrics connections per listener loop:
// `serve_listener` sizes its semaphore with this value and drops a new
// connection when no permit is free.
const METRICS_MAX_CONTROL_CONNECTIONS: usize = 512;
// Deadline applied to serving one metrics HTTP/1 connection; when it elapses
// the connection future is dropped and a debug line is logged.
const METRICS_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15);
pub async fn serve(
port: u16,
@@ -74,11 +78,11 @@ pub async fn serve(
return;
}
// Fallback: bind on 0.0.0.0 and [::] using metrics_port.
// Fallback: keep metrics local unless an explicit metrics_listen is configured.
let mut listener_v4 = None;
let mut listener_v6 = None;
let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port));
let addr_v4 = SocketAddr::from(([127, 0, 0, 1], port));
match bind_metrics_listener(addr_v4, false, listen_backlog) {
Ok(listener) => {
info!(
@@ -92,11 +96,11 @@ pub async fn serve(
}
}
let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], port));
let addr_v6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 1], port));
match bind_metrics_listener(addr_v6, true, listen_backlog) {
Ok(listener) => {
info!(
"Metrics endpoint: http://[::]:{}/metrics and /beobachten",
"Metrics endpoint: http://[::1]:{}/metrics and /beobachten",
port
);
listener_v6 = Some(listener);
@@ -184,6 +188,8 @@ async fn serve_listener(
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>,
) {
let connection_permits = Arc::new(Semaphore::new(METRICS_MAX_CONTROL_CONNECTIONS));
loop {
let (stream, peer) = match listener.accept().await {
Ok(v) => v,
@@ -198,6 +204,18 @@ async fn serve_listener(
continue;
}
let connection_permit = match connection_permits.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
debug!(
peer = %peer,
max_connections = METRICS_MAX_CONTROL_CONNECTIONS,
"Dropping metrics connection: control-plane connection budget exhausted"
);
continue;
}
};
let stats = stats.clone();
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
@@ -205,6 +223,7 @@ async fn serve_listener(
let tls_cache = tls_cache.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move {
let _connection_permit = connection_permit;
let svc = service_fn(move |req| {
let stats = stats.clone();
let beobachten = beobachten.clone();
@@ -225,11 +244,23 @@ async fn serve_listener(
.await
}
});
if let Err(e) = http1::Builder::new()
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
.await
match timeout(
METRICS_HTTP_CONNECTION_TIMEOUT,
http1::Builder::new().serve_connection(hyper_util::rt::TokioIo::new(stream), svc),
)
.await
{
debug!(error = %e, "Metrics connection error");
Ok(Ok(())) => {}
Ok(Err(e)) => {
debug!(error = %e, "Metrics connection error");
}
Err(_) => {
debug!(
peer = %peer,
timeout_ms = METRICS_HTTP_CONNECTION_TIMEOUT.as_millis() as u64,
"Metrics connection timed out"
);
}
}
});
}
@@ -343,10 +374,7 @@ async fn render_tls_front_profile_health(
out,
"# HELP telemt_tls_front_profile_age_seconds Age of cached TLS front profile data per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_age_seconds gauge"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_age_seconds gauge");
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_app_data_records TLS front cached app-data record count per configured domain"
@@ -359,10 +387,7 @@ async fn render_tls_front_profile_health(
out,
"# HELP telemt_tls_front_profile_ticket_records TLS front cached ticket-like tail record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_ticket_records gauge"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_ticket_records gauge");
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_change_cipher_spec_records TLS front cached ChangeCipherSpec record count per configured domain"
@@ -375,21 +400,14 @@ async fn render_tls_front_profile_health(
out,
"# HELP telemt_tls_front_profile_app_data_bytes TLS front cached total app-data bytes per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_app_data_bytes gauge"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_app_data_bytes gauge");
for item in health {
let domain = prometheus_label_value(&item.domain);
let _ = writeln!(
out,
"telemt_tls_front_profile_info{{domain=\"{}\",source=\"{}\",is_default=\"{}\",has_cert_info=\"{}\",has_cert_payload=\"{}\"}} 1",
domain,
item.source,
item.is_default,
item.has_cert_info,
item.has_cert_payload
domain, item.source, item.is_default, item.has_cert_info, item.has_cert_payload
);
let _ = writeln!(
out,
@@ -708,6 +726,63 @@ async fn render_metrics(
}
);
let _ = writeln!(
out,
"# HELP telemt_quota_refund_bytes_total Reserved quota bytes returned before commit"
);
let _ = writeln!(out, "# TYPE telemt_quota_refund_bytes_total counter");
let _ = writeln!(
out,
"telemt_quota_refund_bytes_total {}",
if core_enabled {
stats.get_quota_refund_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_quota_contention_total Quota reservation CAS contention events"
);
let _ = writeln!(out, "# TYPE telemt_quota_contention_total counter");
let _ = writeln!(
out,
"telemt_quota_contention_total {}",
if core_enabled {
stats.get_quota_contention_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_quota_contention_timeout_total Quota reservations that hit the bounded contention budget"
);
let _ = writeln!(out, "# TYPE telemt_quota_contention_timeout_total counter");
let _ = writeln!(
out,
"telemt_quota_contention_timeout_total {}",
if core_enabled {
stats.get_quota_contention_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_quota_acquire_cancelled_total Quota acquisitions cancelled before reservation completed"
);
let _ = writeln!(out, "# TYPE telemt_quota_acquire_cancelled_total counter");
let _ = writeln!(
out,
"telemt_quota_acquire_cancelled_total {}",
if core_enabled {
stats.get_quota_acquire_cancelled_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_control_state Runtime conntrack control state flags"
@@ -822,6 +897,29 @@ async fn render_metrics(
);
let limiter_metrics = shared_state.traffic_limiter.metrics_snapshot();
let _ = writeln!(
out,
"# HELP telemt_rate_limiter_burst_bound_bytes Configured upper bound for one direct relay rate-limit burst"
);
let _ = writeln!(out, "# TYPE telemt_rate_limiter_burst_bound_bytes gauge");
let _ = writeln!(
out,
"telemt_rate_limiter_burst_bound_bytes{{direction=\"up\"}} {}",
if core_enabled {
config.general.direct_relay_copy_buf_c2s_bytes
} else {
0
}
);
let _ = writeln!(
out,
"telemt_rate_limiter_burst_bound_bytes{{direction=\"down\"}} {}",
if core_enabled {
config.general.direct_relay_copy_buf_s2c_bytes
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_rate_limiter_throttle_total Traffic limiter throttle events by scope and direction"
@@ -1924,6 +2022,85 @@ async fn render_metrics(
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_child_join_timeout_total Middle relay child tasks that did not join before cleanup deadline"
);
let _ = writeln!(out, "# TYPE telemt_me_child_join_timeout_total counter");
let _ = writeln!(
out,
"telemt_me_child_join_timeout_total {}",
if core_enabled {
stats.get_me_child_join_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_child_abort_total Middle relay child tasks aborted after bounded cleanup timeout"
);
let _ = writeln!(out, "# TYPE telemt_me_child_abort_total counter");
let _ = writeln!(
out,
"telemt_me_child_abort_total {}",
if core_enabled {
stats.get_me_child_abort_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_flow_wait_events_total Flow wait events by reason, direction, and outcome"
);
let _ = writeln!(out, "# TYPE telemt_flow_wait_events_total counter");
let _ = writeln!(
out,
"telemt_flow_wait_events_total{{reason=\"middle_rate_limit\",direction=\"down\",outcome=\"waited\"}} {}",
if core_enabled {
stats.get_flow_wait_middle_rate_limit_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_flow_wait_events_total{{reason=\"middle_rate_limit\",direction=\"down\",outcome=\"cancelled\"}} {}",
if core_enabled {
stats.get_flow_wait_middle_rate_limit_cancelled_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_flow_wait_ms_total Flow wait time in milliseconds by reason and direction"
);
let _ = writeln!(out, "# TYPE telemt_flow_wait_ms_total counter");
let _ = writeln!(
out,
"telemt_flow_wait_ms_total{{reason=\"middle_rate_limit\",direction=\"down\"}} {}",
if core_enabled {
stats.get_flow_wait_middle_rate_limit_ms_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_session_drop_fallback_total Session reservations cleaned by Drop instead of explicit async release"
);
let _ = writeln!(out, "# TYPE telemt_session_drop_fallback_total counter");
let _ = writeln!(
out,
"telemt_session_drop_fallback_total {}",
if core_enabled {
stats.get_session_drop_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
@@ -3597,8 +3774,9 @@ mod tests {
)));
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output
.contains("telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"));
assert!(output.contains(
"telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"
));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_handshake_failures_by_class_total{class=\"timeout\"} 1"));
assert!(output.contains("telemt_auth_expensive_checks_total 9"));
@@ -3714,16 +3892,21 @@ mod tests {
output.contains("telemt_tls_front_profile_info{domain=\"fallback.example\",source=\"default\",is_default=\"true\",has_cert_info=\"false\",has_cert_payload=\"false\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2")
output.contains(
"telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2"
)
);
assert!(
output.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
output
.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
);
assert!(output.contains(
"telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1"
));
assert!(
output.contains("telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536")
output.contains(
"telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536"
)
);
}
+45 -36
View File
@@ -32,7 +32,13 @@ struct UserConnectionReservation {
user: String,
ip: IpAddr,
tracks_ip: bool,
active: bool,
state: SessionReservationState,
}
/// Lifecycle state of a `UserConnectionReservation`.
///
/// `mark_released` performs the one-way `Active` -> `Released` transition.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SessionReservationState {
/// The reservation is still held; this is the state set at construction.
Active,
/// Cleanup for the reservation has already been claimed.
Released,
}
impl UserConnectionReservation {
@@ -49,28 +55,35 @@ impl UserConnectionReservation {
user,
ip,
tracks_ip,
active: true,
state: SessionReservationState::Active,
}
}
/// Claims the right to run cleanup for this reservation.
///
/// Returns `true` only for the call that moves the reservation from
/// `Active` to `Released`; every later call returns `false` and leaves the
/// state untouched. Both `release` and `Drop` bail out early when this
/// returns `false`, so the counters are decremented at most once.
fn mark_released(&mut self) -> bool {
    match self.state {
        SessionReservationState::Active => {
            self.state = SessionReservationState::Released;
            true
        }
        // Already released: nothing left to claim.
        SessionReservationState::Released => false,
    }
}
async fn release(mut self) {
if !self.active {
if !self.mark_released() {
return;
}
if self.tracks_ip {
self.ip_tracker.remove_ip(&self.user, self.ip).await;
}
self.active = false;
self.stats.decrement_user_curr_connects(&self.user);
}
}
impl Drop for UserConnectionReservation {
fn drop(&mut self) {
if !self.active {
if !self.mark_released() {
return;
}
self.active = false;
self.stats.increment_session_drop_fallback_total();
self.stats.decrement_user_curr_connects(&self.user);
if self.tracks_ip {
self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip);
@@ -466,6 +479,17 @@ where
let mut local_addr = synthetic_local_addr(config.server.port);
if proxy_protocol_enabled {
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs) {
stats.increment_connects_bad_with_class("proxy_protocol_untrusted");
warn!(
peer = %peer,
trusted = ?config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(ProxyError::InvalidProxyProtocol);
}
let proxy_header_timeout =
Duration::from_millis(config.server.proxy_protocol_header_timeout_ms.max(1));
match timeout(
@@ -475,17 +499,6 @@ where
.await
{
Ok(Ok(info)) => {
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs)
{
stats.increment_connects_bad_with_class("proxy_protocol_untrusted");
warn!(
peer = %peer,
trusted = ?config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(ProxyError::InvalidProxyProtocol);
}
debug!(
peer = %peer,
client = %info.src_addr,
@@ -978,6 +991,21 @@ impl RunningClientHandler {
let mut local_addr = self.stream.local_addr().map_err(ProxyError::Io)?;
if self.proxy_protocol_enabled {
if !is_trusted_proxy_source(
self.peer.ip(),
&self.config.server.proxy_protocol_trusted_cidrs,
) {
self.stats
.increment_connects_bad_with_class("proxy_protocol_untrusted");
warn!(
peer = %self.peer,
trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(&self.beobachten, &self.config, self.peer.ip(), "other");
return Err(ProxyError::InvalidProxyProtocol);
}
let proxy_header_timeout =
Duration::from_millis(self.config.server.proxy_protocol_header_timeout_ms.max(1));
match timeout(
@@ -987,25 +1015,6 @@ impl RunningClientHandler {
.await
{
Ok(Ok(info)) => {
if !is_trusted_proxy_source(
self.peer.ip(),
&self.config.server.proxy_protocol_trusted_cidrs,
) {
self.stats
.increment_connects_bad_with_class("proxy_protocol_untrusted");
warn!(
peer = %self.peer,
trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(
&self.beobachten,
&self.config,
self.peer.ip(),
"other",
);
return Err(ProxyError::InvalidProxyProtocol);
}
debug!(
peer = %self.peer,
client = %info.src_addr,
+2 -3
View File
@@ -18,8 +18,7 @@ use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay,
};
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
@@ -360,7 +359,7 @@ where
"Cutover affected direct session, closing client connection"
);
tokio::time::sleep(delay).await;
break Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
break Err(ProxyError::RouteSwitched);
}
tokio::select! {
result = &mut relay_result => {
+4 -1
View File
@@ -1901,7 +1901,10 @@ where
.auth_expensive_checks_total
.fetch_add(validation_checks as u64, Ordering::Relaxed);
if config.access.is_user_source_ip_denied(user.as_str(), peer.ip()) {
if config
.access
.is_user_source_ip_denied(user.as_str(), peer.ip())
{
auth_probe_record_failure_in(shared, peer.ip(), Instant::now());
maybe_apply_server_hello_delay(config).await;
warn!(
+284 -105
View File
@@ -14,6 +14,7 @@ use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot, watch};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use crate::config::{ConntrackPressureProfile, ProxyConfig};
@@ -22,8 +23,7 @@ use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay,
};
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
@@ -65,6 +65,15 @@ const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
const QUOTA_RESERVE_SPIN_RETRIES: usize = 32;
const QUOTA_RESERVE_BACKOFF_MIN_MS: u64 = 1;
const QUOTA_RESERVE_BACKOFF_MAX_MS: u64 = 16;
const QUOTA_RESERVE_MAX_BACKOFF_ROUNDS: usize = 16;
const ME_CHILD_JOIN_TIMEOUT: Duration = Duration::from_secs(2);
/// Reasons a middle-relay quota reservation can fail.
///
/// Returned by `reserve_user_quota_with_yield`; callers map each variant to
/// the `ProxyError` they surface for the session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MiddleQuotaReserveError {
    /// The underlying `quota_try_reserve` reported that the limit is exceeded.
    LimitExceeded,
    /// The reservation stayed contended for `QUOTA_RESERVE_MAX_BACKOFF_ROUNDS`
    /// backoff rounds and was abandoned.
    Contended,
    /// The cancellation token fired while waiting between retry rounds.
    Cancelled,
    /// The optional deadline passed before a reservation succeeded.
    DeadlineExceeded,
}
#[derive(Default)]
pub(crate) struct DesyncDedupRotationState {
@@ -622,21 +631,43 @@ async fn reserve_user_quota_with_yield(
user_stats: &UserStats,
bytes: u64,
limit: u64,
) -> std::result::Result<u64, QuotaReserveError> {
stats: &Stats,
cancel: &CancellationToken,
deadline: Option<Instant>,
) -> std::result::Result<u64, MiddleQuotaReserveError> {
let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS;
let mut backoff_rounds = 0usize;
loop {
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match user_stats.quota_try_reserve(bytes, limit) {
Ok(total) => return Ok(total),
Err(QuotaReserveError::LimitExceeded) => {
return Err(QuotaReserveError::LimitExceeded);
return Err(MiddleQuotaReserveError::LimitExceeded);
}
Err(QuotaReserveError::Contended) => {
stats.increment_quota_contention_total();
std::hint::spin_loop();
}
Err(QuotaReserveError::Contended) => std::hint::spin_loop(),
}
}
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
stats.increment_quota_contention_timeout_total();
return Err(MiddleQuotaReserveError::DeadlineExceeded);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
_ = cancel.cancelled() => {
stats.increment_quota_acquire_cancelled_total();
return Err(MiddleQuotaReserveError::Cancelled);
}
}
backoff_rounds = backoff_rounds.saturating_add(1);
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
stats.increment_quota_contention_timeout_total();
return Err(MiddleQuotaReserveError::Contended);
}
backoff_ms = backoff_ms
.saturating_mul(2)
.min(QUOTA_RESERVE_BACKOFF_MAX_MS);
@@ -647,12 +678,13 @@ async fn wait_for_traffic_budget(
lease: Option<&Arc<TrafficLease>>,
direction: RateDirection,
bytes: u64,
) {
deadline: Option<Instant>,
) -> Result<()> {
if bytes == 0 {
return;
return Ok(());
}
let Some(lease) = lease else {
return;
return Ok(());
};
let mut remaining = bytes;
@@ -664,6 +696,9 @@ async fn wait_for_traffic_budget(
}
let wait_started_at = Instant::now();
if deadline.is_some_and(|deadline| wait_started_at >= deadline) {
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
}
tokio::time::sleep(next_refill_delay()).await;
let wait_ms = wait_started_at
.elapsed()
@@ -676,6 +711,59 @@ async fn wait_for_traffic_budget(
wait_ms,
);
}
Ok(())
}
/// Waits until `bytes` of rate budget have been granted by `lease` for
/// `direction`, aborting on cancellation or once `deadline` has passed.
///
/// Returns `Ok(())` immediately when `bytes` is zero or no lease is attached.
/// Otherwise it repeatedly calls `try_consume`; whenever nothing is granted it
/// sleeps for `next_refill_delay()` (racing the sleep against `cancel`) and
/// records the observed wait on both the lease and `stats`.
///
/// # Errors
///
/// * `ProxyError::TrafficBudgetWaitDeadlineExceeded` when a wait would start
///   at or after `deadline`.
/// * `ProxyError::TrafficBudgetWaitCancelled` when `cancel` fires during a wait.
///
/// Both error paths bump `flow_wait_middle_rate_limit_cancelled_total`.
async fn wait_for_traffic_budget_or_cancel(
    lease: Option<&Arc<TrafficLease>>,
    direction: RateDirection,
    bytes: u64,
    cancel: &CancellationToken,
    stats: &Stats,
    deadline: Option<Instant>,
) -> Result<()> {
    // No lease or an empty request means there is nothing to wait for.
    let lease = match lease {
        Some(lease) if bytes != 0 => lease,
        _ => return Ok(()),
    };

    let mut outstanding = bytes;
    loop {
        if outstanding == 0 {
            return Ok(());
        }

        let attempt = lease.try_consume(direction, outstanding);
        if attempt.granted > 0 {
            // Partial grants are accepted; retry right away for the rest.
            outstanding = outstanding.saturating_sub(attempt.granted);
            continue;
        }

        // The deadline is only checked before a wait begins, never mid-sleep.
        let blocked_at = Instant::now();
        if matches!(deadline, Some(limit) if blocked_at >= limit) {
            stats.increment_flow_wait_middle_rate_limit_cancelled_total();
            return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
        }

        let cancelled = tokio::select! {
            _ = tokio::time::sleep(next_refill_delay()) => false,
            _ = cancel.cancelled() => true,
        };
        if cancelled {
            stats.increment_flow_wait_middle_rate_limit_cancelled_total();
            return Err(ProxyError::TrafficBudgetWaitCancelled);
        }

        // Saturate instead of truncating if the elapsed time overflows u64.
        let waited_ms = u64::try_from(blocked_at.elapsed().as_millis()).unwrap_or(u64::MAX);
        lease.observe_wait_ms(
            direction,
            attempt.blocked_user,
            attempt.blocked_cidr,
            waited_ms,
        );
        stats.observe_flow_wait_middle_rate_limit_ms(waited_ms);
    }
}
fn classify_me_d2c_flush_reason(
@@ -1114,7 +1202,7 @@ where
tokio::time::sleep(delay).await;
let _ = me_pool.send_close(conn_id).await;
me_pool.registry().unregister(conn_id).await;
return Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
return Err(ProxyError::RouteSwitched);
}
// Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
@@ -1169,7 +1257,7 @@ where
let c2me_byte_semaphore = Arc::new(Semaphore::new(c2me_byte_budget));
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
let me_pool_c2me = me_pool.clone();
let c2me_sender = tokio::spawn(async move {
let mut c2me_sender = tokio::spawn(async move {
let mut sent_since_yield = 0usize;
while let Some(cmd) = c2me_rx.recv().await {
match cmd {
@@ -1205,16 +1293,18 @@ where
});
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let flow_cancel = CancellationToken::new();
let mut me_rx_task = me_rx;
let stats_clone = stats.clone();
let rng_clone = rng.clone();
let user_clone = user.clone();
let quota_user_stats_me_writer = quota_user_stats.clone();
let traffic_lease_me_writer = traffic_lease.clone();
let flow_cancel_me_writer = flow_cancel.clone();
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
let bytes_me2c_clone = bytes_me2c.clone();
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
let me_writer = tokio::spawn(async move {
let mut me_writer = tokio::spawn(async move {
let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024);
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
@@ -1234,7 +1324,7 @@ where
let Some(first) = msg else {
debug!(conn_id, "ME channel closed");
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Err(ProxyError::Proxy("ME connection lost".into()));
return Err(ProxyError::MiddleConnectionLost);
};
let mut batch_frames = 0usize;
@@ -1256,6 +1346,7 @@ where
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
&flow_cancel_me_writer,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1276,7 +1367,7 @@ where
} else {
None
};
let _ = writer.flush().await;
let _ = flush_client_or_cancel(&mut writer, &flow_cancel_me_writer).await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
@@ -1317,6 +1408,7 @@ where
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
&flow_cancel_me_writer,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1338,7 +1430,8 @@ where
} else {
None
};
let _ = writer.flush().await;
let _ =
flush_client_or_cancel(&mut writer, &flow_cancel_me_writer).await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
@@ -1381,6 +1474,7 @@ where
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
&flow_cancel_me_writer,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1405,7 +1499,11 @@ where
} else {
None
};
let _ = writer.flush().await;
let _ = flush_client_or_cancel(
&mut writer,
&flow_cancel_me_writer,
)
.await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
@@ -1447,6 +1545,7 @@ where
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
traffic_lease_me_writer.as_ref(),
&flow_cancel_me_writer,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@@ -1471,7 +1570,11 @@ where
} else {
None
};
let _ = writer.flush().await;
let _ = flush_client_or_cancel(
&mut writer,
&flow_cancel_me_writer,
)
.await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
@@ -1495,7 +1598,7 @@ where
Ok(None) => {
debug!(conn_id, "ME channel closed");
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Err(ProxyError::Proxy("ME connection lost".into()));
return Err(ProxyError::MiddleConnectionLost);
}
Err(_) => {
max_delay_fired = true;
@@ -1517,7 +1620,7 @@ where
} else {
None
};
writer.flush().await.map_err(ProxyError::Io)?;
flush_client_or_cancel(&mut writer, &flow_cancel_me_writer).await?;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
@@ -1610,7 +1713,7 @@ where
stats.as_ref(),
)
.await;
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
main_result = Err(ProxyError::RouteSwitched);
break;
}
@@ -1641,26 +1744,50 @@ where
traffic_lease.as_ref(),
RateDirection::Up,
payload.len() as u64,
None,
)
.await;
.await?;
forensics.bytes_c2me = forensics
.bytes_c2me
.saturating_add(payload.len() as u64);
if let (Some(limit), Some(user_stats)) =
(quota_limit, quota_user_stats.as_deref())
{
if reserve_user_quota_with_yield(
match reserve_user_quota_with_yield(
user_stats,
payload.len() as u64,
limit,
stats.as_ref(),
&flow_cancel,
None,
)
.await
.is_err()
{
main_result = Err(ProxyError::DataQuotaExceeded {
user: user.clone(),
});
break;
Ok(_) => {}
Err(MiddleQuotaReserveError::LimitExceeded) => {
main_result = Err(ProxyError::DataQuotaExceeded {
user: user.clone(),
});
break;
}
Err(MiddleQuotaReserveError::Contended) => {
main_result = Err(ProxyError::Proxy(
"ME C->ME quota reservation contended".into(),
));
break;
}
Err(MiddleQuotaReserveError::Cancelled) => {
main_result = Err(ProxyError::Proxy(
"ME C->ME quota reservation cancelled".into(),
));
break;
}
Err(MiddleQuotaReserveError::DeadlineExceeded) => {
main_result = Err(ProxyError::Proxy(
"ME C->ME quota reservation deadline exceeded".into(),
));
break;
}
}
stats.add_user_octets_from_handle(user_stats, payload.len() as u64);
} else {
@@ -1729,22 +1856,34 @@ where
}
drop(c2me_tx);
let c2me_result = c2me_sender
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))));
let c2me_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut c2me_sender).await {
Ok(joined) => {
joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))))
}
Err(_) => {
stats.increment_me_child_join_timeout_total();
stats.increment_me_child_abort_total();
c2me_sender.abort();
Err(ProxyError::Proxy("ME sender join timeout".into()))
}
};
flow_cancel.cancel();
let _ = stop_tx.send(());
let mut writer_result = me_writer
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
let mut writer_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut me_writer).await {
Ok(joined) => {
joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))))
}
Err(_) => {
stats.increment_me_child_join_timeout_total();
stats.increment_me_child_abort_total();
me_writer.abort();
Err(ProxyError::Proxy("ME writer join timeout".into()))
}
};
// When the client has closed and the ME channel stopped because it was unregistered, that is not an error
if client_closed
&& matches!(
writer_result,
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
)
{
if client_closed && matches!(writer_result, Err(ProxyError::MiddleConnectionLost)) {
writer_result = Ok(());
}
@@ -2300,6 +2439,7 @@ where
quota_limit,
quota_soft_overshoot_bytes,
None,
&CancellationToken::new(),
bytes_me2c,
conn_id,
ack_flush_immediate,
@@ -2320,6 +2460,7 @@ async fn process_me_writer_response_with_traffic_lease<W>(
quota_limit: Option<u64>,
quota_soft_overshoot_bytes: u64,
traffic_lease: Option<&Arc<TrafficLease>>,
cancel: &CancellationToken,
bytes_me2c: &AtomicU64,
conn_id: u64,
ack_flush_immediate: bool,
@@ -2338,31 +2479,65 @@ where
let data_len = data.len() as u64;
if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) {
let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes);
if reserve_user_quota_with_yield(user_stats, data_len, soft_limit)
.await
.is_err()
match reserve_user_quota_with_yield(
user_stats, data_len, soft_limit, stats, cancel, None,
)
.await
{
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
Ok(_) => {}
Err(MiddleQuotaReserveError::LimitExceeded) => {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
Err(MiddleQuotaReserveError::Contended) => {
return Err(ProxyError::Proxy(
"ME D->C quota reservation contended".into(),
));
}
Err(MiddleQuotaReserveError::Cancelled) => {
return Err(ProxyError::Proxy(
"ME D->C quota reservation cancelled".into(),
));
}
Err(MiddleQuotaReserveError::DeadlineExceeded) => {
return Err(ProxyError::Proxy(
"ME D->C quota reservation deadline exceeded".into(),
));
}
}
}
wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await;
wait_for_traffic_budget_or_cancel(
traffic_lease,
RateDirection::Down,
data_len,
cancel,
stats,
None,
)
.await?;
let write_mode =
match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await
{
Ok(mode) => mode,
Err(err) => {
if quota_limit.is_some() {
stats.add_quota_write_fail_bytes_total(data_len);
stats.increment_quota_write_fail_events_total();
}
return Err(err);
let write_mode = match write_client_payload(
client_writer,
proto_tag,
flags,
&data,
rng,
frame_buf,
cancel,
)
.await
{
Ok(mode) => mode,
Err(err) => {
if quota_limit.is_some() {
stats.add_quota_write_fail_bytes_total(data_len);
stats.increment_quota_write_fail_events_total();
}
};
return Err(err);
}
};
bytes_me2c.fetch_add(data_len, Ordering::Relaxed);
if let Some(user_stats) = quota_user_stats {
@@ -2386,8 +2561,16 @@ where
} else {
trace!(conn_id, confirm, "ME->C quickack");
}
wait_for_traffic_budget(traffic_lease, RateDirection::Down, 4).await;
write_client_ack(client_writer, proto_tag, confirm).await?;
wait_for_traffic_budget_or_cancel(
traffic_lease,
RateDirection::Down,
4,
cancel,
stats,
None,
)
.await?;
write_client_ack(client_writer, proto_tag, confirm, cancel).await?;
stats.increment_me_d2c_ack_frames_total();
Ok(MeWriterResponseOutcome::Continue {
@@ -2439,6 +2622,7 @@ async fn write_client_payload<W>(
data: &[u8],
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
cancel: &CancellationToken,
) -> Result<MeD2cWriteMode>
where
W: AsyncWrite + Unpin + Send + 'static,
@@ -2466,21 +2650,12 @@ where
frame_buf.reserve(wire_len);
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, frame_buf.as_slice(), cancel).await?;
MeD2cWriteMode::Coalesced
} else {
let header = [first];
client_writer
.write_all(&header)
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, &header, cancel).await?;
write_all_client_or_cancel(client_writer, data, cancel).await?;
MeD2cWriteMode::Split
}
} else if len_words < (1 << 24) {
@@ -2495,21 +2670,12 @@ where
frame_buf.reserve(wire_len);
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, frame_buf.as_slice(), cancel).await?;
MeD2cWriteMode::Coalesced
} else {
let header = [first, lw[0], lw[1], lw[2]];
client_writer
.write_all(&header)
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, &header, cancel).await?;
write_all_client_or_cancel(client_writer, data, cancel).await?;
MeD2cWriteMode::Split
}
} else {
@@ -2544,21 +2710,12 @@ where
frame_buf.resize(start + padding_len, 0);
rng.fill(&mut frame_buf[start..]);
}
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, frame_buf.as_slice(), cancel).await?;
MeD2cWriteMode::Coalesced
} else {
let header = len_val.to_le_bytes();
client_writer
.write_all(&header)
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, &header, cancel).await?;
write_all_client_or_cancel(client_writer, data, cancel).await?;
if padding_len > 0 {
frame_buf.clear();
if frame_buf.capacity() < padding_len {
@@ -2566,10 +2723,7 @@ where
}
frame_buf.resize(padding_len, 0);
rng.fill(frame_buf.as_mut_slice());
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
write_all_client_or_cancel(client_writer, frame_buf.as_slice(), cancel).await?;
}
MeD2cWriteMode::Split
}
@@ -2583,6 +2737,7 @@ async fn write_client_ack<W>(
client_writer: &mut CryptoWriter<W>,
proto_tag: ProtoTag,
confirm: u32,
cancel: &CancellationToken,
) -> Result<()>
where
W: AsyncWrite + Unpin + Send + 'static,
@@ -2592,10 +2747,34 @@ where
} else {
confirm.to_le_bytes()
};
client_writer
.write_all(&bytes)
.await
.map_err(ProxyError::Io)
write_all_client_or_cancel(client_writer, &bytes, cancel).await
}
/// Writes all of `bytes` to the client unless `cancel` fires first.
///
/// # Errors
///
/// * `ProxyError::Io` when the underlying write fails.
/// * `ProxyError::MiddleClientWriterCancelled` when the token is cancelled
///   before the write completes.
///
/// NOTE(review): if cancellation wins after part of `bytes` was already
/// written, the stream is left mid-frame; presumably callers tear the
/// connection down on this error. Confirm against the call sites.
async fn write_all_client_or_cancel<W>(
    client_writer: &mut CryptoWriter<W>,
    bytes: &[u8],
    cancel: &CancellationToken,
) -> Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    tokio::select! {
        io_outcome = client_writer.write_all(bytes) => match io_outcome {
            Ok(()) => Ok(()),
            Err(io_err) => Err(ProxyError::Io(io_err)),
        },
        _ = cancel.cancelled() => Err(ProxyError::MiddleClientWriterCancelled),
    }
}
/// Flushes the client writer unless `cancel` fires first.
///
/// # Errors
///
/// * `ProxyError::Io` when the underlying flush fails.
/// * `ProxyError::MiddleClientWriterCancelled` when the token is cancelled
///   before the flush completes.
async fn flush_client_or_cancel<W>(
    client_writer: &mut CryptoWriter<W>,
    cancel: &CancellationToken,
) -> Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    tokio::select! {
        io_outcome = client_writer.flush() => match io_outcome {
            Ok(()) => Ok(()),
            Err(io_err) => Err(ProxyError::Io(io_err)),
        },
        _ = cancel.cancelled() => Err(ProxyError::MiddleClientWriterCancelled),
    }
}
#[cfg(test)]
+109 -52
View File
@@ -215,6 +215,7 @@ struct StatsIo<S> {
c2s_rate_debt_bytes: u64,
c2s_wait: RateWaitState,
s2c_wait: RateWaitState,
quota_wait: RateWaitState,
quota_limit: Option<u64>,
quota_exceeded: Arc<AtomicBool>,
quota_bytes_since_check: u64,
@@ -275,6 +276,7 @@ impl<S> StatsIo<S> {
c2s_rate_debt_bytes: 0,
c2s_wait: RateWaitState::default(),
s2c_wait: RateWaitState::default(),
quota_wait: RateWaitState::default(),
quota_limit,
quota_exceeded,
quota_bytes_since_check: 0,
@@ -353,6 +355,11 @@ impl<S> StatsIo<S> {
Poll::Ready(())
}
/// Arms the dedicated `quota_wait` state and polls it once with `cx`.
///
/// Used by the quota reservation paths after repeated contention: the read
/// path returns `Poll::Pending` when this is pending, and the write path
/// calls it and then returns `Poll::Pending` regardless of the result.
fn arm_quota_wait(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// NOTE(review): the two `false` flags are defined by `arm_wait`, which is
// not visible here; presumably they mean "not blocked by user / CIDR
// limits". Confirm against `arm_wait`.
Self::arm_wait(&mut self.quota_wait, false, false);
// Polled with no lease (`None`); the direction passed is always
// `RateDirection::Up`, even when called from the write path.
// NOTE(review): presumably the direction is irrelevant without a lease. Confirm.
Self::poll_wait(&mut self.quota_wait, cx, None, RateDirection::Up)
}
}
#[derive(Debug)]
@@ -430,8 +437,13 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
if this.settle_c2s_rate_debt(cx).is_pending() {
return Poll::Pending;
}
if buf.remaining() == 0 {
return Pin::new(&mut this.inner).poll_read(cx, buf);
}
let mut remaining_before = None;
let mut reserved_read_bytes = 0u64;
let mut read_limit = buf.remaining();
if let Some(limit) = this.quota_limit {
let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before);
@@ -440,50 +452,79 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
return Poll::Ready(Err(quota_io_error()));
}
remaining_before = Some(remaining);
read_limit = read_limit.min(remaining as usize);
if read_limit == 0 {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
let desired = read_limit as u64;
let mut reserve_rounds = 0usize;
while reserved_read_bytes == 0 {
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(desired, limit) {
Ok(_) => {
reserved_read_bytes = desired;
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
Err(crate::stats::QuotaReserveError::Contended) => {
this.stats.increment_quota_contention_total();
}
}
}
if reserved_read_bytes == 0 {
reserve_rounds = reserve_rounds.saturating_add(1);
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
this.stats.increment_quota_contention_timeout_total();
if this.arm_quota_wait(cx).is_pending() {
return Poll::Pending;
}
reserve_rounds = 0;
}
}
}
}
let before = buf.filled().len();
let limited_read = read_limit < buf.remaining();
let read_result = if limited_read {
let mut limited_buf = ReadBuf::new(buf.initialize_unfilled_to(read_limit));
match Pin::new(&mut this.inner).poll_read(cx, &mut limited_buf) {
Poll::Ready(Ok(())) => {
let n = limited_buf.filled().len();
buf.advance(n);
Poll::Ready(Ok(n))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
} else {
let before = buf.filled().len();
match Pin::new(&mut this.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let n = buf.filled().len() - before;
Poll::Ready(Ok(n))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
};
match Pin::new(&mut this.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let n = buf.filled().len() - before;
match read_result {
Poll::Ready(Ok(n)) => {
if reserved_read_bytes > n as u64 {
let refund_bytes = reserved_read_bytes - n as u64;
refund_reserved_quota_bytes(this.user_stats.as_ref(), refund_bytes);
this.stats.add_quota_refund_bytes_total(refund_bytes);
}
if n > 0 {
let n_to_charge = n as u64;
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) {
let mut reserved_total = None;
let mut reserve_rounds = 0usize;
while reserved_total.is_none() {
let mut saw_contention = false;
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(n_to_charge, limit) {
Ok(total) => {
reserved_total = Some(total);
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
this.quota_exceeded.store(true, Ordering::Release);
buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
Err(crate::stats::QuotaReserveError::Contended) => {
saw_contention = true;
}
}
}
if reserved_total.is_none() {
reserve_rounds = reserve_rounds.saturating_add(1);
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
this.quota_exceeded.store(true, Ordering::Release);
buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
if saw_contention {
std::thread::yield_now();
}
}
}
if let Some(remaining) = remaining_before {
if should_immediate_quota_check(remaining, n_to_charge) {
this.quota_bytes_since_check = 0;
} else {
@@ -494,10 +535,11 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
this.quota_bytes_since_check = 0;
}
}
if reserved_total.unwrap_or(0) >= limit {
this.quota_exceeded.store(true, Ordering::Release);
}
}
if let Some(limit) = this.quota_limit
&& this.user_stats.quota_used() >= limit
{
this.quota_exceeded.store(true, Ordering::Release);
}
// C→S: client sent data
@@ -521,7 +563,20 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
}
Poll::Ready(Ok(()))
}
other => other,
Poll::Pending => {
if reserved_read_bytes > 0 {
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes);
this.stats.add_quota_refund_bytes_total(reserved_read_bytes);
}
Poll::Pending
}
Poll::Ready(Err(err)) => {
if reserved_read_bytes > 0 {
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes);
this.stats.add_quota_refund_bytes_total(reserved_read_bytes);
}
Poll::Ready(Err(err))
}
}
}
}
@@ -603,6 +658,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
break;
}
Err(crate::stats::QuotaReserveError::Contended) => {
this.stats.increment_quota_contention_total();
saw_contention = true;
}
}
@@ -611,14 +667,14 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
if reserved_bytes == 0 {
reserve_rounds = reserve_rounds.saturating_add(1);
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
this.stats.increment_quota_contention_timeout_total();
if let Some(lease) = this.traffic_lease.as_ref() {
lease.refund(RateDirection::Down, shaper_reserved_bytes);
}
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
if saw_contention {
std::thread::yield_now();
let _ = this.arm_quota_wait(cx);
return Poll::Pending;
} else if saw_contention {
std::hint::spin_loop();
}
}
}
@@ -639,10 +695,9 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
Poll::Ready(Ok(n)) => {
if reserved_bytes > n as u64 {
refund_reserved_quota_bytes(
this.user_stats.as_ref(),
reserved_bytes - n as u64,
);
let refund_bytes = reserved_bytes - n as u64;
refund_reserved_quota_bytes(this.user_stats.as_ref(), refund_bytes);
this.stats.add_quota_refund_bytes_total(refund_bytes);
}
if shaper_reserved_bytes > n as u64
&& let Some(lease) = this.traffic_lease.as_ref()
@@ -693,6 +748,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
Poll::Ready(Err(err)) => {
if reserved_bytes > 0 {
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
this.stats.add_quota_refund_bytes_total(reserved_bytes);
}
if shaper_reserved_bytes > 0
&& let Some(lease) = this.traffic_lease.as_ref()
@@ -704,6 +760,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
Poll::Pending => {
if reserved_bytes > 0 {
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
this.stats.add_quota_refund_bytes_total(reserved_bytes);
}
if shaper_reserved_bytes > 0
&& let Some(lease) = this.traffic_lease.as_ref()
-2
View File
@@ -4,8 +4,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
pub(crate) const ROUTE_SWITCH_ERROR_MSG: &str = "Session terminated";
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum RelayRouteMode {
+1 -1
View File
@@ -661,7 +661,7 @@ async fn integration_route_cutover_and_quota_overlap_fails_closed_and_releases_s
assert!(
matches!(relay_result, Err(ProxyError::DataQuotaExceeded { .. }))
|| matches!(relay_result, Err(ProxyError::Proxy(ref msg)) if msg == crate::proxy::route_mode::ROUTE_SWITCH_ERROR_MSG),
|| matches!(relay_result, Err(ProxyError::RouteSwitched)),
"overlap race must fail closed via quota enforcement or generic cutover termination"
);
+37 -16
View File
@@ -637,6 +637,22 @@ fn unknown_dc_log_path_revalidation_rejects_parent_swapped_to_symlink() {
"telemt-unknown-dc-parent-swap-{}",
std::process::id()
));
if let Ok(meta) = fs::symlink_metadata(&parent) {
if meta.file_type().is_symlink() || meta.is_file() {
fs::remove_file(&parent).expect("stale parent-swap path must be removable");
} else {
fs::remove_dir_all(&parent).expect("stale parent-swap directory must be removable");
}
}
let moved = parent.with_extension("bak");
if let Ok(meta) = fs::symlink_metadata(&moved) {
if meta.file_type().is_symlink() || meta.is_file() {
fs::remove_file(&moved).expect("stale parent-swap backup path must be removable");
} else {
fs::remove_dir_all(&moved)
.expect("stale parent-swap backup directory must be removable");
}
}
fs::create_dir_all(&parent).expect("parent-swap test parent must be creatable");
let rel_candidate = format!(
@@ -646,8 +662,6 @@ fn unknown_dc_log_path_revalidation_rejects_parent_swapped_to_symlink() {
let sanitized = sanitize_unknown_dc_log_path(&rel_candidate)
.expect("candidate must sanitize before parent swap");
let moved = parent.with_extension("bak");
let _ = fs::remove_dir_all(&moved);
fs::rename(&parent, &moved).expect("parent must be movable for swap simulation");
symlink("/tmp", &parent).expect("symlink replacement for parent must be creatable");
@@ -720,6 +734,24 @@ fn adversarial_parent_swap_after_check_is_blocked_by_anchored_open() {
"telemt-unknown-dc-parent-swap-openat-{}",
std::process::id()
));
if let Ok(meta) = fs::symlink_metadata(&base) {
if meta.file_type().is_symlink() || meta.is_file() {
fs::remove_file(&base).expect("stale parent-swap-openat path must be removable");
} else {
fs::remove_dir_all(&base)
.expect("stale parent-swap-openat directory must be removable");
}
}
let moved = base.with_extension("bak");
if let Ok(meta) = fs::symlink_metadata(&moved) {
if meta.file_type().is_symlink() || meta.is_file() {
fs::remove_file(&moved)
.expect("stale parent-swap-openat backup path must be removable");
} else {
fs::remove_dir_all(&moved)
.expect("stale parent-swap-openat backup directory must be removable");
}
}
fs::create_dir_all(&base).expect("parent-swap-openat base must be creatable");
let rel_candidate = format!(
@@ -743,8 +775,6 @@ fn adversarial_parent_swap_after_check_is_blocked_by_anchored_open() {
let outside_target = outside_parent.join("unknown-dc.log");
let _ = fs::remove_file(&outside_target);
let moved = base.with_extension("bak");
let _ = fs::remove_dir_all(&moved);
fs::rename(&base, &moved).expect("base parent must be movable for swap simulation");
symlink(&outside_parent, &base).expect("base parent symlink replacement must be creatable");
@@ -1489,10 +1519,7 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() {
"cutover should terminate direct relay session"
);
assert!(
matches!(
relay_result,
Err(ProxyError::Proxy(ref msg)) if msg == ROUTE_SWITCH_ERROR_MSG
),
matches!(relay_result, Err(ProxyError::RouteSwitched)),
"client-visible cutover error must stay generic and avoid route-internal metadata"
);
@@ -1629,10 +1656,7 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea
.expect("direct relay task must not panic");
assert!(
matches!(
relay_result,
Err(ProxyError::Proxy(ref msg)) if msg == ROUTE_SWITCH_ERROR_MSG
),
matches!(relay_result, Err(ProxyError::RouteSwitched)),
"storm-cutover termination must remain generic for all direct sessions"
);
}
@@ -1935,10 +1959,7 @@ async fn adversarial_direct_relay_cutover_integrity() {
.expect("Session must not panic");
assert!(
matches!(
result,
Err(ProxyError::Proxy(ref msg)) if msg == ROUTE_SWITCH_ERROR_MSG
),
matches!(result, Err(ProxyError::RouteSwitched)),
"Session must terminate with route switch error on cutover"
);
}
@@ -13,6 +13,8 @@ struct CountedWriter {
fail_writes: bool,
}
/// Unit-struct writer test double; its `AsyncWrite` methods never complete.
struct StalledWriter;
impl CountedWriter {
fn new(write_calls: Arc<AtomicUsize>, fail_writes: bool) -> Self {
Self {
@@ -49,12 +51,36 @@ impl AsyncWrite for CountedWriter {
}
}
/// `AsyncWrite` implementation in which every operation stays pending.
///
/// None of the methods touch the task context, so this writer never schedules
/// a wake-up on its own.
impl AsyncWrite for StalledWriter {
    /// Always reports `Poll::Pending`; the supplied bytes are ignored.
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Poll::Pending
    }
    /// Always reports `Poll::Pending`.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Pending
    }
    /// Always reports `Poll::Pending`.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Pending
    }
}
/// Wraps `inner` in a `CryptoWriter` keyed with an all-zero AES-CTR key and
/// IV, passing `8 * 1024` as the writer's size argument.
fn make_crypto_writer(inner: CountedWriter) -> CryptoWriter<CountedWriter> {
    const ZERO_KEY: [u8; 32] = [0u8; 32];
    const ZERO_IV: u128 = 0u128;
    let cipher = AesCtr::new(&ZERO_KEY, ZERO_IV);
    CryptoWriter::new(inner, cipher, 8 * 1024)
}
/// Builds a `CryptoWriter` over a `StalledWriter`, keyed with an all-zero
/// AES-CTR key and IV and `8 * 1024` as the writer's size argument.
fn make_stalled_crypto_writer() -> CryptoWriter<StalledWriter> {
    const ZERO_KEY: [u8; 32] = [0u8; 32];
    const ZERO_IV: u128 = 0u128;
    let cipher = AesCtr::new(&ZERO_KEY, ZERO_IV);
    CryptoWriter::new(StalledWriter, cipher, 8 * 1024)
}
#[tokio::test]
async fn me_writer_write_fail_keeps_reserved_quota_and_tracks_fail_metrics() {
let stats = Stats::new();
@@ -189,3 +215,53 @@ async fn me_writer_pre_write_quota_reject_happens_before_writer_poll() {
);
assert_eq!(bytes_me2c.load(Ordering::Relaxed), 0);
}
/// Feeds a data response into a writer that never completes while the
/// cancellation token is already cancelled, and checks that the call returns
/// `MiddleClientWriterCancelled` without touching byte or user counters.
#[tokio::test]
async fn me_writer_data_write_obeys_flow_cancellation() {
    let user = "middle-me-writer-cancel-user";
    let stats = Stats::new();
    let rng = SecureRandom::new();
    let committed_me2c = AtomicU64::new(0);
    let mut stalled_writer = make_stalled_crypto_writer();
    let mut scratch = Vec::new();

    // The token is cancelled before the call is made.
    let cancel = CancellationToken::new();
    cancel.cancel();

    let response = MeResponse::Data {
        flags: 0,
        data: Bytes::from_static(&[0x31, 0x32, 0x33, 0x34]),
        route_permit: None,
    };

    let outcome = process_me_writer_response_with_traffic_lease(
        response,
        &mut stalled_writer,
        ProtoTag::Intermediate,
        &rng,
        &mut scratch,
        &stats,
        user,
        None,
        None,
        0,
        None,
        &cancel,
        &committed_me2c,
        13,
        true,
        false,
    )
    .await;

    assert!(
        matches!(outcome, Err(ProxyError::MiddleClientWriterCancelled)),
        "cancelled middle writer must return a bounded cancellation error"
    );
    assert_eq!(
        committed_me2c.load(Ordering::Relaxed),
        0,
        "cancelled write must not advance committed ME->C bytes"
    );
    assert_eq!(
        stats.get_user_total_octets(user),
        0,
        "cancelled write must not advance user output telemetry"
    );
}
@@ -4,10 +4,67 @@ use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use std::task::{Context, Poll, Wake};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::time::Instant;
/// One scripted outcome for a single `ScriptedReader::poll_read` call.
enum ReadStep {
    /// Complete the read, delivering these bytes as far as the caller's buffer allows.
    Data(Vec<u8>),
    /// Report `Poll::Pending`.
    Pending,
    /// Complete the read without adding any bytes.
    Eof,
    /// Fail the read with a `BrokenPipe` I/O error.
    Error,
}
/// `AsyncRead` test double that replays a fixed queue of `ReadStep`s.
struct ScriptedReader {
    // Remaining steps; `poll_read` takes one from the front on each call.
    scripted_reads: Arc<Mutex<VecDeque<ReadStep>>>,
    // Shared counter incremented once per `poll_read` call.
    read_calls: Arc<AtomicUsize>,
}
impl ScriptedReader {
fn new(script: Vec<ReadStep>, read_calls: Arc<AtomicUsize>) -> Self {
Self {
scripted_reads: Arc::new(Mutex::new(script.into())),
read_calls,
}
}
}
/// Replays the scripted steps, one per `poll_read` call.
///
/// Every call bumps `read_calls`. An exhausted script behaves like
/// `ReadStep::Eof`. A poisoned script mutex is recovered rather than
/// propagated, so one panicking test thread cannot wedge the reader.
impl AsyncRead for ScriptedReader {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        this.read_calls.fetch_add(1, Ordering::Relaxed);

        let mut script = this
            .scripted_reads
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        match script.pop_front().unwrap_or(ReadStep::Eof) {
            ReadStep::Data(mut data) => {
                let n = data.len().min(buf.remaining());
                buf.put_slice(&data[..n]);
                // Keep the bytes that did not fit at the head of the script so a
                // short caller buffer yields a short read instead of dropping the
                // tail of the chunk.
                if n < data.len() {
                    script.push_front(ReadStep::Data(data.split_off(n)));
                }
                Poll::Ready(Ok(()))
            }
            // The waker is deliberately not registered: callers drive this
            // reader by polling it manually.
            ReadStep::Pending => Poll::Pending,
            // Completing without filling the buffer reads as end-of-stream.
            ReadStep::Eof => Poll::Ready(Ok(())),
            ReadStep::Error => Poll::Ready(Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "forced read failure",
            ))),
        }
    }
}
/// `Wake` implementation whose wake-up is a no-op.
struct NoopWake;
impl Wake for NoopWake {
    // Intentionally empty: a wake request is simply discarded.
    fn wake(self: Arc<Self>) {}
}
struct ScriptedWriter {
scripted_writes: Arc<Mutex<VecDeque<usize>>>,
write_calls: Arc<AtomicUsize>,
@@ -80,6 +137,127 @@ fn make_stats_io_with_script(
(io, stats, write_calls, quota_exceeded)
}
/// Builds a `StatsIo` over a `ScriptedReader` for read-path quota tests.
///
/// When `precharged_quota` is non-zero it is charged to `user` before the
/// wrapper is created. Returns the wrapper together with the shared `Stats`,
/// the reader's poll counter and the quota-exceeded flag handed to the wrapper.
fn make_stats_io_with_read_script(
    user: &str,
    quota_limit: u64,
    precharged_quota: u64,
    script: Vec<ReadStep>,
) -> (
    StatsIo<ScriptedReader>,
    Arc<Stats>,
    Arc<AtomicUsize>,
    Arc<AtomicBool>,
) {
    let stats = Arc::new(Stats::new());
    if precharged_quota != 0 {
        let handle = stats.get_or_create_user_stats_handle(user);
        stats.quota_charge_post_write(handle.as_ref(), precharged_quota);
    }

    let read_calls = Arc::new(AtomicUsize::new(0));
    let quota_exceeded = Arc::new(AtomicBool::new(false));

    let reader = ScriptedReader::new(script, Arc::clone(&read_calls));
    let counters = Arc::new(SharedCounters::new());
    let io = StatsIo::new(
        reader,
        counters,
        Arc::clone(&stats),
        user.to_owned(),
        Some(quota_limit),
        Arc::clone(&quota_exceeded),
        Instant::now(),
    );

    (io, stats, read_calls, quota_exceeded)
}
/// Polls `io` for a read exactly once with a no-op waker.
///
/// `storage` backs the read buffer. A completed read is reported as the
/// number of bytes the poll added to it; errors and `Pending` pass through.
fn poll_read_once<R: AsyncRead + Unpin>(
    io: &mut StatsIo<R>,
    storage: &mut [u8],
) -> Poll<io::Result<usize>> {
    let waker = std::task::Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut read_buf = ReadBuf::new(storage);
    let already_filled = read_buf.filled().len();

    let polled = Pin::new(io).poll_read(&mut cx, &mut read_buf);
    polled.map(|outcome| outcome.map(|()| read_buf.filled().len() - already_filled))
}
/// A 5-byte read into a 16-byte buffer must leave 5 bytes charged to the
/// user and 11 bytes recorded as refunded.
#[test]
fn direct_c2s_quota_refunds_unused_on_short_read() {
    let user = "direct-c2s-short-read-refund-user";
    let script = vec![ReadStep::Data(vec![0x11; 5])];
    let (mut io, stats, read_calls, quota_exceeded) =
        make_stats_io_with_read_script(user, 64, 0, script);

    let mut storage = [0u8; 16];
    let polled = poll_read_once(&mut io, &mut storage);
    let n = match polled {
        Poll::Ready(Ok(n)) => n,
        other => panic!("short read must complete, got {other:?}"),
    };

    assert_eq!(n, 5);
    assert_eq!(read_calls.load(Ordering::Relaxed), 1);
    assert_eq!(stats.get_user_quota_used(user), 5);
    assert_eq!(stats.get_quota_refund_bytes_total(), 11);
    assert!(!quota_exceeded.load(Ordering::Acquire));
}
/// A pending read into a 16-byte buffer must leave nothing charged to the
/// user and 16 bytes recorded as refunded.
#[test]
fn direct_c2s_quota_refunds_full_reservation_on_pending() {
    let user = "direct-c2s-pending-refund-user";
    let script = vec![ReadStep::Pending];
    let (mut io, stats, read_calls, quota_exceeded) =
        make_stats_io_with_read_script(user, 64, 0, script);

    let mut storage = [0u8; 16];
    let polled = poll_read_once(&mut io, &mut storage);
    assert!(matches!(polled, Poll::Pending));

    assert_eq!(read_calls.load(Ordering::Relaxed), 1);
    assert_eq!(stats.get_user_quota_used(user), 0);
    assert_eq!(stats.get_quota_refund_bytes_total(), 16);
    assert!(!quota_exceeded.load(Ordering::Acquire));
}
/// A zero-byte (EOF) read into a 16-byte buffer must leave nothing charged
/// to the user and 16 bytes recorded as refunded.
#[test]
fn direct_c2s_quota_refunds_full_reservation_on_eof() {
    let user = "direct-c2s-eof-refund-user";
    let script = vec![ReadStep::Eof];
    let (mut io, stats, read_calls, quota_exceeded) =
        make_stats_io_with_read_script(user, 64, 0, script);

    let mut storage = [0u8; 16];
    let polled = poll_read_once(&mut io, &mut storage);
    let n = match polled {
        Poll::Ready(Ok(n)) => n,
        other => panic!("EOF read must complete with zero bytes, got {other:?}"),
    };

    assert_eq!(n, 0);
    assert_eq!(read_calls.load(Ordering::Relaxed), 1);
    assert_eq!(stats.get_user_quota_used(user), 0);
    assert_eq!(stats.get_quota_refund_bytes_total(), 16);
    assert!(!quota_exceeded.load(Ordering::Acquire));
}
/// A failed read into a 16-byte buffer must surface the `BrokenPipe` error,
/// leave nothing charged to the user and record 16 bytes as refunded.
#[test]
fn direct_c2s_quota_refunds_full_reservation_on_error() {
    let user = "direct-c2s-error-refund-user";
    let script = vec![ReadStep::Error];
    let (mut io, stats, read_calls, quota_exceeded) =
        make_stats_io_with_read_script(user, 64, 0, script);

    let mut storage = [0u8; 16];
    let polled = poll_read_once(&mut io, &mut storage);
    let error = match polled {
        Poll::Ready(Err(error)) => error,
        other => panic!("error read must return error, got {other:?}"),
    };

    assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
    assert_eq!(read_calls.load(Ordering::Relaxed), 1);
    assert_eq!(stats.get_user_quota_used(user), 0);
    assert_eq!(stats.get_quota_refund_bytes_total(), 16);
    assert!(!quota_exceeded.load(Ordering::Acquire));
}
#[tokio::test]
async fn direct_partial_write_charges_only_committed_bytes_without_double_charge() {
let user = "direct-partial-charge-user";
+138 -23
View File
@@ -8,8 +8,8 @@ pub mod telemetry;
use dashmap::DashMap;
use lru::LruCache;
use parking_lot::Mutex;
use std::collections::{HashMap, VecDeque};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -274,11 +274,22 @@ pub struct Stats {
me_inline_recovery_total: AtomicU64,
ip_reservation_rollback_tcp_limit_total: AtomicU64,
ip_reservation_rollback_quota_limit_total: AtomicU64,
quota_refund_bytes_total: AtomicU64,
quota_contention_total: AtomicU64,
quota_contention_timeout_total: AtomicU64,
quota_acquire_cancelled_total: AtomicU64,
quota_write_fail_bytes_total: AtomicU64,
quota_write_fail_events_total: AtomicU64,
me_child_join_timeout_total: AtomicU64,
me_child_abort_total: AtomicU64,
flow_wait_middle_rate_limit_total: AtomicU64,
flow_wait_middle_rate_limit_cancelled_total: AtomicU64,
flow_wait_middle_rate_limit_ms_total: AtomicU64,
session_drop_fallback_total: AtomicU64,
telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8,
cached_epoch_secs: AtomicU64,
user_stats: DashMap<String, Arc<UserStats>>,
user_stats_last_cleanup_epoch_secs: AtomicU64,
start_time: parking_lot::RwLock<Option<Instant>>,
@@ -348,6 +359,7 @@ impl Stats {
/// Creates a `Stats` from its `Default` value, applies the default telemetry
/// policy, primes the cached epoch seconds and records the start instant.
pub fn new() -> Self {
    let stats = Self::default();
    stats.apply_telemetry_policy(TelemetryPolicy::default());
    stats.refresh_cached_epoch_secs();

    let started_at = Instant::now();
    {
        let mut start_slot = stats.start_time.write();
        *start_slot = Some(started_at);
    }
    stats
}
@@ -397,33 +409,55 @@ impl Stats {
.as_secs()
}
fn touch_user_stats(stats: &UserStats) {
/// Samples `Self::now_epoch_secs()`, stores the sample in
/// `cached_epoch_secs` and returns it.
fn refresh_cached_epoch_secs(&self) -> u64 {
    let sampled = Self::now_epoch_secs();
    self.cached_epoch_secs.store(sampled, Ordering::Relaxed);
    sampled
}
/// Returns the cached epoch seconds, refreshing first when the cache still
/// holds zero (treated as "not yet primed").
fn cached_epoch_secs(&self) -> u64 {
    match self.cached_epoch_secs.load(Ordering::Relaxed) {
        0 => self.refresh_cached_epoch_secs(),
        primed => primed,
    }
}
fn touch_user_stats(&self, stats: &UserStats) {
stats
.last_seen_epoch_secs
.store(Self::now_epoch_secs(), Ordering::Relaxed);
.store(self.cached_epoch_secs(), Ordering::Relaxed);
}
pub(crate) fn get_or_create_user_stats_handle(&self, user: &str) -> Arc<UserStats> {
self.maybe_cleanup_user_stats();
if let Some(existing) = self.user_stats.get(user) {
let handle = Arc::clone(existing.value());
Self::touch_user_stats(handle.as_ref());
self.touch_user_stats(handle.as_ref());
return handle;
}
let entry = self.user_stats.entry(user.to_string()).or_default();
if entry.last_seen_epoch_secs.load(Ordering::Relaxed) == 0 {
Self::touch_user_stats(entry.value().as_ref());
self.touch_user_stats(entry.value().as_ref());
}
Arc::clone(entry.value())
}
/// Runs `maybe_cleanup_user_stats` on a 60-second tokio interval.
///
/// The loop has no exit condition, so the future only ends when it is dropped
/// or its task is aborted.
pub(crate) async fn run_periodic_user_stats_maintenance(self: Arc<Self>) {
    const MAINTENANCE_PERIOD: Duration = Duration::from_secs(60);

    let mut ticker = tokio::time::interval(MAINTENANCE_PERIOD);
    loop {
        ticker.tick().await;
        self.maybe_cleanup_user_stats();
    }
}
#[inline]
pub(crate) fn add_user_octets_from_handle(&self, user_stats: &UserStats, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
Self::touch_user_stats(user_stats);
self.touch_user_stats(user_stats);
user_stats
.octets_from_client
.fetch_add(bytes, Ordering::Relaxed);
@@ -434,7 +468,7 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
Self::touch_user_stats(user_stats);
self.touch_user_stats(user_stats);
user_stats
.octets_to_client
.fetch_add(bytes, Ordering::Relaxed);
@@ -445,7 +479,7 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
Self::touch_user_stats(user_stats);
self.touch_user_stats(user_stats);
user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
}
@@ -454,7 +488,7 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
Self::touch_user_stats(user_stats);
self.touch_user_stats(user_stats);
user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
}
@@ -464,7 +498,7 @@ impl Stats {
/// mixing reserve and post-charge on a single I/O event.
#[inline]
pub(crate) fn quota_charge_post_write(&self, user_stats: &UserStats, bytes: u64) -> u64 {
Self::touch_user_stats(user_stats);
self.touch_user_stats(user_stats);
user_stats
.quota_used
.fetch_add(bytes, Ordering::Relaxed)
@@ -475,7 +509,7 @@ impl Stats {
const USER_STATS_CLEANUP_INTERVAL_SECS: u64 = 60;
const USER_STATS_IDLE_TTL_SECS: u64 = 24 * 60 * 60;
let now_epoch_secs = Self::now_epoch_secs();
let now_epoch_secs = self.refresh_cached_epoch_secs();
let last_cleanup_epoch_secs = self
.user_stats_last_cleanup_epoch_secs
.load(Ordering::Relaxed);
@@ -1437,6 +1471,29 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
/// Adds `bytes` to `quota_refund_bytes_total`; a no-op while core telemetry
/// is disabled.
pub fn add_quota_refund_bytes_total(&self, bytes: u64) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.quota_refund_bytes_total;
    counter.fetch_add(bytes, Ordering::Relaxed);
}
/// Adds one to `quota_contention_total`; a no-op while core telemetry is
/// disabled.
pub fn increment_quota_contention_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.quota_contention_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `quota_contention_timeout_total`; a no-op while core
/// telemetry is disabled.
pub fn increment_quota_contention_timeout_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.quota_contention_timeout_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `quota_acquire_cancelled_total`; a no-op while core telemetry
/// is disabled.
pub fn increment_quota_acquire_cancelled_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.quota_acquire_cancelled_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
pub fn add_quota_write_fail_bytes_total(&self, bytes: u64) {
if self.telemetry_core_enabled() {
self.quota_write_fail_bytes_total
@@ -1449,6 +1506,37 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
/// Adds one to `me_child_join_timeout_total`; a no-op while core telemetry
/// is disabled.
pub fn increment_me_child_join_timeout_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.me_child_join_timeout_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `me_child_abort_total`; a no-op while core telemetry is
/// disabled.
pub fn increment_me_child_abort_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.me_child_abort_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Records one observation: adds one to `flow_wait_middle_rate_limit_total`
/// and `wait_ms` to `flow_wait_middle_rate_limit_ms_total`. A no-op while core
/// telemetry is disabled.
pub fn observe_flow_wait_middle_rate_limit_ms(&self, wait_ms: u64) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let wait_events = &self.flow_wait_middle_rate_limit_total;
    wait_events.fetch_add(1, Ordering::Relaxed);
    let wait_millis = &self.flow_wait_middle_rate_limit_ms_total;
    wait_millis.fetch_add(wait_ms, Ordering::Relaxed);
}
/// Adds one to `flow_wait_middle_rate_limit_cancelled_total`; a no-op while
/// core telemetry is disabled.
pub fn increment_flow_wait_middle_rate_limit_cancelled_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.flow_wait_middle_rate_limit_cancelled_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `session_drop_fallback_total`; a no-op while core telemetry
/// is disabled.
pub fn increment_session_drop_fallback_total(&self) {
    if !self.telemetry_core_enabled() {
        return;
    }
    let counter = &self.session_drop_fallback_total;
    counter.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_endpoint_quarantine_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_endpoint_quarantine_total
@@ -2283,19 +2371,52 @@ impl Stats {
self.ip_reservation_rollback_quota_limit_total
.load(Ordering::Relaxed)
}
/// Returns the current value of `quota_refund_bytes_total` (relaxed load).
pub fn get_quota_refund_bytes_total(&self) -> u64 {
    self.quota_refund_bytes_total.load(Ordering::Relaxed)
}
/// Returns the current value of `quota_contention_total` (relaxed load).
pub fn get_quota_contention_total(&self) -> u64 {
    self.quota_contention_total.load(Ordering::Relaxed)
}
/// Returns the current value of `quota_contention_timeout_total` (relaxed load).
pub fn get_quota_contention_timeout_total(&self) -> u64 {
    self.quota_contention_timeout_total.load(Ordering::Relaxed)
}
/// Returns the current value of `quota_acquire_cancelled_total` (relaxed load).
pub fn get_quota_acquire_cancelled_total(&self) -> u64 {
    self.quota_acquire_cancelled_total.load(Ordering::Relaxed)
}
/// Returns the current value of `quota_write_fail_bytes_total` (relaxed load).
pub fn get_quota_write_fail_bytes_total(&self) -> u64 {
    self.quota_write_fail_bytes_total.load(Ordering::Relaxed)
}
/// Returns the current value of `quota_write_fail_events_total` (relaxed load).
pub fn get_quota_write_fail_events_total(&self) -> u64 {
    self.quota_write_fail_events_total.load(Ordering::Relaxed)
}
/// Returns the current value of `me_child_join_timeout_total` (relaxed load).
pub fn get_me_child_join_timeout_total(&self) -> u64 {
    self.me_child_join_timeout_total.load(Ordering::Relaxed)
}
/// Returns the current value of `me_child_abort_total` (relaxed load).
pub fn get_me_child_abort_total(&self) -> u64 {
    self.me_child_abort_total.load(Ordering::Relaxed)
}
/// Returns the current value of `flow_wait_middle_rate_limit_total` (relaxed load).
pub fn get_flow_wait_middle_rate_limit_total(&self) -> u64 {
    self.flow_wait_middle_rate_limit_total
        .load(Ordering::Relaxed)
}
/// Returns the current value of `flow_wait_middle_rate_limit_cancelled_total`
/// (relaxed load).
pub fn get_flow_wait_middle_rate_limit_cancelled_total(&self) -> u64 {
    self.flow_wait_middle_rate_limit_cancelled_total
        .load(Ordering::Relaxed)
}
/// Returns the current value of `flow_wait_middle_rate_limit_ms_total`
/// (relaxed load).
pub fn get_flow_wait_middle_rate_limit_ms_total(&self) -> u64 {
    self.flow_wait_middle_rate_limit_ms_total
        .load(Ordering::Relaxed)
}
/// Returns the current value of `session_drop_fallback_total` (relaxed load).
pub fn get_session_drop_fallback_total(&self) -> u64 {
    self.session_drop_fallback_total.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref());
self.touch_user_stats(stats.as_ref());
stats.connects.fetch_add(1, Ordering::Relaxed);
}
@@ -2304,7 +2425,7 @@ impl Stats {
return;
}
let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref());
self.touch_user_stats(stats.as_ref());
stats.curr_connects.fetch_add(1, Ordering::Relaxed);
}
@@ -2314,7 +2435,7 @@ impl Stats {
}
let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref());
self.touch_user_stats(stats.as_ref());
let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed);
@@ -2337,9 +2458,8 @@ impl Stats {
}
pub fn decrement_user_curr_connects(&self, user: &str) {
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value().as_ref());
self.touch_user_stats(stats.value().as_ref());
let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed);
loop {
@@ -2415,12 +2535,7 @@ impl Stats {
.unwrap_or(0)
}
pub fn load_user_quota_state(
&self,
user: &str,
used_bytes: u64,
last_reset_epoch_secs: u64,
) {
pub fn load_user_quota_state(&self, user: &str, used_bytes: u64, last_reset_epoch_secs: u64) {
let stats = self.get_or_create_user_stats_handle(user);
stats.quota_used.store(used_bytes, Ordering::Relaxed);
stats
+4 -3
View File
@@ -150,9 +150,10 @@ impl TlsFrontCache {
is_default: cached.domain == "default",
has_cert_info: cached.cert_info.is_some(),
has_cert_payload: cached.cert_payload.is_some(),
app_data_records: cached.app_data_records_sizes.len().max(
behavior.app_data_record_sizes.len(),
),
app_data_records: cached
.app_data_records_sizes
.len()
.max(behavior.app_data_record_sizes.len()),
ticket_records: behavior.ticket_record_sizes.len(),
change_cipher_spec_count: behavior.change_cipher_spec_count,
total_app_data_len: cached.total_app_data_len,
+6
View File
@@ -18,6 +18,9 @@ const PROXY_V1_MIN_LEN: usize = 6;
/// Minimum length for v2 header
const PROXY_V2_MIN_LEN: usize = 16;
/// Maximum accepted PROXY v2 address and TLV payload, in bytes.
///
/// NOTE(review): 216 equals the size of the largest address block the v2
/// format defines (AF_UNIX: two 108-byte paths) — presumably the rationale;
/// confirm. Any TLVs share this same budget with the address block.
const PROXY_V2_MAX_ADDR_LEN: usize = 216;
/// Address families for v2
mod address_family {
pub const UNSPEC: u8 = 0x0;
@@ -169,6 +172,9 @@ async fn parse_v2<R: AsyncRead + Unpin>(
let family_protocol = header[13];
let addr_len = u16::from_be_bytes([header[14], header[15]]) as usize;
if addr_len > PROXY_V2_MAX_ADDR_LEN {
return Err(ProxyError::InvalidProxyProtocol);
}
// Read address data
let mut addr_data = vec![0u8; addr_len];