mirror of https://github.com/telemt/telemt.git
commit
f9e9ddd0f7
|
|
@ -121,6 +121,18 @@ pub(crate) fn default_desync_all_full() -> bool {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_beobachten_minutes() -> u64 {
|
||||||
|
10
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_beobachten_flush_secs() -> u64 {
|
||||||
|
15
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_beobachten_file() -> String {
|
||||||
|
"cache/beobachten.txt".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_tls_new_session_tickets() -> u8 {
|
pub(crate) fn default_tls_new_session_tickets() -> u8 {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -153,6 +153,24 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.beobachten_minutes == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.beobachten_minutes must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.beobachten_flush_secs == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.beobachten_flush_secs must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.beobachten_file.trim().is_empty() {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.beobachten_file cannot be empty".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.me_hardswap_warmup_delay_max_ms == 0 {
|
if config.general.me_hardswap_warmup_delay_max_ms == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(),
|
"general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(),
|
||||||
|
|
|
||||||
|
|
@ -206,6 +206,22 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_desync_all_full")]
|
#[serde(default = "default_desync_all_full")]
|
||||||
pub desync_all_full: bool,
|
pub desync_all_full: bool,
|
||||||
|
|
||||||
|
/// Enable per-IP forensic observation buckets for scanners and handshake failures.
|
||||||
|
#[serde(default)]
|
||||||
|
pub beobachten: bool,
|
||||||
|
|
||||||
|
/// Observation retention window in minutes for per-IP forensic buckets.
|
||||||
|
#[serde(default = "default_beobachten_minutes")]
|
||||||
|
pub beobachten_minutes: u64,
|
||||||
|
|
||||||
|
/// Snapshot flush interval in seconds for beob output file.
|
||||||
|
#[serde(default = "default_beobachten_flush_secs")]
|
||||||
|
pub beobachten_flush_secs: u64,
|
||||||
|
|
||||||
|
/// Snapshot file path for beob output.
|
||||||
|
#[serde(default = "default_beobachten_file")]
|
||||||
|
pub beobachten_file: String,
|
||||||
|
|
||||||
/// Enable C-like hard-swap for ME pool generations.
|
/// Enable C-like hard-swap for ME pool generations.
|
||||||
/// When true, Telemt prewarms a new generation and switches once full coverage is reached.
|
/// When true, Telemt prewarms a new generation and switches once full coverage is reached.
|
||||||
#[serde(default = "default_hardswap")]
|
#[serde(default = "default_hardswap")]
|
||||||
|
|
@ -383,6 +399,10 @@ impl Default for GeneralConfig {
|
||||||
crypto_pending_buffer: default_crypto_pending_buffer(),
|
crypto_pending_buffer: default_crypto_pending_buffer(),
|
||||||
max_client_frame: default_max_client_frame(),
|
max_client_frame: default_max_client_frame(),
|
||||||
desync_all_full: default_desync_all_full(),
|
desync_all_full: default_desync_all_full(),
|
||||||
|
beobachten: false,
|
||||||
|
beobachten_minutes: default_beobachten_minutes(),
|
||||||
|
beobachten_flush_secs: default_beobachten_flush_secs(),
|
||||||
|
beobachten_file: default_beobachten_file(),
|
||||||
hardswap: default_hardswap(),
|
hardswap: default_hardswap(),
|
||||||
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
|
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
|
||||||
update_every: Some(default_update_every_secs()),
|
update_every: Some(default_update_every_secs()),
|
||||||
|
|
|
||||||
42
src/main.rs
42
src/main.rs
|
|
@ -35,6 +35,7 @@ use crate::crypto::SecureRandom;
|
||||||
use crate::ip_tracker::UserIpTracker;
|
use crate::ip_tracker::UserIpTracker;
|
||||||
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
|
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
|
||||||
use crate::proxy::ClientHandler;
|
use crate::proxy::ClientHandler;
|
||||||
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
use crate::stats::{ReplayChecker, Stats};
|
use crate::stats::{ReplayChecker, Stats};
|
||||||
use crate::stream::BufferPool;
|
use crate::stream::BufferPool;
|
||||||
use crate::transport::middle_proxy::{
|
use crate::transport::middle_proxy::{
|
||||||
|
|
@ -159,6 +160,15 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
|
||||||
info!(target: "telemt::links", "------------------------");
|
info!(target: "telemt::links", "------------------------");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> {
|
||||||
|
if let Some(parent) = std::path::Path::new(path).parent()
|
||||||
|
&& !parent.as_os_str().is_empty()
|
||||||
|
{
|
||||||
|
tokio::fs::create_dir_all(parent).await?;
|
||||||
|
}
|
||||||
|
tokio::fs::write(path, payload).await
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
let (config_path, cli_silent, cli_log_level) = parse_cli();
|
let (config_path, cli_silent, cli_log_level) = parse_cli();
|
||||||
|
|
@ -256,6 +266,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
let prefer_ipv6 = decision.prefer_ipv6();
|
let prefer_ipv6 = decision.prefer_ipv6();
|
||||||
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
|
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
|
||||||
let stats = Arc::new(Stats::new());
|
let stats = Arc::new(Stats::new());
|
||||||
|
let beobachten = Arc::new(BeobachtenStore::new());
|
||||||
let rng = Arc::new(SecureRandom::new());
|
let rng = Arc::new(SecureRandom::new());
|
||||||
|
|
||||||
// IP Tracker initialization
|
// IP Tracker initialization
|
||||||
|
|
@ -692,6 +703,26 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
detected_ip_v6,
|
detected_ip_v6,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let beobachten_writer = beobachten.clone();
|
||||||
|
let config_rx_beobachten = config_rx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let cfg = config_rx_beobachten.borrow().clone();
|
||||||
|
let sleep_secs = cfg.general.beobachten_flush_secs.max(1);
|
||||||
|
|
||||||
|
if cfg.general.beobachten {
|
||||||
|
let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60));
|
||||||
|
let path = cfg.general.beobachten_file.clone();
|
||||||
|
let snapshot = beobachten_writer.snapshot_text(ttl);
|
||||||
|
if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await {
|
||||||
|
warn!(error = %e, path = %path, "Failed to flush beobachten snapshot");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if let Some(ref pool) = me_pool {
|
if let Some(ref pool) = me_pool {
|
||||||
let pool_clone = pool.clone();
|
let pool_clone = pool.clone();
|
||||||
let rng_clone = rng.clone();
|
let rng_clone = rng.clone();
|
||||||
|
|
@ -860,6 +891,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
let max_connections_unix = max_connections.clone();
|
let max_connections_unix = max_connections.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -887,6 +919,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
let proxy_protocol_enabled = config.server.proxy_protocol;
|
let proxy_protocol_enabled = config.server.proxy_protocol;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -894,7 +927,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
if let Err(e) = crate::proxy::client::handle_client_stream(
|
if let Err(e) = crate::proxy::client::handle_client_stream(
|
||||||
stream, fake_peer, config, stats,
|
stream, fake_peer, config, stats,
|
||||||
upstream_manager, replay_checker, buffer_pool, rng,
|
upstream_manager, replay_checker, buffer_pool, rng,
|
||||||
me_pool, tls_cache, ip_tracker, proxy_protocol_enabled,
|
me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled,
|
||||||
).await {
|
).await {
|
||||||
debug!(error = %e, "Unix socket connection error");
|
debug!(error = %e, "Unix socket connection error");
|
||||||
}
|
}
|
||||||
|
|
@ -942,9 +975,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
if let Some(port) = config.server.metrics_port {
|
if let Some(port) = config.server.metrics_port {
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
|
let config_rx_metrics = config_rx.clone();
|
||||||
let whitelist = config.server.metrics_whitelist.clone();
|
let whitelist = config.server.metrics_whitelist.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
metrics::serve(port, stats, whitelist).await;
|
metrics::serve(port, stats, beobachten, config_rx_metrics, whitelist).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -958,6 +993,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
let max_connections_tcp = max_connections.clone();
|
let max_connections_tcp = max_connections.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -980,6 +1016,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
let proxy_protocol_enabled = listener_proxy_protocol;
|
let proxy_protocol_enabled = listener_proxy_protocol;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -996,6 +1033,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
me_pool,
|
me_pool,
|
||||||
tls_cache,
|
tls_cache,
|
||||||
ip_tracker,
|
ip_tracker,
|
||||||
|
beobachten,
|
||||||
proxy_protocol_enabled,
|
proxy_protocol_enabled,
|
||||||
)
|
)
|
||||||
.run()
|
.run()
|
||||||
|
|
|
||||||
177
src/metrics.rs
177
src/metrics.rs
|
|
@ -1,6 +1,7 @@
|
||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use http_body_util::Full;
|
use http_body_util::Full;
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
|
|
@ -11,9 +12,17 @@ use ipnetwork::IpNetwork;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tracing::{info, warn, debug};
|
use tracing::{info, warn, debug};
|
||||||
|
|
||||||
|
use crate::config::ProxyConfig;
|
||||||
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
use crate::stats::Stats;
|
use crate::stats::Stats;
|
||||||
|
|
||||||
pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
|
pub async fn serve(
|
||||||
|
port: u16,
|
||||||
|
stats: Arc<Stats>,
|
||||||
|
beobachten: Arc<BeobachtenStore>,
|
||||||
|
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
|
||||||
|
whitelist: Vec<IpNetwork>,
|
||||||
|
) {
|
||||||
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
let listener = match TcpListener::bind(addr).await {
|
let listener = match TcpListener::bind(addr).await {
|
||||||
Ok(l) => l,
|
Ok(l) => l,
|
||||||
|
|
@ -22,7 +31,7 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
info!("Metrics endpoint: http://{}/metrics", addr);
|
info!("Metrics endpoint: http://{}/metrics and /beobachten", addr);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, peer) = match listener.accept().await {
|
let (stream, peer) = match listener.accept().await {
|
||||||
|
|
@ -39,10 +48,14 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
|
||||||
}
|
}
|
||||||
|
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
|
let config_rx_conn = config_rx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let svc = service_fn(move |req| {
|
let svc = service_fn(move |req| {
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
async move { handle(req, &stats) }
|
let beobachten = beobachten.clone();
|
||||||
|
let config = config_rx_conn.borrow().clone();
|
||||||
|
async move { handle(req, &stats, &beobachten, &config) }
|
||||||
});
|
});
|
||||||
if let Err(e) = http1::Builder::new()
|
if let Err(e) = http1::Builder::new()
|
||||||
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
|
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
|
||||||
|
|
@ -54,24 +67,48 @@ pub async fn serve(port: u16, stats: Arc<Stats>, whitelist: Vec<IpNetwork>) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle<B>(req: Request<B>, stats: &Stats) -> Result<Response<Full<Bytes>>, Infallible> {
|
fn handle<B>(
|
||||||
if req.uri().path() != "/metrics" {
|
req: Request<B>,
|
||||||
|
stats: &Stats,
|
||||||
|
beobachten: &BeobachtenStore,
|
||||||
|
config: &ProxyConfig,
|
||||||
|
) -> Result<Response<Full<Bytes>>, Infallible> {
|
||||||
|
if req.uri().path() == "/metrics" {
|
||||||
|
let body = render_metrics(stats);
|
||||||
let resp = Response::builder()
|
let resp = Response::builder()
|
||||||
.status(StatusCode::NOT_FOUND)
|
.status(StatusCode::OK)
|
||||||
.body(Full::new(Bytes::from("Not Found\n")))
|
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
|
||||||
|
.body(Full::new(Bytes::from(body)))
|
||||||
|
.unwrap();
|
||||||
|
return Ok(resp);
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.uri().path() == "/beobachten" {
|
||||||
|
let body = render_beobachten(beobachten, config);
|
||||||
|
let resp = Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header("content-type", "text/plain; charset=utf-8")
|
||||||
|
.body(Full::new(Bytes::from(body)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
return Ok(resp);
|
return Ok(resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
let body = render_metrics(stats);
|
|
||||||
let resp = Response::builder()
|
let resp = Response::builder()
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::NOT_FOUND)
|
||||||
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
|
.body(Full::new(Bytes::from("Not Found\n")))
|
||||||
.body(Full::new(Bytes::from(body)))
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> String {
|
||||||
|
if !config.general.beobachten {
|
||||||
|
return "beobachten disabled\n".to_string();
|
||||||
|
}
|
||||||
|
|
||||||
|
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
|
||||||
|
beobachten.snapshot_text(ttl)
|
||||||
|
}
|
||||||
|
|
||||||
fn render_metrics(stats: &Stats) -> String {
|
fn render_metrics(stats: &Stats) -> String {
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
let mut out = String::with_capacity(4096);
|
let mut out = String::with_capacity(4096);
|
||||||
|
|
@ -199,6 +236,95 @@ fn render_metrics(stats: &Stats) -> String {
|
||||||
stats.get_pool_stale_pick_total()
|
stats.get_pool_stale_pick_total()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_removed_total {}",
|
||||||
|
stats.get_me_writer_removed_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_removed_unexpected_total Unexpected ME writer removals that triggered refill"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_unexpected_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_removed_unexpected_total {}",
|
||||||
|
stats.get_me_writer_removed_unexpected_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_refill_triggered_total {}",
|
||||||
|
stats.get_me_refill_triggered_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_refill_skipped_inflight_total Immediate ME refill skips due to inflight dedup"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_refill_skipped_inflight_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_refill_skipped_inflight_total {}",
|
||||||
|
stats.get_me_refill_skipped_inflight_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_refill_failed_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_refill_failed_total {}",
|
||||||
|
stats.get_me_refill_failed_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_restored_same_endpoint_total Refilled ME writer restored on the same endpoint"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_same_endpoint_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_restored_same_endpoint_total {}",
|
||||||
|
stats.get_me_writer_restored_same_endpoint_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_restored_fallback_total Refilled ME writer restored via fallback endpoint"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_fallback_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_restored_fallback_total {}",
|
||||||
|
stats.get_me_writer_restored_fallback_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let unresolved_writer_losses = stats
|
||||||
|
.get_me_writer_removed_unexpected_total()
|
||||||
|
.saturating_sub(
|
||||||
|
stats
|
||||||
|
.get_me_writer_restored_same_endpoint_total()
|
||||||
|
.saturating_add(stats.get_me_writer_restored_fallback_total()),
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_removed_unexpected_minus_restored_total {}",
|
||||||
|
unresolved_writer_losses
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
||||||
|
|
@ -229,6 +355,7 @@ fn render_metrics(stats: &Stats) -> String {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::net::IpAddr;
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
@ -277,11 +404,17 @@ mod tests {
|
||||||
assert!(output.contains("# TYPE telemt_connections_total counter"));
|
assert!(output.contains("# TYPE telemt_connections_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
|
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
|
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
|
||||||
|
assert!(output.contains(
|
||||||
|
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_endpoint_integration() {
|
async fn test_endpoint_integration() {
|
||||||
let stats = Arc::new(Stats::new());
|
let stats = Arc::new(Stats::new());
|
||||||
|
let beobachten = Arc::new(BeobachtenStore::new());
|
||||||
|
let mut config = ProxyConfig::default();
|
||||||
stats.increment_connects_all();
|
stats.increment_connects_all();
|
||||||
stats.increment_connects_all();
|
stats.increment_connects_all();
|
||||||
stats.increment_connects_all();
|
stats.increment_connects_all();
|
||||||
|
|
@ -290,16 +423,34 @@ mod tests {
|
||||||
.uri("/metrics")
|
.uri("/metrics")
|
||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let resp = handle(req, &stats).unwrap();
|
let resp = handle(req, &stats, &beobachten, &config).unwrap();
|
||||||
assert_eq!(resp.status(), StatusCode::OK);
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
let body = resp.into_body().collect().await.unwrap().to_bytes();
|
let body = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
|
assert!(std::str::from_utf8(body.as_ref()).unwrap().contains("telemt_connections_total 3"));
|
||||||
|
|
||||||
|
config.general.beobachten = true;
|
||||||
|
config.general.beobachten_minutes = 10;
|
||||||
|
beobachten.record(
|
||||||
|
"TLS-scanner",
|
||||||
|
"203.0.113.10".parse::<IpAddr>().unwrap(),
|
||||||
|
Duration::from_secs(600),
|
||||||
|
);
|
||||||
|
let req_beob = Request::builder()
|
||||||
|
.uri("/beobachten")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
let resp_beob = handle(req_beob, &stats, &beobachten, &config).unwrap();
|
||||||
|
assert_eq!(resp_beob.status(), StatusCode::OK);
|
||||||
|
let body_beob = resp_beob.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let beob_text = std::str::from_utf8(body_beob.as_ref()).unwrap();
|
||||||
|
assert!(beob_text.contains("[TLS-scanner]"));
|
||||||
|
assert!(beob_text.contains("203.0.113.10-1"));
|
||||||
|
|
||||||
let req404 = Request::builder()
|
let req404 = Request::builder()
|
||||||
.uri("/other")
|
.uri("/other")
|
||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let resp404 = handle(req404, &stats).unwrap();
|
let resp404 = handle(req404, &stats, &beobachten, &config).unwrap();
|
||||||
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
|
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
//! Client Handler
|
//! Client Handler
|
||||||
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::net::SocketAddr;
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
@ -27,6 +27,7 @@ use crate::error::{HandshakeResult, ProxyError, Result};
|
||||||
use crate::ip_tracker::UserIpTracker;
|
use crate::ip_tracker::UserIpTracker;
|
||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
use crate::protocol::tls;
|
use crate::protocol::tls;
|
||||||
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
use crate::stats::{ReplayChecker, Stats};
|
use crate::stats::{ReplayChecker, Stats};
|
||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
||||||
use crate::transport::middle_proxy::MePool;
|
use crate::transport::middle_proxy::MePool;
|
||||||
|
|
@ -39,6 +40,36 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
|
||||||
use crate::proxy::masking::handle_bad_client;
|
use crate::proxy::masking::handle_bad_client;
|
||||||
use crate::proxy::middle_relay::handle_via_middle_proxy;
|
use crate::proxy::middle_relay::handle_via_middle_proxy;
|
||||||
|
|
||||||
|
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
|
||||||
|
Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn record_beobachten_class(
|
||||||
|
beobachten: &BeobachtenStore,
|
||||||
|
config: &ProxyConfig,
|
||||||
|
peer_ip: IpAddr,
|
||||||
|
class: &str,
|
||||||
|
) {
|
||||||
|
if !config.general.beobachten {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
beobachten.record(class, peer_ip, beobachten_ttl(config));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn record_handshake_failure_class(
|
||||||
|
beobachten: &BeobachtenStore,
|
||||||
|
config: &ProxyConfig,
|
||||||
|
peer_ip: IpAddr,
|
||||||
|
error: &ProxyError,
|
||||||
|
) {
|
||||||
|
let class = if error.to_string().contains("expected 64 bytes, got 0") {
|
||||||
|
"expected_64_got_0"
|
||||||
|
} else {
|
||||||
|
"other"
|
||||||
|
};
|
||||||
|
record_beobachten_class(beobachten, config, peer_ip, class);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn handle_client_stream<S>(
|
pub async fn handle_client_stream<S>(
|
||||||
mut stream: S,
|
mut stream: S,
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
|
|
@ -51,6 +82,7 @@ pub async fn handle_client_stream<S>(
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
tls_cache: Option<Arc<TlsFrontCache>>,
|
tls_cache: Option<Arc<TlsFrontCache>>,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
|
beobachten: Arc<BeobachtenStore>,
|
||||||
proxy_protocol_enabled: bool,
|
proxy_protocol_enabled: bool,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
|
|
@ -73,6 +105,7 @@ where
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
|
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
|
||||||
|
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -82,6 +115,9 @@ where
|
||||||
|
|
||||||
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
|
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
|
||||||
let stats_for_timeout = stats.clone();
|
let stats_for_timeout = stats.clone();
|
||||||
|
let config_for_timeout = config.clone();
|
||||||
|
let beobachten_for_timeout = beobachten.clone();
|
||||||
|
let peer_for_timeout = real_peer.ip();
|
||||||
|
|
||||||
// For non-TCP streams, use a synthetic local address
|
// For non-TCP streams, use a synthetic local address
|
||||||
let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port)
|
let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port)
|
||||||
|
|
@ -103,7 +139,15 @@ where
|
||||||
debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short");
|
debug!(peer = %real_peer, tls_len = tls_len, "TLS handshake too short");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
handle_bad_client(reader, writer, &first_bytes, &config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&first_bytes,
|
||||||
|
real_peer.ip(),
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -120,7 +164,15 @@ where
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
handle_bad_client(reader, writer, &handshake, &config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&handshake,
|
||||||
|
real_peer.ip(),
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
|
|
@ -156,7 +208,15 @@ where
|
||||||
debug!(peer = %real_peer, "Non-TLS modes disabled");
|
debug!(peer = %real_peer, "Non-TLS modes disabled");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
handle_bad_client(reader, writer, &first_bytes, &config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&first_bytes,
|
||||||
|
real_peer.ip(),
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -173,7 +233,15 @@ where
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
handle_bad_client(reader, writer, &handshake, &config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&handshake,
|
||||||
|
real_peer.ip(),
|
||||||
|
&config,
|
||||||
|
&beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
|
|
@ -200,11 +268,23 @@ where
|
||||||
Ok(Ok(outcome)) => outcome,
|
Ok(Ok(outcome)) => outcome,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!(peer = %peer, error = %e, "Handshake failed");
|
debug!(peer = %peer, error = %e, "Handshake failed");
|
||||||
|
record_handshake_failure_class(
|
||||||
|
&beobachten_for_timeout,
|
||||||
|
&config_for_timeout,
|
||||||
|
peer_for_timeout,
|
||||||
|
&e,
|
||||||
|
);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
stats_for_timeout.increment_handshake_timeouts();
|
stats_for_timeout.increment_handshake_timeouts();
|
||||||
debug!(peer = %peer, "Handshake timeout");
|
debug!(peer = %peer, "Handshake timeout");
|
||||||
|
record_beobachten_class(
|
||||||
|
&beobachten_for_timeout,
|
||||||
|
&config_for_timeout,
|
||||||
|
peer_for_timeout,
|
||||||
|
"other",
|
||||||
|
);
|
||||||
return Err(ProxyError::TgHandshakeTimeout);
|
return Err(ProxyError::TgHandshakeTimeout);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -230,6 +310,7 @@ pub struct RunningClientHandler {
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
tls_cache: Option<Arc<TlsFrontCache>>,
|
tls_cache: Option<Arc<TlsFrontCache>>,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
|
beobachten: Arc<BeobachtenStore>,
|
||||||
proxy_protocol_enabled: bool,
|
proxy_protocol_enabled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -246,6 +327,7 @@ impl ClientHandler {
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
tls_cache: Option<Arc<TlsFrontCache>>,
|
tls_cache: Option<Arc<TlsFrontCache>>,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
|
beobachten: Arc<BeobachtenStore>,
|
||||||
proxy_protocol_enabled: bool,
|
proxy_protocol_enabled: bool,
|
||||||
) -> RunningClientHandler {
|
) -> RunningClientHandler {
|
||||||
RunningClientHandler {
|
RunningClientHandler {
|
||||||
|
|
@ -260,6 +342,7 @@ impl ClientHandler {
|
||||||
me_pool,
|
me_pool,
|
||||||
tls_cache,
|
tls_cache,
|
||||||
ip_tracker,
|
ip_tracker,
|
||||||
|
beobachten,
|
||||||
proxy_protocol_enabled,
|
proxy_protocol_enabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -284,17 +367,32 @@ impl RunningClientHandler {
|
||||||
|
|
||||||
let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake);
|
let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake);
|
||||||
let stats = self.stats.clone();
|
let stats = self.stats.clone();
|
||||||
|
let config_for_timeout = self.config.clone();
|
||||||
|
let beobachten_for_timeout = self.beobachten.clone();
|
||||||
|
let peer_for_timeout = peer.ip();
|
||||||
|
|
||||||
// Phase 1: handshake (with timeout)
|
// Phase 1: handshake (with timeout)
|
||||||
let outcome = match timeout(handshake_timeout, self.do_handshake()).await {
|
let outcome = match timeout(handshake_timeout, self.do_handshake()).await {
|
||||||
Ok(Ok(outcome)) => outcome,
|
Ok(Ok(outcome)) => outcome,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!(peer = %peer, error = %e, "Handshake failed");
|
debug!(peer = %peer, error = %e, "Handshake failed");
|
||||||
|
record_handshake_failure_class(
|
||||||
|
&beobachten_for_timeout,
|
||||||
|
&config_for_timeout,
|
||||||
|
peer_for_timeout,
|
||||||
|
&e,
|
||||||
|
);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
stats.increment_handshake_timeouts();
|
stats.increment_handshake_timeouts();
|
||||||
debug!(peer = %peer, "Handshake timeout");
|
debug!(peer = %peer, "Handshake timeout");
|
||||||
|
record_beobachten_class(
|
||||||
|
&beobachten_for_timeout,
|
||||||
|
&config_for_timeout,
|
||||||
|
peer_for_timeout,
|
||||||
|
"other",
|
||||||
|
);
|
||||||
return Err(ProxyError::TgHandshakeTimeout);
|
return Err(ProxyError::TgHandshakeTimeout);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -321,6 +419,12 @@ impl RunningClientHandler {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.stats.increment_connects_bad();
|
self.stats.increment_connects_bad();
|
||||||
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
|
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
|
||||||
|
record_beobachten_class(
|
||||||
|
&self.beobachten,
|
||||||
|
&self.config,
|
||||||
|
self.peer.ip(),
|
||||||
|
"other",
|
||||||
|
);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -354,7 +458,15 @@ impl RunningClientHandler {
|
||||||
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
|
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
|
||||||
self.stats.increment_connects_bad();
|
self.stats.increment_connects_bad();
|
||||||
let (reader, writer) = self.stream.into_split();
|
let (reader, writer) = self.stream.into_split();
|
||||||
handle_bad_client(reader, writer, &first_bytes, &self.config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&first_bytes,
|
||||||
|
peer.ip(),
|
||||||
|
&self.config,
|
||||||
|
&self.beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -385,7 +497,15 @@ impl RunningClientHandler {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
handle_bad_client(reader, writer, &handshake, &config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&handshake,
|
||||||
|
peer.ip(),
|
||||||
|
&config,
|
||||||
|
&self.beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
|
|
@ -446,7 +566,15 @@ impl RunningClientHandler {
|
||||||
debug!(peer = %peer, "Non-TLS modes disabled");
|
debug!(peer = %peer, "Non-TLS modes disabled");
|
||||||
self.stats.increment_connects_bad();
|
self.stats.increment_connects_bad();
|
||||||
let (reader, writer) = self.stream.into_split();
|
let (reader, writer) = self.stream.into_split();
|
||||||
handle_bad_client(reader, writer, &first_bytes, &self.config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&first_bytes,
|
||||||
|
peer.ip(),
|
||||||
|
&self.config,
|
||||||
|
&self.beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -476,7 +604,15 @@ impl RunningClientHandler {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
handle_bad_client(reader, writer, &handshake, &config).await;
|
handle_bad_client(
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
&handshake,
|
||||||
|
peer.ip(),
|
||||||
|
&config,
|
||||||
|
&self.beobachten,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return Ok(HandshakeOutcome::Handled);
|
return Ok(HandshakeOutcome::Handled);
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
//! Masking - forward unrecognized traffic to mask host
|
//! Masking - forward unrecognized traffic to mask host
|
||||||
|
|
||||||
use std::str;
|
use std::str;
|
||||||
|
use std::net::IpAddr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
|
@ -9,6 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use crate::config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
|
|
||||||
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
/// Maximum duration for the entire masking relay.
|
/// Maximum duration for the entire masking relay.
|
||||||
|
|
@ -50,20 +52,26 @@ pub async fn handle_bad_client<R, W>(
|
||||||
reader: R,
|
reader: R,
|
||||||
writer: W,
|
writer: W,
|
||||||
initial_data: &[u8],
|
initial_data: &[u8],
|
||||||
|
peer_ip: IpAddr,
|
||||||
config: &ProxyConfig,
|
config: &ProxyConfig,
|
||||||
|
beobachten: &BeobachtenStore,
|
||||||
)
|
)
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
|
let client_type = detect_client_type(initial_data);
|
||||||
|
if config.general.beobachten {
|
||||||
|
let ttl = Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60));
|
||||||
|
beobachten.record(client_type, peer_ip, ttl);
|
||||||
|
}
|
||||||
|
|
||||||
if !config.censorship.mask {
|
if !config.censorship.mask {
|
||||||
// Masking disabled, just consume data
|
// Masking disabled, just consume data
|
||||||
consume_client_data(reader).await;
|
consume_client_data(reader).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let client_type = detect_client_type(initial_data);
|
|
||||||
|
|
||||||
// Connect via Unix socket or TCP
|
// Connect via Unix socket or TCP
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
if let Some(ref sock_path) = config.censorship.mask_unix_sock {
|
if let Some(ref sock_path) = config.censorship.mask_unix_sock {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,117 @@
|
||||||
|
//! Per-IP forensic buckets for scanner and handshake failure observation.
|
||||||
|
|
||||||
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
use std::net::IpAddr;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
|
const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct BeobachtenInner {
|
||||||
|
entries: HashMap<(String, IpAddr), BeobachtenEntry>,
|
||||||
|
last_cleanup: Option<Instant>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
struct BeobachtenEntry {
|
||||||
|
tries: u64,
|
||||||
|
last_seen: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory, TTL-scoped per-IP counters keyed by source class.
|
||||||
|
pub struct BeobachtenStore {
|
||||||
|
inner: Mutex<BeobachtenInner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for BeobachtenStore {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BeobachtenStore {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Mutex::new(BeobachtenInner::default()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn record(&self, class: &str, ip: IpAddr, ttl: Duration) {
|
||||||
|
if class.is_empty() || ttl.is_zero() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut guard = self.inner.lock();
|
||||||
|
Self::cleanup_if_needed(&mut guard, now, ttl);
|
||||||
|
|
||||||
|
let key = (class.to_string(), ip);
|
||||||
|
let entry = guard.entries.entry(key).or_insert(BeobachtenEntry {
|
||||||
|
tries: 0,
|
||||||
|
last_seen: now,
|
||||||
|
});
|
||||||
|
entry.tries = entry.tries.saturating_add(1);
|
||||||
|
entry.last_seen = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn snapshot_text(&self, ttl: Duration) -> String {
|
||||||
|
if ttl.is_zero() {
|
||||||
|
return "beobachten disabled\n".to_string();
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut guard = self.inner.lock();
|
||||||
|
Self::cleanup(&mut guard, now, ttl);
|
||||||
|
guard.last_cleanup = Some(now);
|
||||||
|
|
||||||
|
let mut grouped = BTreeMap::<String, Vec<(IpAddr, u64)>>::new();
|
||||||
|
for ((class, ip), entry) in &guard.entries {
|
||||||
|
grouped
|
||||||
|
.entry(class.clone())
|
||||||
|
.or_default()
|
||||||
|
.push((*ip, entry.tries));
|
||||||
|
}
|
||||||
|
|
||||||
|
if grouped.is_empty() {
|
||||||
|
return "empty\n".to_string();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut out = String::with_capacity(grouped.len() * 64);
|
||||||
|
for (class, entries) in &mut grouped {
|
||||||
|
out.push('[');
|
||||||
|
out.push_str(class);
|
||||||
|
out.push_str("]\n");
|
||||||
|
|
||||||
|
entries.sort_by(|(ip_a, tries_a), (ip_b, tries_b)| {
|
||||||
|
tries_b
|
||||||
|
.cmp(tries_a)
|
||||||
|
.then_with(|| ip_a.to_string().cmp(&ip_b.to_string()))
|
||||||
|
});
|
||||||
|
|
||||||
|
for (ip, tries) in entries {
|
||||||
|
out.push_str(&format!("{ip}-{tries}\n"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cleanup_if_needed(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) {
|
||||||
|
let should_cleanup = match inner.last_cleanup {
|
||||||
|
Some(last) => now.saturating_duration_since(last) >= CLEANUP_INTERVAL,
|
||||||
|
None => true,
|
||||||
|
};
|
||||||
|
if should_cleanup {
|
||||||
|
Self::cleanup(inner, now, ttl);
|
||||||
|
inner.last_cleanup = Some(now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cleanup(inner: &mut BeobachtenInner, now: Instant, ttl: Duration) {
|
||||||
|
inner.entries.retain(|_, entry| {
|
||||||
|
now.saturating_duration_since(entry.last_seen) <= ttl
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
pub mod beobachten;
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
|
@ -43,6 +45,13 @@ pub struct Stats {
|
||||||
pool_drain_active: AtomicU64,
|
pool_drain_active: AtomicU64,
|
||||||
pool_force_close_total: AtomicU64,
|
pool_force_close_total: AtomicU64,
|
||||||
pool_stale_pick_total: AtomicU64,
|
pool_stale_pick_total: AtomicU64,
|
||||||
|
me_writer_removed_total: AtomicU64,
|
||||||
|
me_writer_removed_unexpected_total: AtomicU64,
|
||||||
|
me_refill_triggered_total: AtomicU64,
|
||||||
|
me_refill_skipped_inflight_total: AtomicU64,
|
||||||
|
me_refill_failed_total: AtomicU64,
|
||||||
|
me_writer_restored_same_endpoint_total: AtomicU64,
|
||||||
|
me_writer_restored_fallback_total: AtomicU64,
|
||||||
user_stats: DashMap<String, UserStats>,
|
user_stats: DashMap<String, UserStats>,
|
||||||
start_time: parking_lot::RwLock<Option<Instant>>,
|
start_time: parking_lot::RwLock<Option<Instant>>,
|
||||||
}
|
}
|
||||||
|
|
@ -142,6 +151,27 @@ impl Stats {
|
||||||
pub fn increment_pool_stale_pick_total(&self) {
|
pub fn increment_pool_stale_pick_total(&self) {
|
||||||
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
|
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_writer_removed_total(&self) {
|
||||||
|
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_removed_unexpected_total(&self) {
|
||||||
|
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_refill_triggered_total(&self) {
|
||||||
|
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_refill_skipped_inflight_total(&self) {
|
||||||
|
self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_refill_failed_total(&self) {
|
||||||
|
self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_restored_same_endpoint_total(&self) {
|
||||||
|
self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_restored_fallback_total(&self) {
|
||||||
|
self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
||||||
|
|
@ -195,6 +225,27 @@ impl Stats {
|
||||||
pub fn get_pool_stale_pick_total(&self) -> u64 {
|
pub fn get_pool_stale_pick_total(&self) -> u64 {
|
||||||
self.pool_stale_pick_total.load(Ordering::Relaxed)
|
self.pool_stale_pick_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_writer_removed_total(&self) -> u64 {
|
||||||
|
self.me_writer_removed_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
|
||||||
|
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_refill_triggered_total(&self) -> u64 {
|
||||||
|
self.me_refill_triggered_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_refill_skipped_inflight_total(&self) -> u64 {
|
||||||
|
self.me_refill_skipped_inflight_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_refill_failed_total(&self) -> u64 {
|
||||||
|
self.me_refill_failed_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_restored_same_endpoint_total(&self) -> u64 {
|
||||||
|
self.me_writer_restored_same_endpoint_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
|
||||||
|
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn increment_user_connects(&self, user: &str) {
|
pub fn increment_user_connects(&self, user: &str) {
|
||||||
self.user_stats.entry(user.to_string()).or_default()
|
self.user_stats.entry(user.to_string()).or_default()
|
||||||
|
|
|
||||||
|
|
@ -708,6 +708,7 @@ impl MePool {
|
||||||
match self.connect_one(addr, self.rng.as_ref()).await {
|
match self.connect_one(addr, self.rng.as_ref()).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
self.stats.increment_me_reconnect_success();
|
self.stats.increment_me_reconnect_success();
|
||||||
|
self.stats.increment_me_writer_restored_same_endpoint_total();
|
||||||
info!(
|
info!(
|
||||||
%addr,
|
%addr,
|
||||||
attempt = attempt + 1,
|
attempt = attempt + 1,
|
||||||
|
|
@ -728,6 +729,7 @@ impl MePool {
|
||||||
|
|
||||||
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
|
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
|
||||||
if dc_endpoints.is_empty() {
|
if dc_endpoints.is_empty() {
|
||||||
|
self.stats.increment_me_refill_failed_total();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -738,6 +740,7 @@ impl MePool {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
self.stats.increment_me_reconnect_success();
|
self.stats.increment_me_reconnect_success();
|
||||||
|
self.stats.increment_me_writer_restored_fallback_total();
|
||||||
info!(
|
info!(
|
||||||
%addr,
|
%addr,
|
||||||
attempt = attempt + 1,
|
attempt = attempt + 1,
|
||||||
|
|
@ -747,6 +750,7 @@ impl MePool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.stats.increment_me_refill_failed_total();
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -756,9 +760,11 @@ impl MePool {
|
||||||
{
|
{
|
||||||
let mut guard = pool.refill_inflight.lock().await;
|
let mut guard = pool.refill_inflight.lock().await;
|
||||||
if !guard.insert(addr) {
|
if !guard.insert(addr) {
|
||||||
|
pool.stats.increment_me_refill_skipped_inflight_total();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pool.stats.increment_me_refill_triggered_total();
|
||||||
|
|
||||||
let restored = pool.refill_writer_after_loss(addr).await;
|
let restored = pool.refill_writer_after_loss(addr).await;
|
||||||
if !restored {
|
if !restored {
|
||||||
|
|
@ -1189,9 +1195,13 @@ impl MePool {
|
||||||
if was_draining {
|
if was_draining {
|
||||||
self.stats.decrement_pool_drain_active();
|
self.stats.decrement_pool_drain_active();
|
||||||
}
|
}
|
||||||
|
self.stats.increment_me_writer_removed_total();
|
||||||
w.cancel.cancel();
|
w.cancel.cancel();
|
||||||
removed_addr = Some(w.addr);
|
removed_addr = Some(w.addr);
|
||||||
trigger_refill = !was_draining;
|
trigger_refill = !was_draining;
|
||||||
|
if trigger_refill {
|
||||||
|
self.stats.increment_me_writer_removed_unexpected_total();
|
||||||
|
}
|
||||||
close_tx = Some(w.tx.clone());
|
close_tx = Some(w.tx.clone());
|
||||||
self.conn_count.fetch_sub(1, Ordering::Relaxed);
|
self.conn_count.fetch_sub(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue