Compare commits

...

18 Commits

Author SHA1 Message Date
Alexey bc691539a1 Bump 2026-04-07 19:28:05 +03:00
Alexey 2162a63e3e Memory Hard-bounds + Handshake Budget in Metrics + No mutable in hotpath ConnRegistry + Build-info in Metrics + TLS Fronting fixes + Round-bounded Retries + Bounded Retry-Round Constant + QueueFall Bounded Retry on Data-route: merge pull request #655 from telemt/flow
Memory Hard-bounds + Handshake Budget in Metrics + No mutable in hotpath ConnRegistry + Build-info in Metrics + TLS Fronting fixes + Round-bounded Retries + Bounded Retry-Round Constant + QueueFall Bounded Retry on Data-route
2026-04-07 19:26:07 +03:00
Alexey 4a77335ba9 Round-bounded Retries + Bounded Retry-Round Constant
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-07 19:19:40 +03:00
Alexey ba29b66c4c Merge branch 'flow' of https://github.com/telemt/telemt into flow 2026-04-07 18:42:10 +03:00
Alexey e8cf97095f QueueFall Bounded Retry on Data-route
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-07 18:41:59 +03:00
Alexey ee4264af50 Merge pull request #624 from mammuthus/feature/metrics-build-info
metrics: export CARGO_PKG_VERSION as telemt_build_info version metric
2026-04-07 18:35:06 +03:00
Alexey 59c2476650 Merge branch 'flow' into feature/metrics-build-info 2026-04-07 18:34:51 +03:00
Alexey 89d6be267d Merge pull request #652 from groozchique/flow
[docs] Hotfix for link's obtaining command
2026-04-07 18:23:34 +03:00
Alexey 3b717c75da Memory Hard-bounds + Handshake Budget in Metrics + No mutable in hotpath ConnRegistry
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-07 18:18:47 +03:00
Nick Parfyonov 3af7673342 [docs] add classic/secure links to the output
After further testing I discovered that the current command only returns TLS links, ignoring classic/secure links if they are present
2026-04-07 13:53:12 +03:00
Alexey 5cbcfb2a91 Merge pull request #643 from Dimasssss/patch-2
Update install.sh - fix "Permission denied (os error 13)"
2026-04-07 13:20:46 +03:00
Alexey aec2c23a0c Merge pull request #650 from pavlozt/fix/zabbix-storage
Zabbix template: disable intermediate data storage
2026-04-07 13:19:35 +03:00
PavelZ 12f99eebab Zabbix template: disable intermediate data storage 2026-04-07 11:55:51 +03:00
Dimasssss f829439e8f Update install.sh - fix "Permission denied (os error 13)" 2026-04-06 14:33:02 +03:00
Alexey 6996d6e597 Update LICENSE 2026-04-06 13:21:16 +03:00
Alexey b3f11624c9 Update LICENSE 2026-04-06 13:12:06 +03:00
mammuthus 9b64d2ee17 style(metrics): apply rustfmt for build_info additions 2026-04-03 07:49:37 +00:00
mammuthus 873618ce53 metrics: export telemt_build_info version metric 2026-04-02 18:14:50 +00:00
17 changed files with 595 additions and 96 deletions
Generated
+1 -1
View File
@@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.3.38" version = "3.3.39"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",
+1 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.38" version = "3.3.39"
edition = "2024" edition = "2024"
[features] [features]
+10 -6
View File
@@ -1,4 +1,4 @@
###### TELEMT Public License 3 ###### ######## TELEMT LICENSE 3.3 #########
##### Copyright (c) 2026 Telemt ##### ##### Copyright (c) 2026 Telemt #####
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -14,11 +14,15 @@ are preserved and complied with.
The canonical version of this License is the English version. The canonical version of this License is the English version.
Official translations are provided for informational purposes only Official translations are provided for informational purposes only
and for convenience, and do not have legal force. In case of any and for convenience, and do not have legal force. In case of any
discrepancy, the English version of this License shall prevail. discrepancy, the English version of this License shall prevail
Available versions:
- English in Markdown: docs/LICENSE/LICENSE.md /----------------------------------------------------------\
- German: docs/LICENSE/LICENSE.de.md | Language | Location |
- Russian: docs/LICENSE/LICENSE.ru.md |-------------|--------------------------------------------|
| English | docs/LICENSE/TELEMT-LICENSE.en.md |
| German | docs/LICENSE/TELEMT-LICENSE.de.md |
| Russian | docs/LICENSE/TELEMT-LICENSE.ru.md |
\----------------------------------------------------------/
### License Versioning Policy ### License Versioning Policy
+1 -1
View File
@@ -150,7 +150,7 @@ systemctl daemon-reload
**7.** To get the link(s), enter: **7.** To get the link(s), enter:
```bash ```bash
curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | "User: \(.username)\n\(.links.tls[0] // empty)"' curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | "[\(.username)]", (.links.classic[]? | "classic: \(.)"), (.links.secure[]? | "secure: \(.)"), (.links.tls[]? | "tls: \(.)"), ""'
``` ```
> Any number of people can use one link. > Any number of people can use one link.
+1 -1
View File
@@ -150,7 +150,7 @@ systemctl daemon-reload
**7.** Для получения ссылки/ссылок введите **7.** Для получения ссылки/ссылок введите
```bash ```bash
curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | "User: \(.username)\n\(.links.tls[0] // empty)"' curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | "[\(.username)]", (.links.classic[]? | "classic: \(.)"), (.links.secure[]? | "secure: \(.)"), (.links.tls[]? | "tls: \(.)"), ""'
``` ```
> Одной ссылкой может пользоваться сколько угодно человек. > Одной ссылкой может пользоваться сколько угодно человек.
+1 -1
View File
@@ -315,7 +315,7 @@ setup_dirs() {
$SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" "$CONFIG_PARENT_DIR" || die "Failed to create directories" $SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" "$CONFIG_PARENT_DIR" || die "Failed to create directories"
$SUDO chown telemt:telemt "$WORK_DIR" && $SUDO chmod 750 "$WORK_DIR" $SUDO chown telemt:telemt "$WORK_DIR" && $SUDO chmod 750 "$WORK_DIR"
$SUDO chown root:telemt "$CONFIG_DIR" && $SUDO chmod 750 "$CONFIG_DIR" $SUDO chown telemt:telemt "$CONFIG_DIR" && $SUDO chmod 750 "$CONFIG_DIR"
if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then
$SUDO chown root:telemt "$CONFIG_PARENT_DIR" && $SUDO chmod 750 "$CONFIG_PARENT_DIR" $SUDO chown root:telemt "$CONFIG_PARENT_DIR" && $SUDO chmod 750 "$CONFIG_PARENT_DIR"
+32
View File
@@ -17,6 +17,11 @@ use super::defaults::*;
use super::types::*; use super::types::*;
const ACCESS_SECRET_BYTES: usize = 16; const ACCESS_SECRET_BYTES: usize = 16;
const MAX_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 16_384;
const MAX_ME_ROUTE_CHANNEL_CAPACITY: usize = 8_192;
const MAX_ME_C2ME_CHANNEL_CAPACITY: usize = 8_192;
const MIN_MAX_CLIENT_FRAME_BYTES: usize = 4 * 1024;
const MAX_MAX_CLIENT_FRAME_BYTES: usize = 16 * 1024 * 1024;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct LoadedConfig { pub(crate) struct LoadedConfig {
@@ -626,18 +631,41 @@ impl ProxyConfig {
"general.me_writer_cmd_channel_capacity must be > 0".to_string(), "general.me_writer_cmd_channel_capacity must be > 0".to_string(),
)); ));
} }
if config.general.me_writer_cmd_channel_capacity > MAX_ME_WRITER_CMD_CHANNEL_CAPACITY {
return Err(ProxyError::Config(format!(
"general.me_writer_cmd_channel_capacity must be within [1, {MAX_ME_WRITER_CMD_CHANNEL_CAPACITY}]"
)));
}
if config.general.me_route_channel_capacity == 0 { if config.general.me_route_channel_capacity == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.me_route_channel_capacity must be > 0".to_string(), "general.me_route_channel_capacity must be > 0".to_string(),
)); ));
} }
if config.general.me_route_channel_capacity > MAX_ME_ROUTE_CHANNEL_CAPACITY {
return Err(ProxyError::Config(format!(
"general.me_route_channel_capacity must be within [1, {MAX_ME_ROUTE_CHANNEL_CAPACITY}]"
)));
}
if config.general.me_c2me_channel_capacity == 0 { if config.general.me_c2me_channel_capacity == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.me_c2me_channel_capacity must be > 0".to_string(), "general.me_c2me_channel_capacity must be > 0".to_string(),
)); ));
} }
if config.general.me_c2me_channel_capacity > MAX_ME_C2ME_CHANNEL_CAPACITY {
return Err(ProxyError::Config(format!(
"general.me_c2me_channel_capacity must be within [1, {MAX_ME_C2ME_CHANNEL_CAPACITY}]"
)));
}
if !(MIN_MAX_CLIENT_FRAME_BYTES..=MAX_MAX_CLIENT_FRAME_BYTES)
.contains(&config.general.max_client_frame)
{
return Err(ProxyError::Config(format!(
"general.max_client_frame must be within [{MIN_MAX_CLIENT_FRAME_BYTES}, {MAX_MAX_CLIENT_FRAME_BYTES}]"
)));
}
if config.general.me_c2me_send_timeout_ms > 60_000 { if config.general.me_c2me_send_timeout_ms > 60_000 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@@ -1346,6 +1374,10 @@ mod load_mask_shape_security_tests;
#[path = "tests/load_mask_classifier_prefetch_timeout_security_tests.rs"] #[path = "tests/load_mask_classifier_prefetch_timeout_security_tests.rs"]
mod load_mask_classifier_prefetch_timeout_security_tests; mod load_mask_classifier_prefetch_timeout_security_tests;
#[cfg(test)]
#[path = "tests/load_memory_envelope_tests.rs"]
mod load_memory_envelope_tests;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -0,0 +1,117 @@
use super::*;
use std::fs;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
/// Writes `contents` to a freshly named TOML file in the system temp directory
/// and returns the path of that file.
///
/// The file name combines the process id, a wall-clock nanosecond timestamp and
/// a process-wide sequence number. The timestamp alone is not enough: tests run
/// on parallel threads, and on platforms with a coarse clock two calls can
/// observe the same value and end up overwriting each other's config.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch or if the
/// file cannot be written.
fn write_temp_config(contents: &str) -> PathBuf {
    // Per-process counter; makes every call distinct even when timestamps tie.
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nonce = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time must be after unix epoch")
        .as_nanos();
    let pid = std::process::id();
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let path = std::env::temp_dir().join(format!(
        "telemt-load-memory-envelope-{pid}-{nonce}-{seq}.toml"
    ));
    fs::write(&path, contents).expect("temp config write must succeed");
    path
}
/// Best-effort removal of a temp config file created by a test.
///
/// The result of the removal is deliberately discarded: a file that is already
/// gone (or cannot be removed) must not turn an otherwise passing test into a
/// failure.
///
/// Accepts any borrowed path; existing call sites passing `&PathBuf` keep
/// working through deref coercion.
fn remove_temp_config(path: &std::path::Path) {
    let _ = fs::remove_file(path);
}
/// Loading a config whose `general.me_writer_cmd_channel_capacity` is one above
/// the accepted maximum (16384) must fail with an error naming the valid range.
#[test]
fn load_rejects_writer_cmd_capacity_above_upper_bound() {
    let path = write_temp_config(
        r#"
[general]
me_writer_cmd_channel_capacity = 16385
"#,
    );

    let result = ProxyConfig::load(&path);
    // Clean up before asserting so a failed expectation does not leak the temp file.
    remove_temp_config(&path);

    let err = result.expect_err("writer command capacity above hard cap must fail");
    let msg = err.to_string();
    assert!(
        msg.contains("general.me_writer_cmd_channel_capacity must be within [1, 16384]"),
        "error must explain writer command capacity hard cap, got: {msg}"
    );
}
/// Loading a config whose `general.me_route_channel_capacity` is one above the
/// accepted maximum (8192) must fail with an error naming the valid range.
#[test]
fn load_rejects_route_channel_capacity_above_upper_bound() {
    let path = write_temp_config(
        r#"
[general]
me_route_channel_capacity = 8193
"#,
    );

    let result = ProxyConfig::load(&path);
    // Clean up before asserting so a failed expectation does not leak the temp file.
    remove_temp_config(&path);

    let err = result.expect_err("route channel capacity above hard cap must fail");
    let msg = err.to_string();
    assert!(
        msg.contains("general.me_route_channel_capacity must be within [1, 8192]"),
        "error must explain route channel hard cap, got: {msg}"
    );
}
/// Loading a config whose `general.me_c2me_channel_capacity` is one above the
/// accepted maximum (8192) must fail with an error naming the valid range.
#[test]
fn load_rejects_c2me_channel_capacity_above_upper_bound() {
    let path = write_temp_config(
        r#"
[general]
me_c2me_channel_capacity = 8193
"#,
    );

    let result = ProxyConfig::load(&path);
    // Clean up before asserting so a failed expectation does not leak the temp file.
    remove_temp_config(&path);

    let err = result.expect_err("c2me channel capacity above hard cap must fail");
    let msg = err.to_string();
    assert!(
        msg.contains("general.me_c2me_channel_capacity must be within [1, 8192]"),
        "error must explain c2me channel hard cap, got: {msg}"
    );
}
/// Loading a config whose `general.max_client_frame` is one byte above the
/// accepted maximum (16777216) must fail with an error naming the valid range.
#[test]
fn load_rejects_max_client_frame_above_upper_bound() {
    let path = write_temp_config(
        r#"
[general]
max_client_frame = 16777217
"#,
    );

    let result = ProxyConfig::load(&path);
    // Clean up before asserting so a failed expectation does not leak the temp file.
    remove_temp_config(&path);

    let err = result.expect_err("max_client_frame above hard cap must fail");
    let msg = err.to_string();
    assert!(
        msg.contains("general.max_client_frame must be within [4096, 16777216]"),
        "error must explain max_client_frame hard cap, got: {msg}"
    );
}
/// Values sitting exactly on each upper bound are still valid: the config must
/// load and expose the configured numbers unchanged.
#[test]
fn load_accepts_memory_limits_at_hard_upper_bounds() {
    let path = write_temp_config(
        r#"
[general]
me_writer_cmd_channel_capacity = 16384
me_route_channel_capacity = 8192
me_c2me_channel_capacity = 8192
max_client_frame = 16777216
"#,
    );

    let result = ProxyConfig::load(&path);
    // Clean up before asserting so a failed expectation does not leak the temp file.
    remove_temp_config(&path);

    let cfg = result.expect("hard upper bound values must be accepted");
    assert_eq!(cfg.general.me_writer_cmd_channel_capacity, 16384);
    assert_eq!(cfg.general.me_route_channel_capacity, 8192);
    assert_eq!(cfg.general.me_c2me_channel_capacity, 8192);
    assert_eq!(cfg.general.max_client_frame, 16 * 1024 * 1024);
}
+1
View File
@@ -786,6 +786,7 @@ async fn run_inner(
&startup_tracker, &startup_tracker,
stats.clone(), stats.clone(),
beobachten.clone(), beobachten.clone(),
shared_state.clone(),
ip_tracker.clone(), ip_tracker.clone(),
config_rx.clone(), config_rx.clone(),
) )
+4
View File
@@ -13,6 +13,7 @@ use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::metrics; use crate::metrics;
use crate::network::probe::NetworkProbe; use crate::network::probe::NetworkProbe;
use crate::proxy::shared_state::ProxySharedState;
use crate::startup::{ use crate::startup::{
COMPONENT_CONFIG_WATCHER_START, COMPONENT_METRICS_START, COMPONENT_RUNTIME_READY, COMPONENT_CONFIG_WATCHER_START, COMPONENT_METRICS_START, COMPONENT_RUNTIME_READY,
StartupTracker, StartupTracker,
@@ -287,6 +288,7 @@ pub(crate) async fn spawn_metrics_if_configured(
startup_tracker: &Arc<StartupTracker>, startup_tracker: &Arc<StartupTracker>,
stats: Arc<Stats>, stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>, beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
config_rx: watch::Receiver<Arc<ProxyConfig>>, config_rx: watch::Receiver<Arc<ProxyConfig>>,
) { ) {
@@ -320,6 +322,7 @@ pub(crate) async fn spawn_metrics_if_configured(
.await; .await;
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone(); let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let config_rx_metrics = config_rx.clone(); let config_rx_metrics = config_rx.clone();
let ip_tracker_metrics = ip_tracker.clone(); let ip_tracker_metrics = ip_tracker.clone();
let whitelist = config.server.metrics_whitelist.clone(); let whitelist = config.server.metrics_whitelist.clone();
@@ -331,6 +334,7 @@ pub(crate) async fn spawn_metrics_if_configured(
listen_backlog, listen_backlog,
stats, stats,
beobachten, beobachten,
shared_state,
ip_tracker_metrics, ip_tracker_metrics,
config_rx_metrics, config_rx_metrics,
whitelist, whitelist,
+159 -19
View File
@@ -15,6 +15,7 @@ use tracing::{debug, info, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::proxy::shared_state::ProxySharedState;
use crate::stats::Stats; use crate::stats::Stats;
use crate::stats::beobachten::BeobachtenStore; use crate::stats::beobachten::BeobachtenStore;
use crate::transport::{ListenOptions, create_listener}; use crate::transport::{ListenOptions, create_listener};
@@ -25,6 +26,7 @@ pub async fn serve(
listen_backlog: u32, listen_backlog: u32,
stats: Arc<Stats>, stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>, beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>, config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>, whitelist: Vec<IpNetwork>,
@@ -45,7 +47,13 @@ pub async fn serve(
Ok(listener) => { Ok(listener) => {
info!("Metrics endpoint: http://{}/metrics and /beobachten", addr); info!("Metrics endpoint: http://{}/metrics and /beobachten", addr);
serve_listener( serve_listener(
listener, stats, beobachten, ip_tracker, config_rx, whitelist, listener,
stats,
beobachten,
shared_state,
ip_tracker,
config_rx,
whitelist,
) )
.await; .await;
} }
@@ -94,13 +102,20 @@ pub async fn serve(
} }
(Some(listener), None) | (None, Some(listener)) => { (Some(listener), None) | (None, Some(listener)) => {
serve_listener( serve_listener(
listener, stats, beobachten, ip_tracker, config_rx, whitelist, listener,
stats,
beobachten,
shared_state,
ip_tracker,
config_rx,
whitelist,
) )
.await; .await;
} }
(Some(listener4), Some(listener6)) => { (Some(listener4), Some(listener6)) => {
let stats_v6 = stats.clone(); let stats_v6 = stats.clone();
let beobachten_v6 = beobachten.clone(); let beobachten_v6 = beobachten.clone();
let shared_state_v6 = shared_state.clone();
let ip_tracker_v6 = ip_tracker.clone(); let ip_tracker_v6 = ip_tracker.clone();
let config_rx_v6 = config_rx.clone(); let config_rx_v6 = config_rx.clone();
let whitelist_v6 = whitelist.clone(); let whitelist_v6 = whitelist.clone();
@@ -109,6 +124,7 @@ pub async fn serve(
listener6, listener6,
stats_v6, stats_v6,
beobachten_v6, beobachten_v6,
shared_state_v6,
ip_tracker_v6, ip_tracker_v6,
config_rx_v6, config_rx_v6,
whitelist_v6, whitelist_v6,
@@ -116,7 +132,13 @@ pub async fn serve(
.await; .await;
}); });
serve_listener( serve_listener(
listener4, stats, beobachten, ip_tracker, config_rx, whitelist, listener4,
stats,
beobachten,
shared_state,
ip_tracker,
config_rx,
whitelist,
) )
.await; .await;
} }
@@ -142,6 +164,7 @@ async fn serve_listener(
listener: TcpListener, listener: TcpListener,
stats: Arc<Stats>, stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>, beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>, ip_tracker: Arc<UserIpTracker>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>, config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>, whitelist: Arc<Vec<IpNetwork>>,
@@ -162,15 +185,27 @@ async fn serve_listener(
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone(); let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let config_rx_conn = config_rx.clone(); let config_rx_conn = config_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let svc = service_fn(move |req| { let svc = service_fn(move |req| {
let stats = stats.clone(); let stats = stats.clone();
let beobachten = beobachten.clone(); let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone(); let ip_tracker = ip_tracker.clone();
let config = config_rx_conn.borrow().clone(); let config = config_rx_conn.borrow().clone();
async move { handle(req, &stats, &beobachten, &ip_tracker, &config).await } async move {
handle(
req,
&stats,
&beobachten,
&shared_state,
&ip_tracker,
&config,
)
.await
}
}); });
if let Err(e) = http1::Builder::new() if let Err(e) = http1::Builder::new()
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc) .serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
@@ -186,11 +221,12 @@ async fn handle<B>(
req: Request<B>, req: Request<B>,
stats: &Stats, stats: &Stats,
beobachten: &BeobachtenStore, beobachten: &BeobachtenStore,
shared_state: &ProxySharedState,
ip_tracker: &UserIpTracker, ip_tracker: &UserIpTracker,
config: &ProxyConfig, config: &ProxyConfig,
) -> Result<Response<Full<Bytes>>, Infallible> { ) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() == "/metrics" { if req.uri().path() == "/metrics" {
let body = render_metrics(stats, config, ip_tracker).await; let body = render_metrics(stats, shared_state, config, ip_tracker).await;
let resp = Response::builder() let resp = Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8") .header("content-type", "text/plain; version=0.0.4; charset=utf-8")
@@ -225,7 +261,12 @@ fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> Stri
beobachten.snapshot_text(ttl) beobachten.snapshot_text(ttl)
} }
async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIpTracker) -> String { async fn render_metrics(
stats: &Stats,
shared_state: &ProxySharedState,
config: &ProxyConfig,
ip_tracker: &UserIpTracker,
) -> String {
use std::fmt::Write; use std::fmt::Write;
let mut out = String::with_capacity(4096); let mut out = String::with_capacity(4096);
let telemetry = stats.telemetry_policy(); let telemetry = stats.telemetry_policy();
@@ -234,6 +275,17 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let me_allows_normal = telemetry.me_level.allows_normal(); let me_allows_normal = telemetry.me_level.allows_normal();
let me_allows_debug = telemetry.me_level.allows_debug(); let me_allows_debug = telemetry.me_level.allows_debug();
let _ = writeln!(
out,
"# HELP telemt_build_info Build information for the running telemt binary"
);
let _ = writeln!(out, "# TYPE telemt_build_info gauge");
let _ = writeln!(
out,
"telemt_build_info{{version=\"{}\"}} 1",
env!("CARGO_PKG_VERSION")
);
let _ = writeln!(out, "# HELP telemt_uptime_seconds Proxy uptime"); let _ = writeln!(out, "# HELP telemt_uptime_seconds Proxy uptime");
let _ = writeln!(out, "# TYPE telemt_uptime_seconds gauge"); let _ = writeln!(out, "# TYPE telemt_uptime_seconds gauge");
let _ = writeln!(out, "telemt_uptime_seconds {:.1}", stats.uptime_secs()); let _ = writeln!(out, "telemt_uptime_seconds {:.1}", stats.uptime_secs());
@@ -359,6 +411,42 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_auth_expensive_checks_total Expensive authentication candidate checks executed during handshake validation"
);
let _ = writeln!(out, "# TYPE telemt_auth_expensive_checks_total counter");
let _ = writeln!(
out,
"telemt_auth_expensive_checks_total {}",
if core_enabled {
shared_state
.handshake
.auth_expensive_checks_total
.load(std::sync::atomic::Ordering::Relaxed)
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_auth_budget_exhausted_total Handshake validations that hit authentication candidate budget limits"
);
let _ = writeln!(out, "# TYPE telemt_auth_budget_exhausted_total counter");
let _ = writeln!(
out,
"telemt_auth_budget_exhausted_total {}",
if core_enabled {
shared_state
.handshake
.auth_budget_exhausted_total
.load(std::sync::atomic::Ordering::Relaxed)
} else {
0
}
);
let _ = writeln!( let _ = writeln!(
out, out,
"# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout" "# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout"
@@ -2847,6 +2935,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_render_metrics_format() { async fn test_render_metrics_format() {
let stats = Arc::new(Stats::new()); let stats = Arc::new(Stats::new());
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();
let mut config = ProxyConfig::default(); let mut config = ProxyConfig::default();
config config
@@ -2858,6 +2947,14 @@ mod tests {
stats.increment_connects_all(); stats.increment_connects_all();
stats.increment_connects_bad(); stats.increment_connects_bad();
stats.increment_handshake_timeouts(); stats.increment_handshake_timeouts();
shared_state
.handshake
.auth_expensive_checks_total
.fetch_add(9, std::sync::atomic::Ordering::Relaxed);
shared_state
.handshake
.auth_budget_exhausted_total
.fetch_add(2, std::sync::atomic::Ordering::Relaxed);
stats.increment_upstream_connect_attempt_total(); stats.increment_upstream_connect_attempt_total();
stats.increment_upstream_connect_attempt_total(); stats.increment_upstream_connect_attempt_total();
stats.increment_upstream_connect_success_total(); stats.increment_upstream_connect_success_total();
@@ -2901,11 +2998,17 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let output = render_metrics(&stats, &config, &tracker).await; let output = render_metrics(&stats, shared_state.as_ref(), &config, &tracker).await;
assert!(output.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
env!("CARGO_PKG_VERSION")
)));
assert!(output.contains("telemt_connections_total 2")); assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1")); assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output.contains("telemt_handshake_timeouts_total 1")); assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_auth_expensive_checks_total 9"));
assert!(output.contains("telemt_auth_budget_exhausted_total 2"));
assert!(output.contains("telemt_upstream_connect_attempt_total 2")); assert!(output.contains("telemt_upstream_connect_attempt_total 2"));
assert!(output.contains("telemt_upstream_connect_success_total 1")); assert!(output.contains("telemt_upstream_connect_success_total 1"));
assert!(output.contains("telemt_upstream_connect_fail_total 1")); assert!(output.contains("telemt_upstream_connect_fail_total 1"));
@@ -2960,12 +3063,15 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_render_empty_stats() { async fn test_render_empty_stats() {
let stats = Stats::new(); let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();
let config = ProxyConfig::default(); let config = ProxyConfig::default();
let output = render_metrics(&stats, &config, &tracker).await; let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
assert!(output.contains("telemt_connections_total 0")); assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0")); assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0")); assert!(output.contains("telemt_handshake_timeouts_total 0"));
assert!(output.contains("telemt_auth_expensive_checks_total 0"));
assert!(output.contains("telemt_auth_budget_exhausted_total 0"));
assert!(output.contains("telemt_user_unique_ips_current{user=")); assert!(output.contains("telemt_user_unique_ips_current{user="));
assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
} }
@@ -2973,6 +3079,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_render_uses_global_each_unique_ip_limit() { async fn test_render_uses_global_each_unique_ip_limit() {
let stats = Stats::new(); let stats = Stats::new();
let shared_state = ProxySharedState::new();
stats.increment_user_connects("alice"); stats.increment_user_connects("alice");
stats.increment_user_curr_connects("alice"); stats.increment_user_curr_connects("alice");
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();
@@ -2983,7 +3090,7 @@ mod tests {
let mut config = ProxyConfig::default(); let mut config = ProxyConfig::default();
config.access.user_max_unique_ips_global_each = 2; config.access.user_max_unique_ips_global_each = 2;
let output = render_metrics(&stats, &config, &tracker).await; let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2")); assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000")); assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000"));
@@ -2992,13 +3099,16 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_render_has_type_annotations() { async fn test_render_has_type_annotations() {
let stats = Stats::new(); let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();
let config = ProxyConfig::default(); let config = ProxyConfig::default();
let output = render_metrics(&stats, &config, &tracker).await; let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
assert!(output.contains("# TYPE telemt_uptime_seconds gauge")); assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_auth_expensive_checks_total counter"));
assert!(output.contains("# TYPE telemt_auth_budget_exhausted_total counter"));
assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter")); assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
@@ -3035,6 +3145,7 @@ mod tests {
async fn test_endpoint_integration() { async fn test_endpoint_integration() {
let stats = Arc::new(Stats::new()); let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new()); let beobachten = Arc::new(BeobachtenStore::new());
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new(); let tracker = UserIpTracker::new();
let mut config = ProxyConfig::default(); let mut config = ProxyConfig::default();
stats.increment_connects_all(); stats.increment_connects_all();
@@ -3042,9 +3153,16 @@ mod tests {
stats.increment_connects_all(); stats.increment_connects_all();
let req = Request::builder().uri("/metrics").body(()).unwrap(); let req = Request::builder().uri("/metrics").body(()).unwrap();
let resp = handle(req, &stats, &beobachten, &tracker, &config) let resp = handle(
.await req,
.unwrap(); &stats,
&beobachten,
shared_state.as_ref(),
&tracker,
&config,
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_body().collect().await.unwrap().to_bytes(); let body = resp.into_body().collect().await.unwrap().to_bytes();
assert!( assert!(
@@ -3052,6 +3170,14 @@ mod tests {
.unwrap() .unwrap()
.contains("telemt_connections_total 3") .contains("telemt_connections_total 3")
); );
assert!(
std::str::from_utf8(body.as_ref())
.unwrap()
.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
env!("CARGO_PKG_VERSION")
))
);
config.general.beobachten = true; config.general.beobachten = true;
config.general.beobachten_minutes = 10; config.general.beobachten_minutes = 10;
@@ -3061,9 +3187,16 @@ mod tests {
Duration::from_secs(600), Duration::from_secs(600),
); );
let req_beob = Request::builder().uri("/beobachten").body(()).unwrap(); let req_beob = Request::builder().uri("/beobachten").body(()).unwrap();
let resp_beob = handle(req_beob, &stats, &beobachten, &tracker, &config) let resp_beob = handle(
.await req_beob,
.unwrap(); &stats,
&beobachten,
shared_state.as_ref(),
&tracker,
&config,
)
.await
.unwrap();
assert_eq!(resp_beob.status(), StatusCode::OK); assert_eq!(resp_beob.status(), StatusCode::OK);
let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes(); let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes();
let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap(); let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap();
@@ -3071,9 +3204,16 @@ mod tests {
assert!(beob_text.contains("203.0.113.10-1")); assert!(beob_text.contains("203.0.113.10-1"));
let req404 = Request::builder().uri("/other").body(()).unwrap(); let req404 = Request::builder().uri("/other").body(()).unwrap();
let resp404 = handle(req404, &stats, &beobachten, &tracker, &config) let resp404 = handle(
.await req404,
.unwrap(); &stats,
&beobachten,
shared_state.as_ref(),
&tracker,
&config,
)
.await
.unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND); assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
} }
} }
+7
View File
@@ -56,6 +56,8 @@ const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2; const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2;
const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024; const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
const QUOTA_RESERVE_SPIN_RETRIES: usize = 32; const QUOTA_RESERVE_SPIN_RETRIES: usize = 32;
/// Initial delay, in milliseconds, slept between quota reservation rounds in
/// `reserve_user_quota_with_yield`; the delay starts here and doubles each round.
const QUOTA_RESERVE_BACKOFF_MIN_MS: u64 = 1;
/// Upper bound, in milliseconds, that the doubling reservation backoff delay is clamped to.
const QUOTA_RESERVE_BACKOFF_MAX_MS: u64 = 16;
#[derive(Default)] #[derive(Default)]
pub(crate) struct DesyncDedupRotationState { pub(crate) struct DesyncDedupRotationState {
@@ -573,6 +575,7 @@ async fn reserve_user_quota_with_yield(
bytes: u64, bytes: u64,
limit: u64, limit: u64,
) -> std::result::Result<u64, QuotaReserveError> { ) -> std::result::Result<u64, QuotaReserveError> {
let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS;
loop { loop {
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES { for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match user_stats.quota_try_reserve(bytes, limit) { match user_stats.quota_try_reserve(bytes, limit) {
@@ -585,6 +588,10 @@ async fn reserve_user_quota_with_yield(
} }
tokio::task::yield_now().await; tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = backoff_ms
.saturating_mul(2)
.min(QUOTA_RESERVE_BACKOFF_MAX_MS);
} }
} }
+24 -11
View File
@@ -271,6 +271,7 @@ const QUOTA_LARGE_CHARGE_BYTES: u64 = 16 * 1024;
const QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES: u64 = 4 * 1024; const QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES: u64 = 4 * 1024;
const QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES: u64 = 64 * 1024; const QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES: u64 = 64 * 1024;
const QUOTA_RESERVE_SPIN_RETRIES: usize = 64; const QUOTA_RESERVE_SPIN_RETRIES: usize = 64;
/// Number of unsuccessful reservation rounds (each round makes up to
/// `QUOTA_RESERVE_SPIN_RETRIES` attempts) after which the `StatsIo` read/write paths
/// set `quota_exceeded` and return the quota I/O error instead of retrying further.
const QUOTA_RESERVE_MAX_ROUNDS: usize = 8;
#[inline] #[inline]
fn quota_adaptive_interval_bytes(remaining_before: u64) -> u64 { fn quota_adaptive_interval_bytes(remaining_before: u64) -> u64 {
@@ -319,6 +320,7 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
let mut reserved_total = None; let mut reserved_total = None;
let mut reserve_rounds = 0usize; let mut reserve_rounds = 0usize;
while reserved_total.is_none() { while reserved_total.is_none() {
let mut saw_contention = false;
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES { for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(n_to_charge, limit) { match this.user_stats.quota_try_reserve(n_to_charge, limit) {
Ok(total) => { Ok(total) => {
@@ -331,15 +333,20 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
return Poll::Ready(Err(quota_io_error())); return Poll::Ready(Err(quota_io_error()));
} }
Err(crate::stats::QuotaReserveError::Contended) => { Err(crate::stats::QuotaReserveError::Contended) => {
std::hint::spin_loop(); saw_contention = true;
} }
} }
} }
reserve_rounds = reserve_rounds.saturating_add(1); if reserved_total.is_none() {
if reserved_total.is_none() && reserve_rounds >= 8 { reserve_rounds = reserve_rounds.saturating_add(1);
this.quota_exceeded.store(true, Ordering::Release); if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
buf.set_filled(before); this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error())); buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
if saw_contention {
std::thread::yield_now();
}
} }
} }
@@ -407,6 +414,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
remaining_before = Some(remaining); remaining_before = Some(remaining);
let desired = remaining.min(buf.len() as u64); let desired = remaining.min(buf.len() as u64);
let mut saw_contention = false;
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES { for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(desired, limit) { match this.user_stats.quota_try_reserve(desired, limit) {
Ok(_) => { Ok(_) => {
@@ -418,15 +426,20 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
break; break;
} }
Err(crate::stats::QuotaReserveError::Contended) => { Err(crate::stats::QuotaReserveError::Contended) => {
std::hint::spin_loop(); saw_contention = true;
} }
} }
} }
reserve_rounds = reserve_rounds.saturating_add(1); if reserved_bytes == 0 {
if reserved_bytes == 0 && reserve_rounds >= 8 { reserve_rounds = reserve_rounds.saturating_add(1);
this.quota_exceeded.store(true, Ordering::Release); if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
return Poll::Ready(Err(quota_io_error())); this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
if saw_contention {
std::thread::yield_now();
}
} }
} }
} else { } else {
+3 -2
View File
@@ -1,6 +1,5 @@
#![allow(clippy::too_many_arguments)] #![allow(clippy::too_many_arguments)]
use crc32fast::Hasher;
use crate::crypto::{SecureRandom, sha256_hmac}; use crate::crypto::{SecureRandom, sha256_hmac};
use crate::protocol::constants::{ use crate::protocol::constants::{
MAX_TLS_CIPHERTEXT_SIZE, TLS_RECORD_APPLICATION, TLS_RECORD_CHANGE_CIPHER, MAX_TLS_CIPHERTEXT_SIZE, TLS_RECORD_APPLICATION, TLS_RECORD_CHANGE_CIPHER,
@@ -8,6 +7,7 @@ use crate::protocol::constants::{
}; };
use crate::protocol::tls::{TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key}; use crate::protocol::tls::{TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key};
use crate::tls_front::types::{CachedTlsData, ParsedCertificateInfo, TlsProfileSource}; use crate::tls_front::types::{CachedTlsData, ParsedCertificateInfo, TlsProfileSource};
use crc32fast::Hasher;
const MIN_APP_DATA: usize = 64; const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = MAX_TLS_CIPHERTEXT_SIZE; const MAX_APP_DATA: usize = MAX_TLS_CIPHERTEXT_SIZE;
@@ -343,7 +343,8 @@ mod tests {
}; };
use super::{ use super::{
build_compact_cert_info_payload, build_emulated_server_hello, hash_compact_cert_info_payload, build_compact_cert_info_payload, build_emulated_server_hello,
hash_compact_cert_info_payload,
}; };
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::protocol::constants::{ use crate::protocol::constants::{
+178 -20
View File
@@ -23,6 +23,60 @@ use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
use super::registry::RouteResult; use super::registry::RouteResult;
use super::{ConnRegistry, MeResponse}; use super::{ConnRegistry, MeResponse};
/// Number of queue-full outcomes `route_data_with_retry` tolerates for a single payload
/// before it stops retrying and returns the queue-full result to its caller.
const DATA_ROUTE_MAX_ATTEMPTS: usize = 3;
/// Queue-full streak length at which `should_close_on_queue_full_streak` reports that
/// the connection should be closed.
const DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD: u8 = 3;
/// Reports whether a data-route outcome requires closing the connection right away.
///
/// Only `NoConn` and `ChannelClosed` are treated as fatal; a successful route and both
/// queue-full outcomes return `false`.
fn should_close_on_route_result_for_data(result: RouteResult) -> bool {
    match result {
        RouteResult::NoConn | RouteResult::ChannelClosed => true,
        RouteResult::Routed | RouteResult::QueueFullBase | RouteResult::QueueFullHigh => false,
    }
}
/// Reports whether an ACK-route outcome requires closing the connection.
///
/// `NoConn` and `ChannelClosed` return `true`; a routed ACK and both queue-full
/// outcomes return `false`, so a full queue alone never forces a close here.
fn should_close_on_route_result_for_ack(result: RouteResult) -> bool {
    match result {
        RouteResult::NoConn | RouteResult::ChannelClosed => true,
        RouteResult::Routed | RouteResult::QueueFullBase | RouteResult::QueueFullHigh => false,
    }
}
/// Returns `true` when the route outcome is one of the two queue-full variants
/// (`QueueFullBase` or `QueueFullHigh`), and `false` for every other outcome.
fn is_data_route_queue_full(result: RouteResult) -> bool {
    match result {
        RouteResult::QueueFullBase | RouteResult::QueueFullHigh => true,
        RouteResult::Routed | RouteResult::NoConn | RouteResult::ChannelClosed => false,
    }
}
/// Returns `true` once a queue-full streak has reached
/// `DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD`.
fn should_close_on_queue_full_streak(streak: u8) -> bool {
    let threshold = DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD;
    streak >= threshold
}
/// Routes a data payload to `conn_id`, retrying a bounded number of times while the
/// destination queue is full.
///
/// Each attempt calls `ConnRegistry::route_with_timeout` with `timeout_ms`. Any outcome
/// other than `QueueFullBase` / `QueueFullHigh` is returned immediately. A queue-full
/// outcome is retried after yielding to the scheduler, until
/// `DATA_ROUTE_MAX_ATTEMPTS` queue-full outcomes have been seen; the last queue-full
/// result is then returned.
async fn route_data_with_retry(
    reg: &ConnRegistry,
    conn_id: u64,
    flags: u32,
    data: Bytes,
    timeout_ms: u64,
) -> RouteResult {
    let mut queue_full_count = 0usize;
    loop {
        // The payload is cloned for each attempt so it stays available for a retry.
        let response = MeResponse::Data {
            flags,
            data: data.clone(),
        };
        let outcome = reg.route_with_timeout(conn_id, response, timeout_ms).await;

        let queue_full = matches!(
            outcome,
            RouteResult::QueueFullBase | RouteResult::QueueFullHigh
        );
        if !queue_full {
            return outcome;
        }

        queue_full_count = queue_full_count.saturating_add(1);
        if queue_full_count >= DATA_ROUTE_MAX_ATTEMPTS {
            return outcome;
        }

        // Let other tasks run (ideally the consumer draining the queue) before retrying.
        tokio::task::yield_now().await;
    }
}
pub(crate) async fn reader_loop( pub(crate) async fn reader_loop(
mut rd: tokio::io::ReadHalf<TcpStream>, mut rd: tokio::io::ReadHalf<TcpStream>,
dk: [u8; 32], dk: [u8; 32],
@@ -43,6 +97,7 @@ pub(crate) async fn reader_loop(
) -> Result<()> { ) -> Result<()> {
let mut raw = enc_leftover; let mut raw = enc_leftover;
let mut expected_seq: i32 = 0; let mut expected_seq: i32 = 0;
let mut data_route_queue_full_streak = HashMap::<u64, u8>::new();
loop { loop {
let mut tmp = [0u8; 65_536]; let mut tmp = [0u8; 65_536];
@@ -127,27 +182,39 @@ pub(crate) async fn reader_loop(
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS"); trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = reg let routed =
.route_with_timeout(cid, MeResponse::Data { flags, data }, route_wait_ms) route_data_with_retry(reg.as_ref(), cid, flags, data, route_wait_ms).await;
.await; if matches!(routed, RouteResult::Routed) {
if !matches!(routed, RouteResult::Routed) { data_route_queue_full_streak.remove(&cid);
match routed { continue;
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(), }
RouteResult::ChannelClosed => { match routed {
stats.increment_me_route_drop_channel_closed() RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
} RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::QueueFullBase => { RouteResult::QueueFullBase => {
stats.increment_me_route_drop_queue_full(); stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_base(); stats.increment_me_route_drop_queue_full_base();
}
RouteResult::QueueFullHigh => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_high();
}
RouteResult::Routed => {}
} }
RouteResult::QueueFullHigh => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_high();
}
RouteResult::Routed => {}
}
if should_close_on_route_result_for_data(routed) {
data_route_queue_full_streak.remove(&cid);
reg.unregister(cid).await; reg.unregister(cid).await;
send_close_conn(&tx, cid).await; send_close_conn(&tx, cid).await;
continue;
}
if is_data_route_queue_full(routed) {
let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
*streak = streak.saturating_add(1);
if should_close_on_queue_full_streak(*streak) {
data_route_queue_full_streak.remove(&cid);
reg.unregister(cid).await;
send_close_conn(&tx, cid).await;
}
} }
} else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -171,19 +238,23 @@ pub(crate) async fn reader_loop(
} }
RouteResult::Routed => {} RouteResult::Routed => {}
} }
reg.unregister(cid).await; if should_close_on_route_result_for_ack(routed) {
send_close_conn(&tx, cid).await; reg.unregister(cid).await;
send_close_conn(&tx, cid).await;
}
} }
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_EXT from ME"); debug!(cid, "RPC_CLOSE_EXT from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await; let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
data_route_queue_full_streak.remove(&cid);
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_CONN from ME"); debug!(cid, "RPC_CLOSE_CONN from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await; let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
data_route_queue_full_streak.remove(&cid);
} else if pt == RPC_PING_U32 && body.len() >= 8 { } else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
trace!(ping_id, "RPC_PING -> RPC_PONG"); trace!(ping_id, "RPC_PING -> RPC_PONG");
@@ -243,6 +314,93 @@ pub(crate) async fn reader_loop(
} }
} }
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::transport::middle_proxy::ConnRegistry;

    use super::{
        MeResponse, RouteResult, is_data_route_queue_full, route_data_with_retry,
        should_close_on_queue_full_streak, should_close_on_route_result_for_ack,
        should_close_on_route_result_for_data,
    };

    /// Data routing closes immediately only for `NoConn` and `ChannelClosed`.
    #[test]
    fn data_route_only_fatal_results_close_immediately() {
        for non_fatal in [
            RouteResult::Routed,
            RouteResult::QueueFullBase,
            RouteResult::QueueFullHigh,
        ] {
            assert!(!should_close_on_route_result_for_data(non_fatal));
        }
        for fatal in [RouteResult::NoConn, RouteResult::ChannelClosed] {
            assert!(should_close_on_route_result_for_data(fatal));
        }
    }

    /// Queue-full classification and the streak threshold at which a close is requested.
    #[test]
    fn data_route_queue_full_uses_starvation_threshold() {
        for queue_full in [RouteResult::QueueFullBase, RouteResult::QueueFullHigh] {
            assert!(is_data_route_queue_full(queue_full));
        }
        assert!(!is_data_route_queue_full(RouteResult::NoConn));

        for below_threshold in [1u8, 2] {
            assert!(!should_close_on_queue_full_streak(below_threshold));
        }
        for at_or_above_threshold in [3u8, u8::MAX] {
            assert!(should_close_on_queue_full_streak(at_or_above_threshold));
        }
    }

    /// ACK routing never forces a close on queue-full, only on `NoConn` / `ChannelClosed`.
    #[test]
    fn ack_queue_full_is_soft_dropped_without_forced_close() {
        for non_fatal in [
            RouteResult::Routed,
            RouteResult::QueueFullBase,
            RouteResult::QueueFullHigh,
        ] {
            assert!(!should_close_on_route_result_for_ack(non_fatal));
        }
        for fatal in [RouteResult::NoConn, RouteResult::ChannelClosed] {
            assert!(should_close_on_route_result_for_ack(fatal));
        }
    }

    /// With free channel capacity the payload is routed and delivered unchanged.
    #[tokio::test]
    async fn route_data_with_retry_returns_routed_when_channel_has_capacity() {
        let reg = ConnRegistry::with_route_channel_capacity(1);
        let (conn_id, mut rx) = reg.register().await;
        let payload = Bytes::from_static(b"a");

        let routed = route_data_with_retry(&reg, conn_id, 0, payload.clone(), 20).await;
        assert!(matches!(routed, RouteResult::Routed));

        match rx.recv().await {
            Some(MeResponse::Data { flags, data }) => {
                assert_eq!(flags, 0);
                assert_eq!(data, payload);
            }
            other => panic!("expected routed data response, got {other:?}"),
        }
    }

    /// With the single channel slot already occupied, the helper gives up with a
    /// queue-full result instead of retrying forever.
    #[tokio::test]
    async fn route_data_with_retry_stops_after_bounded_attempts() {
        let reg = ConnRegistry::with_route_channel_capacity(1);
        // `_rx` is kept alive so the channel stays open but is never drained.
        let (conn_id, _rx) = reg.register().await;

        // Occupy the only slot so every data route attempt finds the queue full.
        let prefill = reg.route_nowait(conn_id, MeResponse::Ack(1)).await;
        assert!(matches!(prefill, RouteResult::Routed));

        let routed = route_data_with_retry(&reg, conn_id, 0, Bytes::from_static(b"a"), 0).await;
        assert!(matches!(
            routed,
            RouteResult::QueueFullBase | RouteResult::QueueFullHigh
        ));
    }
}
async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) { async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
+54 -33
View File
@@ -55,6 +55,20 @@ struct RoutingTable {
map: DashMap<u64, mpsc::Sender<MeResponse>>, map: DashMap<u64, mpsc::Sender<MeResponse>>,
} }
/// Concurrent map from writer id to that writer's command-channel sender.
///
/// `register_writer` inserts into this map in addition to the mutex-guarded binding
/// state, and `get_writer` reads the sender from here.
struct WriterTable {
map: DashMap<u64, mpsc::Sender<WriterCommand>>,
}
/// Per-connection binding record stored in `HotBindingTable`: the id of the writer the
/// connection is bound to, together with the connection's metadata.
#[derive(Clone)]
struct HotConnBinding {
// Id of the writer this connection is currently bound to.
writer_id: u64,
// Metadata recorded for the connection when the binding was inserted.
meta: ConnMeta,
}
/// Concurrent map from connection id to its `HotConnBinding`.
///
/// Entries are removed in `unregister`; `get_writer` and `get_meta` read from this map.
struct HotBindingTable {
map: DashMap<u64, HotConnBinding>,
}
struct BindingState { struct BindingState {
inner: Mutex<BindingInner>, inner: Mutex<BindingInner>,
} }
@@ -83,6 +97,8 @@ impl BindingInner {
pub struct ConnRegistry { pub struct ConnRegistry {
routing: RoutingTable, routing: RoutingTable,
writers: WriterTable,
hot_binding: HotBindingTable,
binding: BindingState, binding: BindingState,
next_id: AtomicU64, next_id: AtomicU64,
route_channel_capacity: usize, route_channel_capacity: usize,
@@ -105,6 +121,12 @@ impl ConnRegistry {
routing: RoutingTable { routing: RoutingTable {
map: DashMap::new(), map: DashMap::new(),
}, },
writers: WriterTable {
map: DashMap::new(),
},
hot_binding: HotBindingTable {
map: DashMap::new(),
},
binding: BindingState { binding: BindingState {
inner: Mutex::new(BindingInner::new()), inner: Mutex::new(BindingInner::new()),
}, },
@@ -149,16 +171,18 @@ impl ConnRegistry {
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) { pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
let mut binding = self.binding.inner.lock().await; let mut binding = self.binding.inner.lock().await;
binding.writers.insert(writer_id, tx); binding.writers.insert(writer_id, tx.clone());
binding binding
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
.or_insert_with(HashSet::new); .or_insert_with(HashSet::new);
self.writers.map.insert(writer_id, tx);
} }
/// Unregister connection, returning associated writer_id if any. /// Unregister connection, returning associated writer_id if any.
pub async fn unregister(&self, id: u64) -> Option<u64> { pub async fn unregister(&self, id: u64) -> Option<u64> {
self.routing.map.remove(&id); self.routing.map.remove(&id);
self.hot_binding.map.remove(&id);
let mut binding = self.binding.inner.lock().await; let mut binding = self.binding.inner.lock().await;
binding.meta.remove(&id); binding.meta.remove(&id);
if let Some(writer_id) = binding.writer_for_conn.remove(&id) { if let Some(writer_id) = binding.writer_for_conn.remove(&id) {
@@ -325,13 +349,16 @@ impl ConnRegistry {
} }
binding.meta.insert(conn_id, meta.clone()); binding.meta.insert(conn_id, meta.clone());
binding.last_meta_for_writer.insert(writer_id, meta); binding.last_meta_for_writer.insert(writer_id, meta.clone());
binding.writer_idle_since_epoch_secs.remove(&writer_id); binding.writer_idle_since_epoch_secs.remove(&writer_id);
binding binding
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
.or_insert_with(HashSet::new) .or_insert_with(HashSet::new)
.insert(conn_id); .insert(conn_id);
self.hot_binding
.map
.insert(conn_id, HotConnBinding { writer_id, meta });
true true
} }
@@ -392,39 +419,20 @@ impl ConnRegistry {
} }
pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> { pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> {
let mut binding = self.binding.inner.lock().await;
// ROUTING IS THE SOURCE OF TRUTH:
// stale bindings are ignored and lazily cleaned when routing no longer
// contains the connection.
if !self.routing.map.contains_key(&conn_id) { if !self.routing.map.contains_key(&conn_id) {
binding.meta.remove(&conn_id);
if let Some(stale_writer_id) = binding.writer_for_conn.remove(&conn_id)
&& let Some(conns) = binding.conns_for_writer.get_mut(&stale_writer_id)
{
conns.remove(&conn_id);
if conns.is_empty() {
binding
.writer_idle_since_epoch_secs
.insert(stale_writer_id, Self::now_epoch_secs());
}
}
return None; return None;
} }
let writer_id = binding.writer_for_conn.get(&conn_id).copied()?; let writer_id = self
let Some(writer) = binding.writers.get(&writer_id).cloned() else { .hot_binding
binding.writer_for_conn.remove(&conn_id); .map
binding.meta.remove(&conn_id); .get(&conn_id)
if let Some(conns) = binding.conns_for_writer.get_mut(&writer_id) { .map(|entry| entry.writer_id)?;
conns.remove(&conn_id); let writer = self
if conns.is_empty() { .writers
binding .map
.writer_idle_since_epoch_secs .get(&writer_id)
.insert(writer_id, Self::now_epoch_secs()); .map(|entry| entry.value().clone())?;
}
}
return None;
};
Some(ConnWriter { Some(ConnWriter {
writer_id, writer_id,
tx: writer, tx: writer,
@@ -439,6 +447,7 @@ impl ConnRegistry {
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut binding = self.binding.inner.lock().await; let mut binding = self.binding.inner.lock().await;
binding.writers.remove(&writer_id); binding.writers.remove(&writer_id);
self.writers.map.remove(&writer_id);
binding.last_meta_for_writer.remove(&writer_id); binding.last_meta_for_writer.remove(&writer_id);
binding.writer_idle_since_epoch_secs.remove(&writer_id); binding.writer_idle_since_epoch_secs.remove(&writer_id);
let conns = binding let conns = binding
@@ -454,6 +463,15 @@ impl ConnRegistry {
continue; continue;
} }
binding.writer_for_conn.remove(&conn_id); binding.writer_for_conn.remove(&conn_id);
let remove_hot = self
.hot_binding
.map
.get(&conn_id)
.map(|hot| hot.writer_id == writer_id)
.unwrap_or(false);
if remove_hot {
self.hot_binding.map.remove(&conn_id);
}
if let Some(m) = binding.meta.get(&conn_id) { if let Some(m) = binding.meta.get(&conn_id) {
out.push(BoundConn { out.push(BoundConn {
conn_id, conn_id,
@@ -466,8 +484,10 @@ impl ConnRegistry {
#[allow(dead_code)] #[allow(dead_code)]
pub async fn get_meta(&self, conn_id: u64) -> Option<ConnMeta> { pub async fn get_meta(&self, conn_id: u64) -> Option<ConnMeta> {
let binding = self.binding.inner.lock().await; self.hot_binding
binding.meta.get(&conn_id).cloned() .map
.get(&conn_id)
.map(|entry| entry.meta.clone())
} }
pub async fn is_writer_empty(&self, writer_id: u64) -> bool { pub async fn is_writer_empty(&self, writer_id: u64) -> bool {
@@ -491,6 +511,7 @@ impl ConnRegistry {
} }
binding.writers.remove(&writer_id); binding.writers.remove(&writer_id);
self.writers.map.remove(&writer_id);
binding.last_meta_for_writer.remove(&writer_id); binding.last_meta_for_writer.remove(&writer_id);
binding.writer_idle_since_epoch_secs.remove(&writer_id); binding.writer_idle_since_epoch_secs.remove(&writer_id);
binding.conns_for_writer.remove(&writer_id); binding.conns_for_writer.remove(&writer_id);
+1
View File
@@ -842,6 +842,7 @@ zabbix_export:
name: 'Prometheus metrics' name: 'Prometheus metrics'
type: HTTP_AGENT type: HTTP_AGENT
key: telemt.prom_metrics key: telemt.prom_metrics
history: '0'
value_type: TEXT value_type: TEXT
trends: '0' trends: '0'
url: '{$TELEMT_URL}' url: '{$TELEMT_URL}'