ME Pool Beobachter

This commit is contained in:
Alexey
2026-02-25 02:10:14 +03:00
parent c6c3d71b08
commit 618b7a1837
8 changed files with 318 additions and 26 deletions
+40 -2
View File
@@ -35,6 +35,7 @@ use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
use crate::proxy::ClientHandler;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool;
use crate::transport::middle_proxy::{
@@ -159,6 +160,15 @@ fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
info!(target: "telemt::links", "------------------------");
}
async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> {
if let Some(parent) = std::path::Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(path, payload).await
}
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (config_path, cli_silent, cli_log_level) = parse_cli();
@@ -256,6 +266,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let prefer_ipv6 = decision.prefer_ipv6();
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new());
let rng = Arc::new(SecureRandom::new());
// IP Tracker initialization
@@ -692,6 +703,26 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
detected_ip_v6,
);
let beobachten_writer = beobachten.clone();
let config_rx_beobachten = config_rx.clone();
tokio::spawn(async move {
loop {
let cfg = config_rx_beobachten.borrow().clone();
let sleep_secs = cfg.general.beobachten_flush_secs.max(1);
if cfg.general.beobachten {
let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60));
let path = cfg.general.beobachten_file.clone();
let snapshot = beobachten_writer.snapshot_text(ttl);
if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await {
warn!(error = %e, path = %path, "Failed to flush beobachten snapshot");
}
}
tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
}
});
if let Some(ref pool) = me_pool {
let pool_clone = pool.clone();
let rng_clone = rng.clone();
@@ -860,6 +891,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let max_connections_unix = max_connections.clone();
tokio::spawn(async move {
@@ -887,6 +919,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move {
@@ -894,7 +927,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng,
me_pool, tls_cache, ip_tracker, proxy_protocol_enabled,
me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled,
).await {
debug!(error = %e, "Unix socket connection error");
}
@@ -942,9 +975,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
if let Some(port) = config.server.metrics_port {
let stats = stats.clone();
let beobachten = beobachten.clone();
let config_rx_metrics = config_rx.clone();
let whitelist = config.server.metrics_whitelist.clone();
tokio::spawn(async move {
metrics::serve(port, stats, whitelist).await;
metrics::serve(port, stats, beobachten, config_rx_metrics, whitelist).await;
});
}
@@ -958,6 +993,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let max_connections_tcp = max_connections.clone();
tokio::spawn(async move {
@@ -980,6 +1016,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let beobachten = beobachten.clone();
let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move {
@@ -996,6 +1033,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
me_pool,
tls_cache,
ip_tracker,
beobachten,
proxy_protocol_enabled,
)
.run()