This commit is contained in:
Alexey
2026-05-10 14:14:52 +03:00
parent 10c7cb2e0c
commit 57b2aa0453
15 changed files with 80 additions and 121 deletions

6
Cargo.lock generated
View File

@@ -2404,9 +2404,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
[[package]]
name = "rustls-webpki"
version = "0.103.12"
version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06"
checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [
"aws-lc-rs",
"ring",
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "telemt"
version = "3.4.10"
version = "3.4.11"
dependencies = [
"aes",
"anyhow",

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.4.10"
version = "3.4.11"
edition = "2024"
[features]

View File

@@ -621,10 +621,7 @@ fn collect_unknown_config_keys(parsed_toml: &toml::Value) -> Vec<UnknownConfigKe
}
}
if let Some(upstreams) = parsed_toml
.get("upstreams")
.and_then(toml::Value::as_array)
{
if let Some(upstreams) = parsed_toml.get("upstreams").and_then(toml::Value::as_array) {
for (idx, upstream) in upstreams.iter().enumerate() {
check_nested_table_value(
&mut unknown,

View File

@@ -492,14 +492,10 @@ pub(crate) fn spawn_tcp_accept_loops(
let handshake_close_reason =
expected_handshake_close_description(&e);
let me_closed = matches!(
&e,
crate::error::ProxyError::MiddleConnectionLost
);
let route_switched = matches!(
&e,
crate::error::ProxyError::RouteSwitched
);
let me_closed =
matches!(&e, crate::error::ProxyError::MiddleConnectionLost);
let route_switched =
matches!(&e, crate::error::ProxyError::RouteSwitched);
match (peer_close_reason, me_closed) {
(Some(reason), _) => {

View File

@@ -75,7 +75,9 @@ pub(crate) async fn spawn_runtime_tasks(
let stats_maintenance = stats.clone();
tokio::spawn(async move {
stats_maintenance.run_periodic_user_stats_maintenance().await;
stats_maintenance
.run_periodic_user_stats_maintenance()
.await;
});
let ip_tracker_maintenance = ip_tracker.clone();

View File

@@ -8,8 +8,8 @@
//!
//! SIGHUP is handled separately in config/hot_reload.rs for config reload.
use std::sync::Arc;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
#[cfg(not(unix))]
@@ -52,7 +52,14 @@ pub(crate) async fn wait_for_shutdown(
quota_state_path: PathBuf,
) {
let signal = wait_for_shutdown_signal().await;
perform_shutdown(signal, process_started_at, me_pool, &stats, quota_state_path).await;
perform_shutdown(
signal,
process_started_at,
me_pool,
&stats,
quota_state_path,
)
.await;
}
/// Waits for any shutdown signal (SIGINT, SIGTERM, SIGQUIT).

View File

@@ -374,10 +374,7 @@ async fn render_tls_front_profile_health(
out,
"# HELP telemt_tls_front_profile_age_seconds Age of cached TLS front profile data per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_age_seconds gauge"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_age_seconds gauge");
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_app_data_records TLS front cached app-data record count per configured domain"
@@ -390,10 +387,7 @@ async fn render_tls_front_profile_health(
out,
"# HELP telemt_tls_front_profile_ticket_records TLS front cached ticket-like tail record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_ticket_records gauge"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_ticket_records gauge");
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_change_cipher_spec_records TLS front cached ChangeCipherSpec record count per configured domain"
@@ -406,21 +400,14 @@ async fn render_tls_front_profile_health(
out,
"# HELP telemt_tls_front_profile_app_data_bytes TLS front cached total app-data bytes per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_app_data_bytes gauge"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_app_data_bytes gauge");
for item in health {
let domain = prometheus_label_value(&item.domain);
let _ = writeln!(
out,
"telemt_tls_front_profile_info{{domain=\"{}\",source=\"{}\",is_default=\"{}\",has_cert_info=\"{}\",has_cert_payload=\"{}\"}} 1",
domain,
item.source,
item.is_default,
item.has_cert_info,
item.has_cert_payload
domain, item.source, item.is_default, item.has_cert_info, item.has_cert_payload
);
let _ = writeln!(
out,
@@ -771,10 +758,7 @@ async fn render_metrics(
out,
"# HELP telemt_quota_contention_timeout_total Quota reservations that hit the bounded contention budget"
);
let _ = writeln!(
out,
"# TYPE telemt_quota_contention_timeout_total counter"
);
let _ = writeln!(out, "# TYPE telemt_quota_contention_timeout_total counter");
let _ = writeln!(
out,
"telemt_quota_contention_timeout_total {}",
@@ -917,10 +901,7 @@ async fn render_metrics(
out,
"# HELP telemt_rate_limiter_burst_bound_bytes Configured upper bound for one direct relay rate-limit burst"
);
let _ = writeln!(
out,
"# TYPE telemt_rate_limiter_burst_bound_bytes gauge"
);
let _ = writeln!(out, "# TYPE telemt_rate_limiter_burst_bound_bytes gauge");
let _ = writeln!(
out,
"telemt_rate_limiter_burst_bound_bytes{{direction=\"up\"}} {}",
@@ -3793,8 +3774,9 @@ mod tests {
)));
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output
.contains("telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"));
assert!(output.contains(
"telemt_connections_bad_by_class_total{class=\"tls_handshake_bad_client\"} 1"
));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_handshake_failures_by_class_total{class=\"timeout\"} 1"));
assert!(output.contains("telemt_auth_expensive_checks_total 9"));
@@ -3910,16 +3892,21 @@ mod tests {
output.contains("telemt_tls_front_profile_info{domain=\"fallback.example\",source=\"default\",is_default=\"true\",has_cert_info=\"false\",has_cert_payload=\"false\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2")
output.contains(
"telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2"
)
);
assert!(
output.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
output
.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
);
assert!(output.contains(
"telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1"
));
assert!(
output.contains("telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536")
output.contains(
"telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536"
)
);
}

View File

@@ -1002,12 +1002,7 @@ impl RunningClientHandler {
trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
"Rejecting PROXY protocol header from untrusted source"
);
record_beobachten_class(
&self.beobachten,
&self.config,
self.peer.ip(),
"other",
);
record_beobachten_class(&self.beobachten, &self.config, self.peer.ip(), "other");
return Err(ProxyError::InvalidProxyProtocol);
}

View File

@@ -1901,7 +1901,10 @@ where
.auth_expensive_checks_total
.fetch_add(validation_checks as u64, Ordering::Relaxed);
if config.access.is_user_source_ip_denied(user.as_str(), peer.ip()) {
if config
.access
.is_user_source_ip_denied(user.as_str(), peer.ip())
{
auth_probe_record_failure_in(shared, peer.ip(), Instant::now());
maybe_apply_server_hello_delay(config).await;
warn!(

View File

@@ -1883,9 +1883,7 @@ where
};
// When the client closes but the ME channel stopped as unregistered, it isn't an error
if client_closed
&& matches!(writer_result, Err(ProxyError::MiddleConnectionLost))
{
if client_closed && matches!(writer_result, Err(ProxyError::MiddleConnectionLost)) {
writer_result = Ok(());
}
@@ -2520,27 +2518,26 @@ where
)
.await?;
let write_mode =
match write_client_payload(
client_writer,
proto_tag,
flags,
&data,
rng,
frame_buf,
cancel,
)
.await
{
Ok(mode) => mode,
Err(err) => {
if quota_limit.is_some() {
stats.add_quota_write_fail_bytes_total(data_len);
stats.increment_quota_write_fail_events_total();
}
return Err(err);
let write_mode = match write_client_payload(
client_writer,
proto_tag,
flags,
&data,
rng,
frame_buf,
cancel,
)
.await
{
Ok(mode) => mode,
Err(err) => {
if quota_limit.is_some() {
stats.add_quota_write_fail_bytes_total(data_len);
stats.increment_quota_write_fail_events_total();
}
};
return Err(err);
}
};
bytes_me2c.fetch_add(data_len, Ordering::Relaxed);
if let Some(user_stats) = quota_user_stats {

View File

@@ -518,10 +518,7 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
Poll::Ready(Ok(n)) => {
if reserved_read_bytes > n as u64 {
let refund_bytes = reserved_read_bytes - n as u64;
refund_reserved_quota_bytes(
this.user_stats.as_ref(),
refund_bytes,
);
refund_reserved_quota_bytes(this.user_stats.as_ref(), refund_bytes);
this.stats.add_quota_refund_bytes_total(refund_bytes);
}
if n > 0 {
@@ -538,7 +535,6 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
this.quota_bytes_since_check = 0;
}
}
}
if let Some(limit) = this.quota_limit
&& this.user_stats.quota_used() >= limit
@@ -700,10 +696,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
Poll::Ready(Ok(n)) => {
if reserved_bytes > n as u64 {
let refund_bytes = reserved_bytes - n as u64;
refund_reserved_quota_bytes(
this.user_stats.as_ref(),
refund_bytes,
);
refund_reserved_quota_bytes(this.user_stats.as_ref(), refund_bytes);
this.stats.add_quota_refund_bytes_total(refund_bytes);
}
if shaper_reserved_bytes > n as u64

View File

@@ -1519,10 +1519,7 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() {
"cutover should terminate direct relay session"
);
assert!(
matches!(
relay_result,
Err(ProxyError::RouteSwitched)
),
matches!(relay_result, Err(ProxyError::RouteSwitched)),
"client-visible cutover error must stay generic and avoid route-internal metadata"
);
@@ -1659,10 +1656,7 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea
.expect("direct relay task must not panic");
assert!(
matches!(
relay_result,
Err(ProxyError::RouteSwitched)
),
matches!(relay_result, Err(ProxyError::RouteSwitched)),
"storm-cutover termination must remain generic for all direct sessions"
);
}
@@ -1965,10 +1959,7 @@ async fn adversarial_direct_relay_cutover_integrity() {
.expect("Session must not panic");
assert!(
matches!(
result,
Err(ProxyError::RouteSwitched)
),
matches!(result, Err(ProxyError::RouteSwitched)),
"Session must terminate with route switch error on cutover"
);
}

View File

@@ -187,12 +187,8 @@ fn poll_read_once<R: AsyncRead + Unpin>(
#[test]
fn direct_c2s_quota_refunds_unused_on_short_read() {
let user = "direct-c2s-short-read-refund-user";
let (mut io, stats, read_calls, quota_exceeded) = make_stats_io_with_read_script(
user,
64,
0,
vec![ReadStep::Data(vec![0x11; 5])],
);
let (mut io, stats, read_calls, quota_exceeded) =
make_stats_io_with_read_script(user, 64, 0, vec![ReadStep::Data(vec![0x11; 5])]);
let mut storage = [0u8; 16];
let n = match poll_read_once(&mut io, &mut storage) {

View File

@@ -8,8 +8,8 @@ pub mod telemetry;
use dashmap::DashMap;
use lru::LruCache;
use parking_lot::Mutex;
use std::collections::{HashMap, VecDeque};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -2378,8 +2378,7 @@ impl Stats {
self.quota_contention_total.load(Ordering::Relaxed)
}
pub fn get_quota_contention_timeout_total(&self) -> u64 {
self.quota_contention_timeout_total
.load(Ordering::Relaxed)
self.quota_contention_timeout_total.load(Ordering::Relaxed)
}
pub fn get_quota_acquire_cancelled_total(&self) -> u64 {
self.quota_acquire_cancelled_total.load(Ordering::Relaxed)
@@ -2536,12 +2535,7 @@ impl Stats {
.unwrap_or(0)
}
pub fn load_user_quota_state(
&self,
user: &str,
used_bytes: u64,
last_reset_epoch_secs: u64,
) {
pub fn load_user_quota_state(&self, user: &str, used_bytes: u64, last_reset_epoch_secs: u64) {
let stats = self.get_or_create_user_stats_handle(user);
stats.quota_used.store(used_bytes, Ordering::Relaxed);
stats

View File

@@ -150,9 +150,10 @@ impl TlsFrontCache {
is_default: cached.domain == "default",
has_cert_info: cached.cert_info.is_some(),
has_cert_payload: cached.cert_payload.is_some(),
app_data_records: cached.app_data_records_sizes.len().max(
behavior.app_data_record_sizes.len(),
),
app_data_records: cached
.app_data_records_sizes
.len()
.max(behavior.app_data_record_sizes.len()),
ticket_records: behavior.ticket_record_sizes.len(),
change_cipher_spec_count: behavior.change_cipher_spec_count,
total_app_data_len: cached.total_app_data_len,