From 31f6258c4794c3eadd7c9245ca18bfb1013706b8 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 13:54:59 +0300 Subject: [PATCH 01/11] Hot-Reload fixes Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/hot_reload.rs | 111 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index c0ca98d..fdf06fa 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -39,6 +39,7 @@ use super::load::{LoadedConfig, ProxyConfig}; const HOT_RELOAD_STABLE_SNAPSHOTS: u8 = 2; const HOT_RELOAD_DEBOUNCE: Duration = Duration::from_millis(50); +const HOT_RELOAD_STABLE_RECHECK: Duration = Duration::from_millis(75); // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -379,6 +380,14 @@ impl ReloadState { self.applied_snapshot_hash = Some(hash); self.reset_candidate(); } + + fn pending_candidate(&self) -> Option<(u64, u8)> { + let hash = self.candidate_snapshot_hash?; + if self.candidate_hits < HOT_RELOAD_STABLE_SNAPSHOTS { + return Some((hash, self.candidate_hits)); + } + None + } } fn normalize_watch_path(path: &Path) -> PathBuf { @@ -1253,6 +1262,73 @@ fn reload_config( Some(next_manifest) } +async fn reload_with_internal_stable_rechecks( + config_path: &PathBuf, + config_tx: &watch::Sender>, + log_tx: &watch::Sender, + detected_ip_v4: Option, + detected_ip_v6: Option, + reload_state: &mut ReloadState, +) -> Option { + let mut next_manifest = reload_config( + config_path, + config_tx, + log_tx, + detected_ip_v4, + detected_ip_v6, + reload_state, + ); + let mut rechecks_left = HOT_RELOAD_STABLE_SNAPSHOTS.saturating_sub(1); + + while rechecks_left > 0 { + let Some((snapshot_hash, candidate_hits)) = reload_state.pending_candidate() else { + break; + }; + + info!( + snapshot_hash, + candidate_hits, + required_hits = HOT_RELOAD_STABLE_SNAPSHOTS, + 
rechecks_left, + recheck_delay_ms = HOT_RELOAD_STABLE_RECHECK.as_millis(), + "config reload: scheduling internal stable recheck" + ); + tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK).await; + + let recheck_manifest = reload_config( + config_path, + config_tx, + log_tx, + detected_ip_v4, + detected_ip_v6, + reload_state, + ); + if recheck_manifest.is_some() { + next_manifest = recheck_manifest; + } + + if reload_state.is_applied(snapshot_hash) { + info!( + snapshot_hash, + "config reload: applied after internal stable recheck" + ); + break; + } + + if reload_state.pending_candidate().is_none() { + info!( + snapshot_hash, + "config reload: internal stable recheck aborted" + ); + break; + } + + rechecks_left = rechecks_left.saturating_sub(1); + } + + next_manifest +} + // ── Public API ──────────────────────────────────────────────────────────────── /// Spawn the hot-reload watcher task. @@ -1376,14 +1452,16 @@ pub fn spawn_config_watcher( tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await; while notify_rx.try_recv().is_ok() {} - if let Some(next_manifest) = reload_config( + if let Some(next_manifest) = reload_with_internal_stable_rechecks( &config_path, &config_tx, &log_tx, detected_ip_v4, detected_ip_v6, &mut reload_state, - ) { + ) + .await + { apply_watch_manifest( inotify_watcher.as_mut(), poll_watcher.as_mut(), @@ -1540,6 +1618,35 @@ mod tests { let _ = std::fs::remove_file(path); } + #[tokio::test] + async fn reload_cycle_applies_after_single_external_event() { + let initial_tag = "10101010101010101010101010101010"; + let final_tag = "20202020202020202020202020202020"; + let path = temp_config_path("telemt_hot_reload_single_event"); + + write_reload_config(&path, Some(initial_tag), None); + let initial_cfg = Arc::new(ProxyConfig::load(&path).unwrap()); + let initial_hash = ProxyConfig::load_with_metadata(&path).unwrap().rendered_hash; + let (config_tx, _config_rx) = watch::channel(initial_cfg.clone()); + let (log_tx, _log_rx) = 
watch::channel(initial_cfg.general.log_level.clone()); + let mut reload_state = ReloadState::new(Some(initial_hash)); + + write_reload_config(&path, Some(final_tag), None); + reload_with_internal_stable_rechecks( + &path, + &config_tx, + &log_tx, + None, + None, + &mut reload_state, + ) + .await + .unwrap(); + + assert_eq!(config_tx.borrow().general.ad_tag.as_deref(), Some(final_tag)); + let _ = std::fs::remove_file(path); + } + #[test] fn reload_keeps_hot_apply_when_non_hot_fields_change() { let initial_tag = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; From 85295a9961bb90b8bd6c7452aed84337f029c6a5 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 13:58:27 +0300 Subject: [PATCH 02/11] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 788bc2e..eba2e2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.21" +version = "3.3.22" edition = "2024" [dependencies] From 193545525650a0d960cecf950b97ac05e5028ff2 Mon Sep 17 00:00:00 2001 From: Dimasssss Date: Wed, 18 Mar 2026 18:20:23 +0300 Subject: [PATCH 03/11] Update CONFIG_PARAMS.en.md --- docs/CONFIG_PARAMS.en.md | 491 ++++++++++++++++++++------------------- 1 file changed, 248 insertions(+), 243 deletions(-) diff --git a/docs/CONFIG_PARAMS.en.md b/docs/CONFIG_PARAMS.en.md index ed89b3d..90da08a 100644 --- a/docs/CONFIG_PARAMS.en.md +++ b/docs/CONFIG_PARAMS.en.md @@ -8,282 +8,287 @@ This document lists all configuration keys accepted by `config.toml`. ## Top-level keys -| Parameter | Type | Description | -|---|---|---| -| include | `String` (special directive) | Includes another TOML file with `include = "relative/or/absolute/path.toml"`; includes are processed recursively before parsing. | -| show_link | `"*" \| String[]` | Legacy top-level link visibility selector (`"*"` for all users or explicit usernames list). 
| -| dc_overrides | `Map` | Overrides DC endpoints for non-standard DCs; key is DC id string, value is `ip:port` list. | -| default_dc | `u8` | Default DC index used for unmapped non-standard DCs. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| include | `String` (special directive) | `null` | — | Includes another TOML file with `include = "relative/or/absolute/path.toml"`; includes are processed recursively before parsing. | +| show_link | `"*" \| String[]` | `[]` (`ShowLink::None`) | — | Legacy top-level link visibility selector (`"*"` for all users or explicit usernames list). | +| dc_overrides | `Map` | `{}` | — | Overrides DC endpoints for non-standard DCs; key is DC id string, value is `ip:port` list. | +| default_dc | `u8 \| null` | `null` (effective fallback: `2` in ME routing) | — | Default DC index used for unmapped non-standard DCs. | ## [general] -| Parameter | Type | Description | -|---|---|---| -| data_path | `String` | Optional runtime data directory path. | -| prefer_ipv6 | `bool` | Prefer IPv6 where applicable in runtime logic. | -| fast_mode | `bool` | Enables fast-path optimizations for traffic processing. | -| use_middle_proxy | `bool` | Enables Middle Proxy mode. | -| proxy_secret_path | `String` | Path to proxy secret binary; can be auto-downloaded if absent. | -| proxy_config_v4_cache_path | `String` | Optional cache path for raw `getProxyConfig` (IPv4) snapshot. | -| proxy_config_v6_cache_path | `String` | Optional cache path for raw `getProxyConfigV6` (IPv6) snapshot. | -| ad_tag | `String` | Global fallback ad tag (32 hex characters). | -| middle_proxy_nat_ip | `IpAddr` | Explicit public IP override for NAT environments. | -| middle_proxy_nat_probe | `bool` | Enables NAT probing for Middle Proxy KDF/public address discovery. | -| middle_proxy_nat_stun | `String` | Deprecated legacy single STUN server for NAT probing. 
| -| middle_proxy_nat_stun_servers | `String[]` | Deprecated legacy STUN list for NAT probing fallback. | -| stun_nat_probe_concurrency | `usize` | Maximum concurrent STUN probes during NAT detection. | -| middle_proxy_pool_size | `usize` | Target size of active Middle Proxy writer pool. | -| middle_proxy_warm_standby | `usize` | Number of warm standby Middle-End connections. | -| me_init_retry_attempts | `u32` | Startup retries for ME pool initialization (`0` means unlimited). | -| me2dc_fallback | `bool` | Allows fallback from ME mode to direct DC when ME startup fails. | -| me_keepalive_enabled | `bool` | Enables ME keepalive padding frames. | -| me_keepalive_interval_secs | `u64` | Keepalive interval in seconds. | -| me_keepalive_jitter_secs | `u64` | Keepalive jitter in seconds. | -| me_keepalive_payload_random | `bool` | Randomizes keepalive payload bytes instead of zero payload. | -| rpc_proxy_req_every | `u64` | Interval for service `RPC_PROXY_REQ` activity signals (`0` disables). | -| me_writer_cmd_channel_capacity | `usize` | Capacity of per-writer command channel. | -| me_route_channel_capacity | `usize` | Capacity of per-connection ME response route channel. | -| me_c2me_channel_capacity | `usize` | Capacity of per-client command queue (client reader -> ME sender). | -| me_reader_route_data_wait_ms | `u64` | Bounded wait for routing ME DATA to per-connection queue (`0` = no wait). | -| me_d2c_flush_batch_max_frames | `usize` | Max ME->client frames coalesced before flush. | -| me_d2c_flush_batch_max_bytes | `usize` | Max ME->client payload bytes coalesced before flush. | -| me_d2c_flush_batch_max_delay_us | `u64` | Max microsecond wait for coalescing more ME->client frames (`0` disables timed coalescing). | -| me_d2c_ack_flush_immediate | `bool` | Flushes client writer immediately after quick-ack write. | -| direct_relay_copy_buf_c2s_bytes | `usize` | Copy buffer size for client->DC direction in direct relay. 
| -| direct_relay_copy_buf_s2c_bytes | `usize` | Copy buffer size for DC->client direction in direct relay. | -| crypto_pending_buffer | `usize` | Max pending ciphertext buffer per client writer (bytes). | -| max_client_frame | `usize` | Maximum allowed client MTProto frame size (bytes). | -| desync_all_full | `bool` | Emits full crypto-desync forensic logs for every event. | -| beobachten | `bool` | Enables per-IP forensic observation buckets. | -| beobachten_minutes | `u64` | Retention window (minutes) for per-IP observation buckets. | -| beobachten_flush_secs | `u64` | Snapshot flush interval (seconds) for observation output file. | -| beobachten_file | `String` | Observation snapshot output file path. | -| hardswap | `bool` | Enables hard-swap generation switching for ME pool updates. | -| me_warmup_stagger_enabled | `bool` | Enables staggered warmup for extra ME writers. | -| me_warmup_step_delay_ms | `u64` | Base delay between warmup connections (ms). | -| me_warmup_step_jitter_ms | `u64` | Jitter for warmup delay (ms). | -| me_reconnect_max_concurrent_per_dc | `u32` | Max concurrent reconnect attempts per DC. | -| me_reconnect_backoff_base_ms | `u64` | Base reconnect backoff in ms. | -| me_reconnect_backoff_cap_ms | `u64` | Cap reconnect backoff in ms. | -| me_reconnect_fast_retry_count | `u32` | Number of fast retry attempts before backoff. | -| me_single_endpoint_shadow_writers | `u8` | Additional reserve writers for one-endpoint DC groups. | -| me_single_endpoint_outage_mode_enabled | `bool` | Enables aggressive outage recovery for one-endpoint DC groups. | -| me_single_endpoint_outage_disable_quarantine | `bool` | Ignores endpoint quarantine in one-endpoint outage mode. | -| me_single_endpoint_outage_backoff_min_ms | `u64` | Minimum reconnect backoff in outage mode (ms). | -| me_single_endpoint_outage_backoff_max_ms | `u64` | Maximum reconnect backoff in outage mode (ms). 
| -| me_single_endpoint_shadow_rotate_every_secs | `u64` | Periodic shadow writer rotation interval (`0` disables). | -| me_floor_mode | `"static" \| "adaptive"` | Writer floor policy mode. | -| me_adaptive_floor_idle_secs | `u64` | Idle time before adaptive floor may reduce one-endpoint target. | -| me_adaptive_floor_min_writers_single_endpoint | `u8` | Minimum adaptive writer target for one-endpoint DC groups. | -| me_adaptive_floor_min_writers_multi_endpoint | `u8` | Minimum adaptive writer target for multi-endpoint DC groups. | -| me_adaptive_floor_recover_grace_secs | `u64` | Grace period to hold static floor after activity. | -| me_adaptive_floor_writers_per_core_total | `u16` | Global writer budget per logical CPU core in adaptive mode. | -| me_adaptive_floor_cpu_cores_override | `u16` | Manual CPU core count override (`0` uses auto-detection). | -| me_adaptive_floor_max_extra_writers_single_per_core | `u16` | Per-core max extra writers above base floor for one-endpoint DCs. | -| me_adaptive_floor_max_extra_writers_multi_per_core | `u16` | Per-core max extra writers above base floor for multi-endpoint DCs. | -| me_adaptive_floor_max_active_writers_per_core | `u16` | Hard cap for active ME writers per logical CPU core. | -| me_adaptive_floor_max_warm_writers_per_core | `u16` | Hard cap for warm ME writers per logical CPU core. | -| me_adaptive_floor_max_active_writers_global | `u32` | Hard global cap for active ME writers. | -| me_adaptive_floor_max_warm_writers_global | `u32` | Hard global cap for warm ME writers. | -| upstream_connect_retry_attempts | `u32` | Connect attempts for selected upstream before error/fallback. | -| upstream_connect_retry_backoff_ms | `u64` | Delay between upstream connect attempts (ms). | -| upstream_connect_budget_ms | `u64` | Total wall-clock budget for one upstream connect request (ms). | -| upstream_unhealthy_fail_threshold | `u32` | Consecutive failed requests before upstream is marked unhealthy. 
| -| upstream_connect_failfast_hard_errors | `bool` | Skips additional retries for hard non-transient connect errors. | -| stun_iface_mismatch_ignore | `bool` | Ignores STUN/interface mismatch and keeps Middle Proxy mode. | -| unknown_dc_log_path | `String` | File path for unknown-DC request logging (`null` disables file path). | -| unknown_dc_file_log_enabled | `bool` | Enables unknown-DC file logging. | -| log_level | `"debug" \| "verbose" \| "normal" \| "silent"` | Runtime logging verbosity. | -| disable_colors | `bool` | Disables ANSI colors in logs. | -| me_socks_kdf_policy | `"strict" \| "compat"` | SOCKS-bound KDF fallback policy for ME handshake. | -| me_route_backpressure_base_timeout_ms | `u64` | Base backpressure timeout for route-channel send (ms). | -| me_route_backpressure_high_timeout_ms | `u64` | High backpressure timeout when queue occupancy exceeds watermark (ms). | -| me_route_backpressure_high_watermark_pct | `u8` | Queue occupancy threshold (%) for high timeout mode. | -| me_health_interval_ms_unhealthy | `u64` | Health monitor interval while writer coverage is degraded (ms). | -| me_health_interval_ms_healthy | `u64` | Health monitor interval while writer coverage is healthy (ms). | -| me_admission_poll_ms | `u64` | Poll interval for conditional-admission checks (ms). | -| me_warn_rate_limit_ms | `u64` | Cooldown for repetitive ME warning logs (ms). | -| me_route_no_writer_mode | `"async_recovery_failfast" \| "inline_recovery_legacy" \| "hybrid_async_persistent"` | Route behavior when no writer is immediately available. | -| me_route_no_writer_wait_ms | `u64` | Max wait in async-recovery failfast mode (ms). | -| me_route_inline_recovery_attempts | `u32` | Inline recovery attempts in legacy mode. | -| me_route_inline_recovery_wait_ms | `u64` | Max inline recovery wait in legacy mode (ms). | -| fast_mode_min_tls_record | `usize` | Minimum TLS record size when fast-mode coalescing is enabled (`0` disables). 
| -| update_every | `u64` | Unified interval for config/secret updater tasks. | -| me_reinit_every_secs | `u64` | Periodic ME pool reinitialization interval (seconds). | -| me_hardswap_warmup_delay_min_ms | `u64` | Minimum delay between hardswap warmup connects (ms). | -| me_hardswap_warmup_delay_max_ms | `u64` | Maximum delay between hardswap warmup connects (ms). | -| me_hardswap_warmup_extra_passes | `u8` | Additional warmup passes per hardswap cycle. | -| me_hardswap_warmup_pass_backoff_base_ms | `u64` | Base backoff between hardswap warmup passes (ms). | -| me_config_stable_snapshots | `u8` | Number of identical config snapshots required before apply. | -| me_config_apply_cooldown_secs | `u64` | Cooldown between applied ME map updates (seconds). | -| me_snapshot_require_http_2xx | `bool` | Requires 2xx HTTP responses for applying config snapshots. | -| me_snapshot_reject_empty_map | `bool` | Rejects empty config snapshots. | -| me_snapshot_min_proxy_for_lines | `u32` | Minimum parsed `proxy_for` rows required to accept snapshot. | -| proxy_secret_stable_snapshots | `u8` | Number of identical secret snapshots required before runtime rotation. | -| proxy_secret_rotate_runtime | `bool` | Enables runtime proxy-secret rotation from remote source. | -| me_secret_atomic_snapshot | `bool` | Keeps selector and secret bytes from the same snapshot atomically. | -| proxy_secret_len_max | `usize` | Maximum allowed proxy-secret length (bytes). | -| me_pool_drain_ttl_secs | `u64` | Drain TTL for stale ME writers after endpoint-map changes (seconds). | -| me_pool_drain_threshold | `u64` | Max draining stale writers before batch force-close (`0` disables threshold cleanup). | -| me_bind_stale_mode | `"never" \| "ttl" \| "always"` | Policy for new binds on stale draining writers. | -| me_bind_stale_ttl_secs | `u64` | TTL for stale bind allowance when stale mode is `ttl`. 
| -| me_pool_min_fresh_ratio | `f32` | Minimum desired-DC fresh coverage ratio before draining stale writers. | -| me_reinit_drain_timeout_secs | `u64` | Force-close timeout for stale writers after endpoint-map changes (`0` disables force-close). | -| proxy_secret_auto_reload_secs | `u64` | Deprecated legacy secret reload interval (fallback when `update_every` is not set). | -| proxy_config_auto_reload_secs | `u64` | Deprecated legacy config reload interval (fallback when `update_every` is not set). | -| me_reinit_singleflight | `bool` | Serializes ME reinit cycles across trigger sources. | -| me_reinit_trigger_channel | `usize` | Trigger queue capacity for reinit scheduler. | -| me_reinit_coalesce_window_ms | `u64` | Trigger coalescing window before starting reinit (ms). | -| me_deterministic_writer_sort | `bool` | Enables deterministic candidate sort for writer binding path. | -| me_writer_pick_mode | `"sorted_rr" \| "p2c"` | Writer selection mode for route bind path. | -| me_writer_pick_sample_size | `u8` | Number of candidates sampled by picker in `p2c` mode. | -| ntp_check | `bool` | Enables NTP drift check at startup. | -| ntp_servers | `String[]` | NTP servers used for drift check. | -| auto_degradation_enabled | `bool` | Enables automatic degradation from ME to direct DC. | -| degradation_min_unavailable_dc_groups | `u8` | Minimum unavailable ME DC groups required before degrading. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| data_path | `String \| null` | `null` | — | Optional runtime data directory path. | +| prefer_ipv6 | `bool` | `false` | — | Prefer IPv6 where applicable in runtime logic. | +| fast_mode | `bool` | `true` | — | Enables fast-path optimizations for traffic processing. | +| use_middle_proxy | `bool` | `true` | none | Enables ME transport mode; if `false`, runtime falls back to direct DC routing. | +| proxy_secret_path | `String \| null` | `"proxy-secret"` | Path may be `null`. 
| Path to Telegram infrastructure proxy-secret file used by ME handshake logic. | +| proxy_config_v4_cache_path | `String \| null` | `"cache/proxy-config-v4.txt"` | — | Optional cache path for raw `getProxyConfig` (IPv4) snapshot. | +| proxy_config_v6_cache_path | `String \| null` | `"cache/proxy-config-v6.txt"` | — | Optional cache path for raw `getProxyConfigV6` (IPv6) snapshot. | +| ad_tag | `String \| null` | `null` | — | Global fallback ad tag (32 hex characters). | +| middle_proxy_nat_ip | `IpAddr \| null` | `null` | Must be a valid IP when set. | Manual public NAT IP override used as ME address material when set. | +| middle_proxy_nat_probe | `bool` | `true` | Auto-forced to `true` when `use_middle_proxy = true`. | Enables ME NAT probing; runtime may force it on when ME mode is active. | +| middle_proxy_nat_stun | `String \| null` | `null` | Deprecated. Use `network.stun_servers`. | Deprecated legacy single STUN server for NAT probing. | +| middle_proxy_nat_stun_servers | `String[]` | `[]` | Deprecated. Use `network.stun_servers`. | Deprecated legacy STUN list for NAT probing fallback. | +| stun_nat_probe_concurrency | `usize` | `8` | Must be `> 0`. | Maximum number of parallel STUN probes during NAT/public endpoint discovery. | +| middle_proxy_pool_size | `usize` | `8` | none | Target size of active ME writer pool. | +| middle_proxy_warm_standby | `usize` | `16` | none | Reserved compatibility field in current runtime revision. | +| me_init_retry_attempts | `u32` | `0` | `0..=1_000_000`. | Startup retries for ME pool initialization (`0` means unlimited). | +| me2dc_fallback | `bool` | `true` | — | Allows fallback from ME mode to direct DC when ME startup fails. | +| me_keepalive_enabled | `bool` | `true` | none | Enables periodic ME keepalive/ping traffic. | +| me_keepalive_interval_secs | `u64` | `8` | none | Base ME keepalive interval in seconds. 
| +| me_keepalive_jitter_secs | `u64` | `2` | none | Keepalive jitter in seconds to reduce synchronized bursts. | +| me_keepalive_payload_random | `bool` | `true` | none | Randomizes keepalive payload bytes instead of fixed zero payload. | +| rpc_proxy_req_every | `u64` | `0` | `0` or `10..=300`. | Interval for service `RPC_PROXY_REQ` activity signals (`0` disables). | +| me_writer_cmd_channel_capacity | `usize` | `4096` | Must be `> 0`. | Capacity of per-writer command channel. | +| me_route_channel_capacity | `usize` | `768` | Must be `> 0`. | Capacity of per-connection ME response route channel. | +| me_c2me_channel_capacity | `usize` | `1024` | Must be `> 0`. | Capacity of per-client command queue (client reader -> ME sender). | +| me_reader_route_data_wait_ms | `u64` | `2` | `0..=20`. | Bounded wait for routing ME DATA to per-connection queue (`0` = no wait). | +| me_d2c_flush_batch_max_frames | `usize` | `32` | `1..=512`. | Max ME->client frames coalesced before flush. | +| me_d2c_flush_batch_max_bytes | `usize` | `131072` | `4096..=2_097_152`. | Max ME->client payload bytes coalesced before flush. | +| me_d2c_flush_batch_max_delay_us | `u64` | `500` | `0..=5000`. | Max microsecond wait for coalescing more ME->client frames (`0` disables timed coalescing). | +| me_d2c_ack_flush_immediate | `bool` | `true` | — | Flushes client writer immediately after quick-ack write. | +| direct_relay_copy_buf_c2s_bytes | `usize` | `65536` | `4096..=1_048_576`. | Copy buffer size for client->DC direction in direct relay. | +| direct_relay_copy_buf_s2c_bytes | `usize` | `262144` | `8192..=2_097_152`. | Copy buffer size for DC->client direction in direct relay. | +| crypto_pending_buffer | `usize` | `262144` | — | Max pending ciphertext buffer per client writer (bytes). | +| max_client_frame | `usize` | `16777216` | — | Maximum allowed client MTProto frame size (bytes). | +| desync_all_full | `bool` | `false` | — | Emits full crypto-desync forensic logs for every event. 
| +| beobachten | `bool` | `true` | — | Enables per-IP forensic observation buckets. | +| beobachten_minutes | `u64` | `10` | Must be `> 0`. | Retention window (minutes) for per-IP observation buckets. | +| beobachten_flush_secs | `u64` | `15` | Must be `> 0`. | Snapshot flush interval (seconds) for observation output file. | +| beobachten_file | `String` | `"cache/beobachten.txt"` | — | Observation snapshot output file path. | +| hardswap | `bool` | `true` | none | Enables generation-based ME hardswap strategy. | +| me_warmup_stagger_enabled | `bool` | `true` | none | Staggers extra ME warmup dials to avoid connection spikes. | +| me_warmup_step_delay_ms | `u64` | `500` | none | Base delay in milliseconds between warmup dial steps. | +| me_warmup_step_jitter_ms | `u64` | `300` | none | Additional random delay in milliseconds for warmup steps. | +| me_reconnect_max_concurrent_per_dc | `u32` | `8` | none | Limits concurrent reconnect workers per DC during health recovery. | +| me_reconnect_backoff_base_ms | `u64` | `500` | none | Initial reconnect backoff in milliseconds. | +| me_reconnect_backoff_cap_ms | `u64` | `30000` | none | Maximum reconnect backoff cap in milliseconds. | +| me_reconnect_fast_retry_count | `u32` | `16` | none | Immediate retry budget before long backoff behavior applies. | +| me_single_endpoint_shadow_writers | `u8` | `2` | `0..=32`. | Additional reserve writers for one-endpoint DC groups. | +| me_single_endpoint_outage_mode_enabled | `bool` | `true` | — | Enables aggressive outage recovery for one-endpoint DC groups. | +| me_single_endpoint_outage_disable_quarantine | `bool` | `true` | — | Ignores endpoint quarantine in one-endpoint outage mode. | +| me_single_endpoint_outage_backoff_min_ms | `u64` | `250` | Must be `> 0`; also `<= me_single_endpoint_outage_backoff_max_ms`. | Minimum reconnect backoff in outage mode (ms). 
| +| me_single_endpoint_outage_backoff_max_ms | `u64` | `3000` | Must be `> 0`; also `>= me_single_endpoint_outage_backoff_min_ms`. | Maximum reconnect backoff in outage mode (ms). | +| me_single_endpoint_shadow_rotate_every_secs | `u64` | `900` | — | Periodic shadow writer rotation interval (`0` disables). | +| me_floor_mode | `"static" \| "adaptive"` | `"adaptive"` | — | Writer floor policy mode. | +| me_adaptive_floor_idle_secs | `u64` | `90` | — | Idle time before adaptive floor may reduce one-endpoint target. | +| me_adaptive_floor_min_writers_single_endpoint | `u8` | `1` | `1..=32`. | Minimum adaptive writer target for one-endpoint DC groups. | +| me_adaptive_floor_min_writers_multi_endpoint | `u8` | `1` | `1..=32`. | Minimum adaptive writer target for multi-endpoint DC groups. | +| me_adaptive_floor_recover_grace_secs | `u64` | `180` | — | Grace period to hold static floor after activity. | +| me_adaptive_floor_writers_per_core_total | `u16` | `48` | Must be `> 0`. | Global writer budget per logical CPU core in adaptive mode. | +| me_adaptive_floor_cpu_cores_override | `u16` | `0` | — | Manual CPU core count override (`0` uses auto-detection). | +| me_adaptive_floor_max_extra_writers_single_per_core | `u16` | `1` | — | Per-core max extra writers above base floor for one-endpoint DCs. | +| me_adaptive_floor_max_extra_writers_multi_per_core | `u16` | `2` | — | Per-core max extra writers above base floor for multi-endpoint DCs. | +| me_adaptive_floor_max_active_writers_per_core | `u16` | `64` | Must be `> 0`. | Hard cap for active ME writers per logical CPU core. | +| me_adaptive_floor_max_warm_writers_per_core | `u16` | `64` | Must be `> 0`. | Hard cap for warm ME writers per logical CPU core. | +| me_adaptive_floor_max_active_writers_global | `u32` | `256` | Must be `> 0`. | Hard global cap for active ME writers. | +| me_adaptive_floor_max_warm_writers_global | `u32` | `256` | Must be `> 0`. | Hard global cap for warm ME writers. 
| +| upstream_connect_retry_attempts | `u32` | `2` | Must be `> 0`. | Connect attempts for selected upstream before error/fallback. | +| upstream_connect_retry_backoff_ms | `u64` | `100` | — | Delay between upstream connect attempts (ms). | +| upstream_connect_budget_ms | `u64` | `3000` | Must be `> 0`. | Total wall-clock budget for one upstream connect request (ms). | +| upstream_unhealthy_fail_threshold | `u32` | `5` | Must be `> 0`. | Consecutive failed requests before upstream is marked unhealthy. | +| upstream_connect_failfast_hard_errors | `bool` | `false` | — | Skips additional retries for hard non-transient connect errors. | +| stun_iface_mismatch_ignore | `bool` | `false` | none | Reserved compatibility flag in current runtime revision. | +| unknown_dc_log_path | `String \| null` | `"unknown-dc.txt"` | — | File path for unknown-DC request logging (`null` disables file path). | +| unknown_dc_file_log_enabled | `bool` | `false` | — | Enables unknown-DC file logging. | +| log_level | `"debug" \| "verbose" \| "normal" \| "silent"` | `"normal"` | — | Runtime logging verbosity. | +| disable_colors | `bool` | `false` | — | Disables ANSI colors in logs. | +| me_socks_kdf_policy | `"strict" \| "compat"` | `"strict"` | — | SOCKS-bound KDF fallback policy for ME handshake. | +| me_route_backpressure_base_timeout_ms | `u64` | `25` | Must be `> 0`. | Base backpressure timeout for route-channel send (ms). | +| me_route_backpressure_high_timeout_ms | `u64` | `120` | Must be `>= me_route_backpressure_base_timeout_ms`. | High backpressure timeout when queue occupancy exceeds watermark (ms). | +| me_route_backpressure_high_watermark_pct | `u8` | `80` | `1..=100`. | Queue occupancy threshold (%) for high timeout mode. | +| me_health_interval_ms_unhealthy | `u64` | `1000` | Must be `> 0`. | Health monitor interval while writer coverage is degraded (ms). | +| me_health_interval_ms_healthy | `u64` | `3000` | Must be `> 0`. 
| Health monitor interval while writer coverage is healthy (ms). | +| me_admission_poll_ms | `u64` | `1000` | Must be `> 0`. | Poll interval for conditional-admission checks (ms). | +| me_warn_rate_limit_ms | `u64` | `5000` | Must be `> 0`. | Cooldown for repetitive ME warning logs (ms). | +| me_route_no_writer_mode | `"async_recovery_failfast" \| "inline_recovery_legacy" \| "hybrid_async_persistent"` | `"hybrid_async_persistent"` | — | Route behavior when no writer is immediately available. | +| me_route_no_writer_wait_ms | `u64` | `250` | `10..=5000`. | Max wait in async-recovery failfast mode (ms). | +| me_route_inline_recovery_attempts | `u32` | `3` | Must be `> 0`. | Inline recovery attempts in legacy mode. | +| me_route_inline_recovery_wait_ms | `u64` | `3000` | `10..=30000`. | Max inline recovery wait in legacy mode (ms). | +| fast_mode_min_tls_record | `usize` | `0` | — | Minimum TLS record size when fast-mode coalescing is enabled (`0` disables). | +| update_every | `u64 \| null` | `300` | If set: must be `> 0`; if `null`: legacy fallback path is used. | Unified refresh interval for ME config and proxy-secret updater tasks. | +| me_reinit_every_secs | `u64` | `900` | Must be `> 0`. | Periodic interval for zero-downtime ME reinit cycle. | +| me_hardswap_warmup_delay_min_ms | `u64` | `1000` | Must be `<= me_hardswap_warmup_delay_max_ms`. | Lower bound for hardswap warmup dial spacing. | +| me_hardswap_warmup_delay_max_ms | `u64` | `2000` | Must be `> 0`. | Upper bound for hardswap warmup dial spacing. | +| me_hardswap_warmup_extra_passes | `u8` | `3` | Must be within `[0, 10]`. | Additional warmup passes after the base pass in one hardswap cycle. | +| me_hardswap_warmup_pass_backoff_base_ms | `u64` | `500` | Must be `> 0`. | Base backoff between extra hardswap warmup passes. | +| me_config_stable_snapshots | `u8` | `2` | Must be `> 0`. | Number of identical ME config snapshots required before apply. 
| +| me_config_apply_cooldown_secs | `u64` | `300` | none | Cooldown between applied ME endpoint-map updates. | +| me_snapshot_require_http_2xx | `bool` | `true` | — | Requires 2xx HTTP responses for applying config snapshots. | +| me_snapshot_reject_empty_map | `bool` | `true` | — | Rejects empty config snapshots. | +| me_snapshot_min_proxy_for_lines | `u32` | `1` | Must be `> 0`. | Minimum parsed `proxy_for` rows required to accept snapshot. | +| proxy_secret_stable_snapshots | `u8` | `2` | Must be `> 0`. | Number of identical proxy-secret snapshots required before rotation. | +| proxy_secret_rotate_runtime | `bool` | `true` | none | Enables runtime proxy-secret rotation from updater snapshots. | +| me_secret_atomic_snapshot | `bool` | `true` | — | Keeps selector and secret bytes from the same snapshot atomically. | +| proxy_secret_len_max | `usize` | `256` | Must be within `[32, 4096]`. | Upper length limit for accepted proxy-secret bytes. | +| me_pool_drain_ttl_secs | `u64` | `90` | none | Time window where stale writers remain fallback-eligible after map change. | +| me_pool_drain_threshold | `u64` | `128` | — | Max draining stale writers before batch force-close (`0` disables threshold cleanup). | +| me_pool_drain_soft_evict_enabled | `bool` | `true` | — | Enables gradual soft-eviction of stale writers during drain/reinit instead of immediate hard close. | +| me_pool_drain_soft_evict_grace_secs | `u64` | `30` | `0..=3600`. | Grace period before stale writers become soft-evict candidates. | +| me_pool_drain_soft_evict_per_writer | `u8` | `1` | `1..=16`. | Maximum stale routes soft-evicted per writer in one eviction pass. | +| me_pool_drain_soft_evict_budget_per_core | `u16` | `8` | `1..=64`. | Per-core budget limiting aggregate soft-eviction work per pass. | +| me_pool_drain_soft_evict_cooldown_ms | `u64` | `5000` | Must be `> 0`. | Cooldown between consecutive soft-eviction passes (ms). 
| +| me_bind_stale_mode | `"never" \| "ttl" \| "always"` | `"ttl"` | — | Policy for new binds on stale draining writers. | +| me_bind_stale_ttl_secs | `u64` | `90` | — | TTL for stale bind allowance when stale mode is `ttl`. | +| me_pool_min_fresh_ratio | `f32` | `0.8` | Must be within `[0.0, 1.0]`. | Minimum fresh desired-DC coverage ratio before stale writers are drained. | +| me_reinit_drain_timeout_secs | `u64` | `120` | `0` disables force-close; if `> 0` and `< me_pool_drain_ttl_secs`, runtime bumps it to TTL. | Force-close timeout for draining stale writers (`0` keeps indefinite draining). | +| proxy_secret_auto_reload_secs | `u64` | `3600` | Deprecated. Use `general.update_every`. | Deprecated legacy secret reload interval (fallback when `update_every` is not set). | +| proxy_config_auto_reload_secs | `u64` | `3600` | Deprecated. Use `general.update_every`. | Deprecated legacy config reload interval (fallback when `update_every` is not set). | +| me_reinit_singleflight | `bool` | `true` | — | Serializes ME reinit cycles across trigger sources. | +| me_reinit_trigger_channel | `usize` | `64` | Must be `> 0`. | Trigger queue capacity for reinit scheduler. | +| me_reinit_coalesce_window_ms | `u64` | `200` | — | Trigger coalescing window before starting reinit (ms). | +| me_deterministic_writer_sort | `bool` | `true` | — | Enables deterministic candidate sort for writer binding path. | +| me_writer_pick_mode | `"sorted_rr" \| "p2c"` | `"p2c"` | — | Writer selection mode for route bind path. | +| me_writer_pick_sample_size | `u8` | `3` | `2..=4`. | Number of candidates sampled by picker in `p2c` mode. | +| ntp_check | `bool` | `true` | — | Enables NTP drift check at startup. | +| ntp_servers | `String[]` | `["pool.ntp.org"]` | — | NTP servers used for drift check. | +| auto_degradation_enabled | `bool` | `true` | none | Reserved compatibility flag in current runtime revision. 
| +| degradation_min_unavailable_dc_groups | `u8` | `2` | none | Reserved compatibility threshold in current runtime revision. | ## [general.modes] -| Parameter | Type | Description | -|---|---|---| -| classic | `bool` | Enables classic MTProxy mode. | -| secure | `bool` | Enables secure mode. | -| tls | `bool` | Enables TLS mode. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| classic | `bool` | `false` | — | Enables classic MTProxy mode. | +| secure | `bool` | `false` | — | Enables secure mode. | +| tls | `bool` | `true` | — | Enables TLS mode. | ## [general.links] -| Parameter | Type | Description | -|---|---|---| -| show | `"*" \| String[]` | Selects users whose tg:// links are shown at startup. | -| public_host | `String` | Public hostname/IP override for generated tg:// links. | -| public_port | `u16` | Public port override for generated tg:// links. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| show | `"*" \| String[]` | `"*"` | — | Selects users whose tg:// links are shown at startup. | +| public_host | `String \| null` | `null` | — | Public hostname/IP override for generated tg:// links. | +| public_port | `u16 \| null` | `null` | — | Public port override for generated tg:// links. | ## [general.telemetry] -| Parameter | Type | Description | -|---|---|---| -| core_enabled | `bool` | Enables core hot-path telemetry counters. | -| user_enabled | `bool` | Enables per-user telemetry counters. | -| me_level | `"silent" \| "normal" \| "debug"` | Middle-End telemetry verbosity level. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| core_enabled | `bool` | `true` | — | Enables core hot-path telemetry counters. | +| user_enabled | `bool` | `true` | — | Enables per-user telemetry counters. | +| me_level | `"silent" \| "normal" \| "debug"` | `"normal"` | — | Middle-End telemetry verbosity level. 
| ## [network] -| Parameter | Type | Description | -|---|---|---| -| ipv4 | `bool` | Enables IPv4 networking. | -| ipv6 | `bool` | Enables/disables IPv6 (`null` = auto-detect availability). | -| prefer | `u8` | Preferred IP family for selection (`4` or `6`). | -| multipath | `bool` | Enables multipath behavior where supported. | -| stun_use | `bool` | Global switch for STUN probing. | -| stun_servers | `String[]` | STUN server list for public IP detection. | -| stun_tcp_fallback | `bool` | Enables TCP STUN fallback when UDP STUN is blocked. | -| http_ip_detect_urls | `String[]` | HTTP endpoints used as fallback public IP detectors. | -| cache_public_ip_path | `String` | File path for caching detected public IP. | -| dns_overrides | `String[]` | Runtime DNS overrides in `host:port:ip` format. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| ipv4 | `bool` | `true` | — | Enables IPv4 networking. | +| ipv6 | `bool` | `false` | — | Enables/disables IPv6 networking. | +| prefer | `u8` | `4` | Must be `4` or `6`. | Preferred IP family for selection (`4` or `6`). | +| multipath | `bool` | `false` | — | Enables multipath behavior where supported. | +| stun_use | `bool` | `true` | none | Global STUN switch; when `false`, STUN probing path is disabled. | +| stun_servers | `String[]` | Built-in STUN list (13 hosts) | Deduplicated; empty values are removed. | Primary STUN server list for NAT/public endpoint discovery. | +| stun_tcp_fallback | `bool` | `true` | none | Enables TCP fallback for STUN when UDP path is blocked. | +| http_ip_detect_urls | `String[]` | `["https://ifconfig.me/ip", "https://api.ipify.org"]` | none | HTTP fallback endpoints for public IP detection when STUN is unavailable. | +| cache_public_ip_path | `String` | `"cache/public_ip.txt"` | — | File path for caching detected public IP. | +| dns_overrides | `String[]` | `[]` | Must match `host:port:ip`; IPv6 must be bracketed. 
| Runtime DNS overrides in `host:port:ip` format. | ## [server] -| Parameter | Type | Description | -|---|---|---| -| port | `u16` | Main proxy listen port. | -| listen_addr_ipv4 | `String` | IPv4 bind address for TCP listener. | -| listen_addr_ipv6 | `String` | IPv6 bind address for TCP listener. | -| listen_unix_sock | `String` | Unix socket path for listener. | -| listen_unix_sock_perm | `String` | Unix socket permissions in octal string (e.g., `"0666"`). | -| listen_tcp | `bool` | Explicit TCP listener enable/disable override. | -| proxy_protocol | `bool` | Enables HAProxy PROXY protocol parsing on incoming client connections. | -| proxy_protocol_header_timeout_ms | `u64` | Timeout for PROXY protocol header read/parse (ms). | -| metrics_port | `u16` | Metrics endpoint port (enables metrics listener). | -| metrics_listen | `String` | Full metrics bind address (`IP:PORT`), overrides `metrics_port`. | -| metrics_whitelist | `IpNetwork[]` | CIDR whitelist for metrics endpoint access. | -| max_connections | `u32` | Max concurrent client connections (`0` = unlimited). | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| port | `u16` | `443` | — | Main proxy listen port. | +| listen_addr_ipv4 | `String \| null` | `"0.0.0.0"` | — | IPv4 bind address for TCP listener. | +| listen_addr_ipv6 | `String \| null` | `"::"` | — | IPv6 bind address for TCP listener. | +| listen_unix_sock | `String \| null` | `null` | — | Unix socket path for listener. | +| listen_unix_sock_perm | `String \| null` | `null` | — | Unix socket permissions in octal string (e.g., `"0666"`). | +| listen_tcp | `bool \| null` | `null` (auto) | — | Explicit TCP listener enable/disable override. | +| proxy_protocol | `bool` | `false` | — | Enables HAProxy PROXY protocol parsing on incoming client connections. | +| proxy_protocol_header_timeout_ms | `u64` | `500` | Must be `> 0`. | Timeout for PROXY protocol header read/parse (ms). 
| +| metrics_port | `u16 \| null` | `null` | — | Metrics endpoint port (enables metrics listener). | +| metrics_listen | `String \| null` | `null` | — | Full metrics bind address (`IP:PORT`), overrides `metrics_port`. | +| metrics_whitelist | `IpNetwork[]` | `["127.0.0.1/32", "::1/128"]` | — | CIDR whitelist for metrics endpoint access. | +| max_connections | `u32` | `10000` | — | Max concurrent client connections (`0` = unlimited). | ## [server.api] -| Parameter | Type | Description | -|---|---|---| -| enabled | `bool` | Enables control-plane REST API. | -| listen | `String` | API bind address in `IP:PORT` format. | -| whitelist | `IpNetwork[]` | CIDR whitelist allowed to access API. | -| auth_header | `String` | Exact expected `Authorization` header value (empty = disabled). | -| request_body_limit_bytes | `usize` | Maximum accepted HTTP request body size. | -| minimal_runtime_enabled | `bool` | Enables minimal runtime snapshots endpoint logic. | -| minimal_runtime_cache_ttl_ms | `u64` | Cache TTL for minimal runtime snapshots (ms; `0` disables cache). | -| runtime_edge_enabled | `bool` | Enables runtime edge endpoints. | -| runtime_edge_cache_ttl_ms | `u64` | Cache TTL for runtime edge aggregation payloads (ms). | -| runtime_edge_top_n | `usize` | Top-N size for edge connection leaderboard. | -| runtime_edge_events_capacity | `usize` | Ring-buffer capacity for runtime edge events. | -| read_only | `bool` | Rejects mutating API endpoints when enabled. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| enabled | `bool` | `true` | — | Enables control-plane REST API. | +| listen | `String` | `"0.0.0.0:9091"` | Must be valid `IP:PORT`. | API bind address in `IP:PORT` format. | +| whitelist | `IpNetwork[]` | `["127.0.0.0/8"]` | — | CIDR whitelist allowed to access API. | +| auth_header | `String` | `""` | — | Exact expected `Authorization` header value (empty = disabled). 
| +| request_body_limit_bytes | `usize` | `65536` | Must be `> 0`. | Maximum accepted HTTP request body size. | +| minimal_runtime_enabled | `bool` | `true` | — | Enables minimal runtime snapshots endpoint logic. | +| minimal_runtime_cache_ttl_ms | `u64` | `1000` | `0..=60000`. | Cache TTL for minimal runtime snapshots (ms; `0` disables cache). | +| runtime_edge_enabled | `bool` | `false` | — | Enables runtime edge endpoints. | +| runtime_edge_cache_ttl_ms | `u64` | `1000` | `0..=60000`. | Cache TTL for runtime edge aggregation payloads (ms). | +| runtime_edge_top_n | `usize` | `10` | `1..=1000`. | Top-N size for edge connection leaderboard. | +| runtime_edge_events_capacity | `usize` | `256` | `16..=4096`. | Ring-buffer capacity for runtime edge events. | +| read_only | `bool` | `false` | — | Rejects mutating API endpoints when enabled. | ## [[server.listeners]] -| Parameter | Type | Description | -|---|---|---| -| ip | `IpAddr` | Listener bind IP. | -| announce | `String` | Public IP/domain announced in proxy links (priority over `announce_ip`). | -| announce_ip | `IpAddr` | Deprecated legacy announce IP (migrated to `announce` if needed). | -| proxy_protocol | `bool` | Per-listener override for PROXY protocol enable flag. | -| reuse_allow | `bool` | Enables `SO_REUSEPORT` for multi-instance bind sharing. | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| ip | `IpAddr` | — | — | Listener bind IP. | +| announce | `String \| null` | — | — | Public IP/domain announced in proxy links (priority over `announce_ip`). | +| announce_ip | `IpAddr \| null` | — | — | Deprecated legacy announce IP (migrated to `announce` if needed). | +| proxy_protocol | `bool \| null` | `null` | — | Per-listener override for PROXY protocol enable flag. | +| reuse_allow | `bool` | `false` | — | Enables `SO_REUSEPORT` for multi-instance bind sharing. 
| ## [timeouts] -| Parameter | Type | Description | -|---|---|---| -| client_handshake | `u64` | Client handshake timeout. | -| tg_connect | `u64` | Upstream Telegram connect timeout. | -| client_keepalive | `u64` | Client keepalive timeout. | -| client_ack | `u64` | Client ACK timeout. | -| me_one_retry | `u8` | Quick ME reconnect attempts for single-address DC. | -| me_one_timeout_ms | `u64` | Timeout per quick attempt for single-address DC (ms). | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| client_handshake | `u64` | `30` | — | Client handshake timeout. | +| tg_connect | `u64` | `10` | — | Upstream Telegram connect timeout. | +| client_keepalive | `u64` | `15` | — | Client keepalive timeout. | +| client_ack | `u64` | `90` | — | Client ACK timeout. | +| me_one_retry | `u8` | `12` | none | Fast reconnect attempts budget for single-endpoint DC scenarios. | +| me_one_timeout_ms | `u64` | `1200` | none | Timeout in milliseconds for each quick single-endpoint reconnect attempt. | ## [censorship] -| Parameter | Type | Description | -|---|---|---| -| tls_domain | `String` | Primary TLS domain used in fake TLS handshake profile. | -| tls_domains | `String[]` | Additional TLS domains for generating multiple links. | -| mask | `bool` | Enables masking/fronting relay mode. | -| mask_host | `String` | Upstream mask host for TLS fronting relay. | -| mask_port | `u16` | Upstream mask port for TLS fronting relay. | -| mask_unix_sock | `String` | Unix socket path for mask backend instead of TCP host/port. | -| fake_cert_len | `usize` | Length of synthetic certificate payload when emulation data is unavailable. | -| tls_emulation | `bool` | Enables certificate/TLS behavior emulation from cached real fronts. | -| tls_front_dir | `String` | Directory path for TLS front cache storage. | -| server_hello_delay_min_ms | `u64` | Minimum server_hello delay for anti-fingerprint behavior (ms). 
| -| server_hello_delay_max_ms | `u64` | Maximum server_hello delay for anti-fingerprint behavior (ms). | -| tls_new_session_tickets | `u8` | Number of `NewSessionTicket` messages to emit after handshake. | -| tls_full_cert_ttl_secs | `u64` | TTL for sending full cert payload per (domain, client IP) tuple. | -| alpn_enforce | `bool` | Enforces ALPN echo behavior based on client preference. | -| mask_proxy_protocol | `u8` | PROXY protocol mode for mask backend (`0` disabled, `1` v1, `2` v2). | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| tls_domain | `String` | `"petrovich.ru"` | — | Primary TLS domain used in fake TLS handshake profile. | +| tls_domains | `String[]` | `[]` | — | Additional TLS domains for generating multiple links. | +| mask | `bool` | `true` | — | Enables masking/fronting relay mode. | +| mask_host | `String \| null` | `null` | — | Upstream mask host for TLS fronting relay. | +| mask_port | `u16` | `443` | — | Upstream mask port for TLS fronting relay. | +| mask_unix_sock | `String \| null` | `null` | — | Unix socket path for mask backend instead of TCP host/port. | +| fake_cert_len | `usize` | `2048` | — | Length of synthetic certificate payload when emulation data is unavailable. | +| tls_emulation | `bool` | `true` | — | Enables certificate/TLS behavior emulation from cached real fronts. | +| tls_front_dir | `String` | `"tlsfront"` | — | Directory path for TLS front cache storage. | +| server_hello_delay_min_ms | `u64` | `0` | — | Minimum server_hello delay for anti-fingerprint behavior (ms). | +| server_hello_delay_max_ms | `u64` | `0` | — | Maximum server_hello delay for anti-fingerprint behavior (ms). | +| tls_new_session_tickets | `u8` | `0` | — | Number of `NewSessionTicket` messages to emit after handshake. | +| tls_full_cert_ttl_secs | `u64` | `90` | — | TTL for sending full cert payload per (domain, client IP) tuple. 
| +| alpn_enforce | `bool` | `true` | — | Enforces ALPN echo behavior based on client preference. | +| mask_proxy_protocol | `u8` | `0` | — | PROXY protocol mode for mask backend (`0` disabled, `1` v1, `2` v2). | ## [access] -| Parameter | Type | Description | -|---|---|---| -| users | `Map` | Username -> 32-hex secret mapping. | -| user_ad_tags | `Map` | Per-user ad tags (32 hex chars). | -| user_max_tcp_conns | `Map` | Per-user maximum concurrent TCP connections. | -| user_expirations | `Map>` | Per-user account expiration timestamps. | -| user_data_quota | `Map` | Per-user data quota limits. | -| user_max_unique_ips | `Map` | Per-user unique source IP limits. | -| user_max_unique_ips_global_each | `usize` | Global fallback per-user unique IP limit when no per-user override exists. | -| user_max_unique_ips_mode | `"active_window" \| "time_window" \| "combined"` | Unique source IP limit accounting mode. | -| user_max_unique_ips_window_secs | `u64` | Recent-window size for unique IP accounting (seconds). | -| replay_check_len | `usize` | Replay check storage length. | -| replay_window_secs | `u64` | Replay protection time window in seconds. | -| ignore_time_skew | `bool` | Ignores client/server timestamp skew in replay validation. | +| Parameter | Type | Default | Constraints / validation | TOML shape example | Description | +|---|---|---|---|---|---| +| users | `Map` | `{"default": "000…000"}` | Secret must be 32 hex characters. | `[access.users]`
`user = "32-hex secret"`
`user2 = "32-hex secret"` | User credentials map used for client authentication. | +| user_ad_tags | `Map` | `{}` | Every value must be exactly 32 hex characters. | `[access.user_ad_tags]`
`user = "32-hex ad_tag"` | Per-user ad tags used as override over `general.ad_tag`. | +| user_max_tcp_conns | `Map` | `{}` | — | `[access.user_max_tcp_conns]`
`user = 500` | Per-user maximum concurrent TCP connections. | +| user_expirations | `Map>` | `{}` | Timestamp must be valid RFC3339/ISO-8601 datetime. | `[access.user_expirations]`
`user = "2026-12-31T23:59:59Z"` | Per-user account expiration timestamps. | +| user_data_quota | `Map` | `{}` | — | `[access.user_data_quota]`
`user = 1073741824` | Per-user traffic quota in bytes. | +| user_max_unique_ips | `Map` | `{}` | — | `[access.user_max_unique_ips]`
`user = 16` | Per-user unique source IP limits. | +| user_max_unique_ips_global_each | `usize` | `0` | — | `user_max_unique_ips_global_each = 0` | Global fallback used when `[access.user_max_unique_ips]` has no per-user override. | +| user_max_unique_ips_mode | `"active_window" \| "time_window" \| "combined"` | `"active_window"` | — | `user_max_unique_ips_mode = "active_window"` | Unique source IP limit accounting mode. | +| user_max_unique_ips_window_secs | `u64` | `30` | Must be `> 0`. | `user_max_unique_ips_window_secs = 30` | Window size (seconds) used by unique-IP accounting modes that use time windows. | +| replay_check_len | `usize` | `65536` | — | `replay_check_len = 65536` | Replay-protection storage length. | +| replay_window_secs | `u64` | `1800` | — | `replay_window_secs = 1800` | Replay-protection window in seconds. | +| ignore_time_skew | `bool` | `false` | — | `ignore_time_skew = false` | Disables client/server timestamp skew checks in replay validation when enabled. | ## [[upstreams]] -| Parameter | Type | Description | -|---|---|---| -| type | `"direct" \| "socks4" \| "socks5"` | Upstream transport type selector. | -| weight | `u16` | Weighted selection coefficient for this upstream. | -| enabled | `bool` | Enables/disables this upstream entry. | -| scopes | `String` | Comma-separated scope tags for routing. | -| interface | `String` | Optional outgoing interface name (`direct`, `socks4`, `socks5`). | -| bind_addresses | `String[]` | Optional source bind addresses for `direct` upstream. | -| address | `String` | Upstream proxy address (`host:port`) for SOCKS upstreams. | -| user_id | `String` | SOCKS4 user ID (only for `type = "socks4"`). | -| username | `String` | SOCKS5 username (only for `type = "socks5"`). | -| password | `String` | SOCKS5 password (only for `type = "socks5"`). | +| Parameter | Type | Default | Constraints / validation | Description | +|---|---|---|---|---| +| type | `"direct" \| "socks4" \| "socks5"` | — | Required field. 
| Upstream transport type selector. | +| weight | `u16` | `1` | none | Base weight used by weighted-random upstream selection. | +| enabled | `bool` | `true` | none | Disabled entries are excluded from upstream selection at runtime. | +| scopes | `String` | `""` | none | Comma-separated scope tags used for request-level upstream filtering. | +| interface | `String \| null` | `null` | Optional; type-specific runtime rules apply. | Optional outbound interface/local bind hint (supported with type-specific rules). | +| bind_addresses | `String[] \| null` | `null` | Applies to `type = "direct"`. | Optional explicit local source bind addresses for `type = "direct"`. | +| address | `String` | — | Required for `type = "socks4"` and `type = "socks5"`. | SOCKS server endpoint (`host:port` or `ip:port`) for SOCKS upstream types. | +| user_id | `String \| null` | `null` | Only for `type = "socks4"`. | SOCKS4 CONNECT user ID (`type = "socks4"` only). | +| username | `String \| null` | `null` | Only for `type = "socks5"`. | SOCKS5 username (`type = "socks5"` only). | +| password | `String \| null` | `null` | Only for `type = "socks5"`. | SOCKS5 password (`type = "socks5"` only). 
| From 89e5668c7e000ac7ffd5c3512280236f64862902 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 22:33:41 +0300 Subject: [PATCH 04/11] Runtime guardrails Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 20 ++++ src/config/hot_reload.rs | 9 ++ src/config/load.rs | 24 +++++ src/config/types.rs | 23 +++++ src/maestro/helpers.rs | 1 + src/maestro/listeners.rs | 98 +++++++++++++++---- src/maestro/me_startup.rs | 2 + src/proxy/middle_relay.rs | 82 +++++++++++++--- src/transport/middle_proxy/health.rs | 2 + .../middle_proxy/health_adversarial_tests.rs | 2 + .../middle_proxy/health_integration_tests.rs | 2 + .../middle_proxy/health_regression_tests.rs | 2 + src/transport/middle_proxy/pool.rs | 8 ++ src/transport/middle_proxy/pool_writer.rs | 63 ++++++------ src/transport/middle_proxy/send.rs | 84 ++++++++++++++-- 15 files changed, 345 insertions(+), 77 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 7b5b4a8..54a53b3 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -36,12 +36,16 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000; const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000; +const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000; +const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250; +const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; +const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 
= 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000; @@ -156,6 +160,10 @@ pub(crate) fn default_server_max_connections() -> u32 { 10_000 } +pub(crate) fn default_accept_permit_timeout_ms() -> u64 { + DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS +} + pub(crate) fn default_prefer_4() -> u8 { 4 } @@ -380,6 +388,18 @@ pub(crate) fn default_me_warn_rate_limit_ms() -> u64 { DEFAULT_ME_WARN_RATE_LIMIT_MS } +pub(crate) fn default_me_route_hybrid_max_wait_ms() -> u64 { + DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS +} + +pub(crate) fn default_me_route_blocking_send_timeout_ms() -> u64 { + DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS +} + +pub(crate) fn default_me_c2me_send_timeout_ms() -> u64 { + DEFAULT_ME_C2ME_SEND_TIMEOUT_MS +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index fdf06fa..7b94999 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -612,6 +612,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b || old.server.listen_tcp != new.server.listen_tcp || old.server.listen_unix_sock != new.server.listen_unix_sock || old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm + || old.server.max_connections != new.server.max_connections + || old.server.accept_permit_timeout_ms != new.server.accept_permit_timeout_ms { warned = true; warn!("config reload: server listener settings changed; restart required"); @@ -671,6 +673,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b } if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms + || old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms + || old.general.me_route_blocking_send_timeout_ms + != 
new.general.me_route_blocking_send_timeout_ms || old.general.me_route_inline_recovery_attempts != new.general.me_route_inline_recovery_attempts || old.general.me_route_inline_recovery_wait_ms @@ -679,6 +684,10 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b warned = true; warn!("config reload: general.me_route_no_writer_* changed; restart required"); } + if old.general.me_c2me_send_timeout_ms != new.general.me_c2me_send_timeout_ms { + warned = true; + warn!("config reload: general.me_c2me_send_timeout_ms changed; restart required"); + } if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path || old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled { diff --git a/src/config/load.rs b/src/config/load.rs index 6fcbea3..0635f80 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -346,6 +346,12 @@ impl ProxyConfig { )); } + if config.general.me_c2me_send_timeout_ms > 60_000 { + return Err(ProxyError::Config( + "general.me_c2me_send_timeout_ms must be within [0, 60000]".to_string(), + )); + } + if config.general.me_reader_route_data_wait_ms > 20 { return Err(ProxyError::Config( "general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(), @@ -627,6 +633,18 @@ impl ProxyConfig { )); } + if !(50..=60_000).contains(&config.general.me_route_hybrid_max_wait_ms) { + return Err(ProxyError::Config( + "general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string(), + )); + } + + if config.general.me_route_blocking_send_timeout_ms > 5000 { + return Err(ProxyError::Config( + "general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(), + )); + } + if !(2..=4).contains(&config.general.me_writer_pick_sample_size) { return Err(ProxyError::Config( "general.me_writer_pick_sample_size must be within [2, 4]".to_string(), @@ -687,6 +705,12 @@ impl ProxyConfig { )); } + if config.server.accept_permit_timeout_ms > 60_000 { + return Err(ProxyError::Config( + 
"server.accept_permit_timeout_ms must be within [0, 60000]".to_string(), + )); + } + if config.general.effective_me_pool_force_close_secs() > 0 && config.general.effective_me_pool_force_close_secs() < config.general.me_pool_drain_ttl_secs diff --git a/src/config/types.rs b/src/config/types.rs index e507044..047f3c2 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -462,6 +462,11 @@ pub struct GeneralConfig { #[serde(default = "default_me_c2me_channel_capacity")] pub me_c2me_channel_capacity: usize, + /// Maximum wait in milliseconds for enqueueing C2ME commands when the queue is full. + /// `0` keeps legacy unbounded wait behavior. + #[serde(default = "default_me_c2me_send_timeout_ms")] + pub me_c2me_send_timeout_ms: u64, + /// Bounded wait in milliseconds for routing ME DATA to per-connection queue. /// `0` keeps legacy no-wait behavior. #[serde(default = "default_me_reader_route_data_wait_ms")] @@ -716,6 +721,15 @@ pub struct GeneralConfig { #[serde(default = "default_me_route_no_writer_wait_ms")] pub me_route_no_writer_wait_ms: u64, + /// Maximum cumulative wait in milliseconds for hybrid no-writer mode before failfast. + #[serde(default = "default_me_route_hybrid_max_wait_ms")] + pub me_route_hybrid_max_wait_ms: u64, + + /// Maximum wait in milliseconds for blocking ME writer channel send fallback. + /// `0` keeps legacy unbounded wait behavior. + #[serde(default = "default_me_route_blocking_send_timeout_ms")] + pub me_route_blocking_send_timeout_ms: u64, + /// Number of inline recovery attempts in legacy mode. 
#[serde(default = "default_me_route_inline_recovery_attempts")] pub me_route_inline_recovery_attempts: u32, @@ -921,6 +935,7 @@ impl Default for GeneralConfig { me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(), me_route_channel_capacity: default_me_route_channel_capacity(), me_c2me_channel_capacity: default_me_c2me_channel_capacity(), + me_c2me_send_timeout_ms: default_me_c2me_send_timeout_ms(), me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(), me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(), me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(), @@ -975,6 +990,8 @@ impl Default for GeneralConfig { me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(), me_route_no_writer_mode: MeRouteNoWriterMode::default(), me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(), + me_route_hybrid_max_wait_ms: default_me_route_hybrid_max_wait_ms(), + me_route_blocking_send_timeout_ms: default_me_route_blocking_send_timeout_ms(), me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(), me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(), links: LinksConfig::default(), @@ -1207,6 +1224,11 @@ pub struct ServerConfig { /// 0 means unlimited. #[serde(default = "default_server_max_connections")] pub max_connections: u32, + + /// Maximum wait in milliseconds while acquiring a connection slot permit. + /// `0` keeps legacy unbounded wait behavior. 
+ #[serde(default = "default_accept_permit_timeout_ms")] + pub accept_permit_timeout_ms: u64, } impl Default for ServerConfig { @@ -1226,6 +1248,7 @@ impl Default for ServerConfig { api: ApiConfig::default(), listeners: Vec::new(), max_connections: default_server_max_connections(), + accept_permit_timeout_ms: default_accept_permit_timeout_ms(), } } } diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs index 78f3ec4..f43e308 100644 --- a/src/maestro/helpers.rs +++ b/src/maestro/helpers.rs @@ -205,6 +205,7 @@ pub(crate) fn format_uptime(total_secs: u64) -> String { format!("{} / {} seconds", parts.join(", "), total_secs) } +#[allow(dead_code)] pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver) -> bool { loop { if *admission_rx.borrow() { diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index 6296fd7..fe041d9 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -24,7 +24,7 @@ use crate::transport::{ ListenOptions, UpstreamManager, create_listener, find_listener_processes, }; -use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open}; +use super::helpers::{is_expected_handshake_eof, print_proxy_links}; pub(crate) struct BoundListeners { pub(crate) listeners: Vec<(TcpListener, bool)>, @@ -195,7 +195,7 @@ pub(crate) async fn bind_listeners( has_unix_listener = true; let mut config_rx_unix: watch::Receiver> = config_rx.clone(); - let mut admission_rx_unix = admission_rx.clone(); + let admission_rx_unix = admission_rx.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); @@ -212,17 +212,44 @@ pub(crate) async fn bind_listeners( let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1)); loop { - if !wait_until_admission_open(&mut admission_rx_unix).await { - warn!("Conditional-admission gate channel closed for unix listener"); - break; - } match unix_listener.accept().await { 
Ok((stream, _)) => { - let permit = match max_connections_unix.clone().acquire_owned().await { - Ok(permit) => permit, - Err(_) => { - error!("Connection limiter is closed"); - break; + if !*admission_rx_unix.borrow() { + drop(stream); + continue; + } + let accept_permit_timeout_ms = config_rx_unix + .borrow() + .server + .accept_permit_timeout_ms; + let permit = if accept_permit_timeout_ms == 0 { + match max_connections_unix.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + error!("Connection limiter is closed"); + break; + } + } + } else { + match tokio::time::timeout( + Duration::from_millis(accept_permit_timeout_ms), + max_connections_unix.clone().acquire_owned(), + ) + .await + { + Ok(Ok(permit)) => permit, + Ok(Err(_)) => { + error!("Connection limiter is closed"); + break; + } + Err(_) => { + debug!( + timeout_ms = accept_permit_timeout_ms, + "Dropping accepted unix connection: permit wait timeout" + ); + drop(stream); + continue; + } } }; let conn_id = @@ -312,7 +339,7 @@ pub(crate) fn spawn_tcp_accept_loops( ) { for (listener, listener_proxy_protocol) in listeners { let mut config_rx: watch::Receiver> = config_rx.clone(); - let mut admission_rx_tcp = admission_rx.clone(); + let admission_rx_tcp = admission_rx.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); @@ -327,17 +354,46 @@ pub(crate) fn spawn_tcp_accept_loops( tokio::spawn(async move { loop { - if !wait_until_admission_open(&mut admission_rx_tcp).await { - warn!("Conditional-admission gate channel closed for tcp listener"); - break; - } match listener.accept().await { Ok((stream, peer_addr)) => { - let permit = match max_connections_tcp.clone().acquire_owned().await { - Ok(permit) => permit, - Err(_) => { - error!("Connection limiter is closed"); - break; + if !*admission_rx_tcp.borrow() { + debug!(peer = %peer_addr, "Admission gate closed, dropping connection"); + drop(stream); + continue; + } + 
let accept_permit_timeout_ms = config_rx + .borrow() + .server + .accept_permit_timeout_ms; + let permit = if accept_permit_timeout_ms == 0 { + match max_connections_tcp.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + error!("Connection limiter is closed"); + break; + } + } + } else { + match tokio::time::timeout( + Duration::from_millis(accept_permit_timeout_ms), + max_connections_tcp.clone().acquire_owned(), + ) + .await + { + Ok(Ok(permit)) => permit, + Ok(Err(_)) => { + error!("Connection limiter is closed"); + break; + } + Err(_) => { + debug!( + peer = %peer_addr, + timeout_ms = accept_permit_timeout_ms, + "Dropping accepted connection: permit wait timeout" + ); + drop(stream); + continue; + } } }; let config = config_rx.borrow_and_update().clone(); diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 94ae884..827b00c 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -267,6 +267,8 @@ pub(crate) async fn initialize_me_pool( config.general.me_warn_rate_limit_ms, config.general.me_route_no_writer_mode, config.general.me_route_no_writer_wait_ms, + config.general.me_route_hybrid_max_wait_ms, + config.general.me_route_blocking_send_timeout_ms, config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_wait_ms, ); diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 4f70a17..102b06c 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -222,6 +222,7 @@ fn should_yield_c2me_sender(sent_since_yield: usize, has_backlog: bool) -> bool async fn enqueue_c2me_command( tx: &mpsc::Sender, cmd: C2MeCommand, + send_timeout: Duration, ) -> std::result::Result<(), mpsc::error::SendError> { match tx.try_send(cmd) { Ok(()) => Ok(()), @@ -231,7 +232,17 @@ async fn enqueue_c2me_command( if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS { tokio::task::yield_now().await; } - tx.send(cmd).await + if send_timeout.is_zero() { + return 
tx.send(cmd).await; + } + match tokio::time::timeout(send_timeout, tx.reserve()).await { + Ok(Ok(permit)) => { + permit.send(cmd); + Ok(()) + } + Ok(Err(_)) => Err(mpsc::error::SendError(cmd)), + Err(_) => Err(mpsc::error::SendError(cmd)), + } } } } @@ -355,6 +366,7 @@ where .general .me_c2me_channel_capacity .max(C2ME_CHANNEL_CAPACITY_FALLBACK); + let c2me_send_timeout = Duration::from_millis(config.general.me_c2me_send_timeout_ms); let (c2me_tx, mut c2me_rx) = mpsc::channel::(c2me_channel_capacity); let me_pool_c2me = me_pool.clone(); let effective_tag = effective_tag; @@ -363,15 +375,42 @@ where while let Some(cmd) = c2me_rx.recv().await { match cmd { C2MeCommand::Data { payload, flags } => { - me_pool_c2me.send_proxy_req( - conn_id, - success.dc_idx, - peer, - translated_local_addr, - payload.as_ref(), - flags, - effective_tag.as_deref(), - ).await?; + if c2me_send_timeout.is_zero() { + me_pool_c2me + .send_proxy_req( + conn_id, + success.dc_idx, + peer, + translated_local_addr, + payload.as_ref(), + flags, + effective_tag.as_deref(), + ) + .await?; + } else { + match tokio::time::timeout( + c2me_send_timeout, + me_pool_c2me.send_proxy_req( + conn_id, + success.dc_idx, + peer, + translated_local_addr, + payload.as_ref(), + flags, + effective_tag.as_deref(), + ), + ) + .await + { + Ok(send_result) => send_result?, + Err(_) => { + return Err(ProxyError::Proxy(format!( + "ME send timeout after {}ms", + c2me_send_timeout.as_millis() + ))); + } + } + } sent_since_yield = sent_since_yield.saturating_add(1); if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) { sent_since_yield = 0; @@ -555,7 +594,7 @@ where loop { if session_lease.is_stale() { stats.increment_reconnect_stale_close_total(); - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string())); break; } @@ -573,7 +612,7 @@ 
where "Cutover affected middle session, closing client connection" ); tokio::time::sleep(delay).await; - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); break; } @@ -607,9 +646,13 @@ where flags |= RPC_FLAG_NOT_ENCRYPTED; } // Keep client read loop lightweight: route heavy ME send path via a dedicated task. - if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags }) - .await - .is_err() + if enqueue_c2me_command( + &c2me_tx, + C2MeCommand::Data { payload, flags }, + c2me_send_timeout, + ) + .await + .is_err() { main_result = Err(ProxyError::Proxy("ME sender channel closed".into())); break; @@ -618,7 +661,12 @@ where Ok(None) => { debug!(conn_id, "Client EOF"); client_closed = true; - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + let _ = enqueue_c2me_command( + &c2me_tx, + C2MeCommand::Close, + c2me_send_timeout, + ) + .await; break; } Err(e) => { @@ -993,6 +1041,7 @@ mod tests { payload: Bytes::from_static(&[1, 2, 3]), flags: 0, }, + TokioDuration::from_millis(50), ) .await .unwrap(); @@ -1028,6 +1077,7 @@ mod tests { payload: Bytes::from_static(&[7, 7]), flags: 7, }, + TokioDuration::from_millis(100), ) .await .unwrap(); diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 862e58a..0b9b749 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -1574,6 +1574,8 @@ mod tests { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs index 
dc1a0eb..3f182e4 100644 --- a/src/transport/middle_proxy/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -111,6 +111,8 @@ async fn make_pool( general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ); diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs index 4724851..7f99d2a 100644 --- a/src/transport/middle_proxy/health_integration_tests.rs +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -110,6 +110,8 @@ async fn make_pool( general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ); diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 45a1eee..606f7e5 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -103,6 +103,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index f3cc817..d09f07c 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -193,6 +193,8 @@ pub struct MePool { pub(super) me_reader_route_data_wait_ms: Arc, pub(super) me_route_no_writer_mode: AtomicU8, 
pub(super) me_route_no_writer_wait: Duration, + pub(super) me_route_hybrid_max_wait: Duration, + pub(super) me_route_blocking_send_timeout: Duration, pub(super) me_route_inline_recovery_attempts: u32, pub(super) me_route_inline_recovery_wait: Duration, pub(super) me_health_interval_ms_unhealthy: AtomicU64, @@ -307,6 +309,8 @@ impl MePool { me_warn_rate_limit_ms: u64, me_route_no_writer_mode: MeRouteNoWriterMode, me_route_no_writer_wait_ms: u64, + me_route_hybrid_max_wait_ms: u64, + me_route_blocking_send_timeout_ms: u64, me_route_inline_recovery_attempts: u32, me_route_inline_recovery_wait_ms: u64, ) -> Arc { @@ -490,6 +494,10 @@ impl MePool { me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), + me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms), + me_route_blocking_send_timeout: Duration::from_millis( + me_route_blocking_send_timeout_ms, + ), me_route_inline_recovery_attempts, me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 8ce3de3..4035111 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -312,41 +312,28 @@ impl MePool { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_PING_U32.to_le_bytes()); p.extend_from_slice(&sent_id.to_le_bytes()); - { - let mut tracker = ping_tracker_ping.lock().await; - let now_epoch_ms = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - let mut run_cleanup = false; - if let Some(pool) = pool_ping.upgrade() { - let last_cleanup_ms = pool + let now_epoch_ms = 
std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let mut run_cleanup = false; + if let Some(pool) = pool_ping.upgrade() { + let last_cleanup_ms = pool + .ping_tracker_last_cleanup_epoch_ms + .load(Ordering::Relaxed); + if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000 + && pool .ping_tracker_last_cleanup_epoch_ms - .load(Ordering::Relaxed); - if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000 - && pool - .ping_tracker_last_cleanup_epoch_ms - .compare_exchange( - last_cleanup_ms, - now_epoch_ms, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - run_cleanup = true; - } + .compare_exchange( + last_cleanup_ms, + now_epoch_ms, + Ordering::AcqRel, + Ordering::Relaxed, + ) + .is_ok() + { + run_cleanup = true; } - - if run_cleanup { - let before = tracker.len(); - tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); - let expired = before.saturating_sub(tracker.len()); - if expired > 0 { - stats_ping.increment_me_keepalive_timeout_by(expired as u64); - } - } - tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } ping_id = ping_id.wrapping_add(1); stats_ping.increment_me_keepalive_sent(); @@ -367,6 +354,16 @@ impl MePool { } break; } + let mut tracker = ping_tracker_ping.lock().await; + if run_cleanup { + let before = tracker.len(); + tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); + let expired = before.saturating_sub(tracker.len()); + if expired > 0 { + stats_ping.increment_me_keepalive_timeout_by(expired as u64); + } + } + tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } }); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 0f9fed6..1c255ef 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -6,6 +6,7 @@ use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use bytes::Bytes; +use tokio::sync::mpsc; use 
tokio::sync::mpsc::error::TrySendError; use tracing::{debug, warn}; @@ -29,6 +30,29 @@ const PICK_PENALTY_DRAINING: u64 = 600; const PICK_PENALTY_STALE: u64 = 300; const PICK_PENALTY_DEGRADED: u64 = 250; +enum TimedSendError { + Closed(T), + Timeout(T), +} + +async fn send_writer_command_with_timeout( + tx: &mpsc::Sender, + cmd: WriterCommand, + timeout: Duration, +) -> std::result::Result<(), TimedSendError> { + if timeout.is_zero() { + return tx.send(cmd).await.map_err(|err| TimedSendError::Closed(err.0)); + } + match tokio::time::timeout(timeout, tx.reserve()).await { + Ok(Ok(permit)) => { + permit.send(cmd); + Ok(()) + } + Ok(Err(_)) => Err(TimedSendError::Closed(cmd)), + Err(_) => Err(TimedSendError::Timeout(cmd)), + } +} + impl MePool { /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. pub async fn send_proxy_req( @@ -78,8 +102,18 @@ impl MePool { let mut hybrid_last_recovery_at: Option = None; let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50)); let mut hybrid_wait_current = hybrid_wait_step; + let hybrid_deadline = Instant::now() + self.me_route_hybrid_max_wait; loop { + if matches!(no_writer_mode, MeRouteNoWriterMode::HybridAsyncPersistent) + && Instant::now() >= hybrid_deadline + { + self.stats.increment_me_no_writer_failfast_total(); + return Err(ProxyError::Proxy( + "No ME writer available in hybrid wait window".into(), + )); + } + let mut skip_writer_id: Option = None; let current_meta = self .registry .get_meta(conn_id) @@ -90,12 +124,30 @@ impl MePool { match current.tx.try_send(WriterCommand::Data(current_payload.clone())) { Ok(()) => return Ok(()), Err(TrySendError::Full(cmd)) => { - if current.tx.send(cmd).await.is_ok() { - return Ok(()); + match send_writer_command_with_timeout( + ¤t.tx, + cmd, + self.me_route_blocking_send_timeout, + ) + .await + { + Ok(()) => return Ok(()), + Err(TimedSendError::Closed(_)) => { + warn!(writer_id = current.writer_id, "ME 
writer channel closed"); + self.remove_writer_and_close_clients(current.writer_id).await; + continue; + } + Err(TimedSendError::Timeout(_)) => { + debug!( + conn_id, + writer_id = current.writer_id, + timeout_ms = self.me_route_blocking_send_timeout.as_millis() + as u64, + "ME writer send timed out for bound writer, trying reroute" + ); + skip_writer_id = Some(current.writer_id); + } } - warn!(writer_id = current.writer_id, "ME writer channel closed"); - self.remove_writer_and_close_clients(current.writer_id).await; - continue; } Err(TrySendError::Closed(_)) => { warn!(writer_id = current.writer_id, "ME writer channel closed"); @@ -200,6 +252,9 @@ impl MePool { .candidate_indices_for_dc(&writers_snapshot, routed_dc, true) .await; } + if let Some(skip_writer_id) = skip_writer_id { + candidate_indices.retain(|idx| writers_snapshot[*idx].id != skip_writer_id); + } if candidate_indices.is_empty() { let pick_mode = self.writer_pick_mode(); match no_writer_mode { @@ -422,7 +477,13 @@ impl MePool { self.stats.increment_me_writer_pick_blocking_fallback_total(); let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); let (payload, meta) = build_routed_payload(effective_our_addr); - match w.tx.send(WriterCommand::Data(payload.clone())).await { + match send_writer_command_with_timeout( + &w.tx, + WriterCommand::Data(payload.clone()), + self.me_route_blocking_send_timeout, + ) + .await + { Ok(()) => { self.stats .increment_me_writer_pick_success_fallback_total(pick_mode); @@ -439,11 +500,20 @@ impl MePool { } return Ok(()); } - Err(_) => { + Err(TimedSendError::Closed(_)) => { self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed (blocking)"); self.remove_writer_and_close_clients(w.id).await; } + Err(TimedSendError::Timeout(_)) => { + self.stats.increment_me_writer_pick_full_total(pick_mode); + debug!( + conn_id, + writer_id = w.id, + timeout_ms = self.me_route_blocking_send_timeout.as_millis() as u64, + 
"ME writer blocking fallback send timed out" + ); + } } } } From 03891db0c91ad7b91bfa01da20dc1cf4de565cd1 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 22:36:33 +0300 Subject: [PATCH 05/11] Update Cargo.toml Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index eba2e2e..b289231 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.22" +version = "3.3.23" edition = "2024" [dependencies] From c47495d6713f83d937a98c0272f2c0bbfc4ce176 Mon Sep 17 00:00:00 2001 From: Dimasssss Date: Thu, 19 Mar 2026 00:36:02 +0300 Subject: [PATCH 06/11] Update install.sh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Вернул старый функционал + добавил новый: - Вернул автоматическое создание конфига с секретом - Вернул автоматическое создание службы - Добавил удаление службы и telemt через `install.sh uninstall` - Полное удаление вместе с конфигом через `install.sh --purge` - Добавил установку нужной версии `install.sh 3.3.15` --- install.sh | 568 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 489 insertions(+), 79 deletions(-) diff --git a/install.sh b/install.sh index 2dd207b..330bc3e 100644 --- a/install.sh +++ b/install.sh @@ -1,115 +1,525 @@ #!/bin/sh set -eu +# --- Global Configurations --- REPO="${REPO:-telemt/telemt}" BIN_NAME="${BIN_NAME:-telemt}" -VERSION="${1:-${VERSION:-latest}}" -INSTALL_DIR="${INSTALL_DIR:-/usr/local/bin}" +INSTALL_DIR="${INSTALL_DIR:-/bin}" +CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}" +CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}" +WORK_DIR="${WORK_DIR:-/opt/telemt}" +SERVICE_NAME="telemt" +TEMP_DIR="" +SUDO="" -say() { - printf '%s\n' "$*" -} +# --- Argument Parsing --- +ACTION="install" +TARGET_VERSION="${VERSION:-latest}" -die() { - printf 'Error: %s\n' 
"$*" >&2 - exit 1 -} - -need_cmd() { - command -v "$1" >/dev/null 2>&1 || die "required command not found: $1" -} - -detect_os() { - os="$(uname -s)" - case "$os" in - Linux) printf 'linux\n' ;; - OpenBSD) printf 'openbsd\n' ;; - *) printf '%s\n' "$os" ;; +while [ $# -gt 0 ]; do + case "$1" in + -h|--help) + ACTION="help" + shift + ;; + uninstall|--uninstall) + [ "$ACTION" != "purge" ] && ACTION="uninstall" + shift + ;; + --purge) + ACTION="purge" + shift + ;; + install|--install) + ACTION="install" + shift + ;; + -*) + printf '[ERROR] Unknown option: %s\n' "$1" >&2 + exit 1 + ;; + *) + if [ "$ACTION" = "install" ]; then + TARGET_VERSION="$1" + fi + shift + ;; esac +done + +# --- Core Functions --- +say() { printf '[INFO] %s\n' "$*"; } +die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; } + +cleanup() { + if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then + rm -rf -- "$TEMP_DIR" + fi +} + +trap cleanup EXIT INT TERM + +show_help() { + say "Usage: $0 [version | install | uninstall | --purge | --help]" + say " version Install specific version (e.g. 1.0.0, default: latest)" + say " uninstall Remove the binary and service (keeps config)" + say " --purge Remove everything including configuration" + exit 0 +} + +user_exists() { + if command -v getent >/dev/null 2>&1; then + getent passwd "$1" >/dev/null 2>&1 + else + grep -q "^${1}:" /etc/passwd 2>/dev/null + fi +} + +group_exists() { + if command -v getent >/dev/null 2>&1; then + getent group "$1" >/dev/null 2>&1 + else + grep -q "^${1}:" /etc/group 2>/dev/null + fi +} + +verify_common() { + [ -z "$BIN_NAME" ] && die "BIN_NAME cannot be empty." + [ -z "$INSTALL_DIR" ] && die "INSTALL_DIR cannot be empty." + [ -z "$CONFIG_DIR" ] && die "CONFIG_DIR cannot be empty." + + if [ "$(id -u)" -eq 0 ]; then + SUDO="" + else + if ! command -v sudo >/dev/null 2>&1; then + die "This script requires root or sudo. Neither found." + fi + SUDO="sudo" + say "sudo is available. Caching credentials..." + if ! 
sudo -v; then + die "Failed to cache sudo credentials" + fi + fi + + case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}" in + *[!a-zA-Z0-9_./-]*) + die "Invalid characters in path variables. Only alphanumeric, _, ., -, and / are allowed." + ;; + esac + + case "$BIN_NAME" in + *[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME: $BIN_NAME" ;; + esac + + for path in "$CONFIG_DIR" "$WORK_DIR"; do + check_path="$path" + + while [ "$check_path" != "/" ] && [ "${check_path%"/"}" != "$check_path" ]; do + check_path="${check_path%"/"}" + done + [ -z "$check_path" ] && check_path="/" + + case "$check_path" in + /|/bin|/sbin|/usr|/usr/bin|/usr/local|/etc|/opt|/var|/home|/root|/tmp) + die "Safety check failed: '$path' is a critical system directory." + ;; + esac + done + + for cmd in uname grep find rm chown chmod mv head mktemp; do + command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd" + done +} + +verify_install_deps() { + if ! command -v curl >/dev/null 2>&1 && ! command -v wget >/dev/null 2>&1; then + die "Neither curl nor wget is installed." + fi + command -v tar >/dev/null 2>&1 || die "Required command not found: tar" + command -v gzip >/dev/null 2>&1 || die "Required command not found: gzip" + command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install" + + if ! command -v setcap >/dev/null 2>&1; then + say "setcap is missing. Installing required capability tools..." + if command -v apk >/dev/null 2>&1; then + $SUDO apk add --no-cache libcap || die "Failed to install libcap" + elif command -v apt-get >/dev/null 2>&1; then + $SUDO apt-get update -qq && $SUDO apt-get install -y -qq libcap2-bin || die "Failed to install libcap2-bin" + elif command -v dnf >/dev/null 2>&1 || command -v yum >/dev/null 2>&1; then + $SUDO ${YUM_CMD:-yum} install -y -q libcap || die "Failed to install libcap" + else + die "Cannot install 'setcap'. Package manager not found. Please install libcap manually." 
+ fi + fi } detect_arch() { - arch="$(uname -m)" - case "$arch" in - x86_64|amd64) printf 'x86_64\n' ;; - aarch64|arm64) printf 'aarch64\n' ;; - *) die "unsupported architecture: $arch" ;; + sys_arch="$(uname -m)" + case "$sys_arch" in + x86_64|amd64) echo "x86_64" ;; + aarch64|arm64) echo "aarch64" ;; + *) die "Unsupported architecture: $sys_arch" ;; esac } detect_libc() { - case "$(ldd --version 2>&1 || true)" in - *musl*) printf 'musl\n' ;; - *) printf 'gnu\n' ;; - esac + if command -v ldd >/dev/null 2>&1 && ldd --version 2>&1 | grep -qi musl; then + echo "musl"; return 0 + fi + + if grep -q '^ID=alpine' /etc/os-release 2>/dev/null || grep -q '^ID="alpine"' /etc/os-release 2>/dev/null; then + echo "musl"; return 0 + fi + for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do + if [ -e "$f" ]; then + echo "musl"; return 0 + fi + done + echo "gnu" } -fetch_to_stdout() { - url="$1" +fetch_file() { + fetch_url="$1" + fetch_out="$2" + if command -v curl >/dev/null 2>&1; then - curl -fsSL "$url" + curl -fsSL "$fetch_url" -o "$fetch_out" || return 1 elif command -v wget >/dev/null 2>&1; then - wget -qO- "$url" + wget -qO "$fetch_out" "$fetch_url" || return 1 else - die "neither curl nor wget is installed" + die "curl or wget required" + fi +} + +ensure_user_group() { + nologin_bin="/bin/false" + + cmd_nologin="$(command -v nologin 2>/dev/null || true)" + if [ -n "$cmd_nologin" ] && [ -x "$cmd_nologin" ]; then + nologin_bin="$cmd_nologin" + else + for bin in /sbin/nologin /usr/sbin/nologin; do + if [ -x "$bin" ]; then + nologin_bin="$bin" + break + fi + done + fi + + if ! group_exists telemt; then + if command -v groupadd >/dev/null 2>&1; then + $SUDO groupadd -r telemt || die "Failed to create group via groupadd" + elif command -v addgroup >/dev/null 2>&1; then + $SUDO addgroup -S telemt || die "Failed to create group via addgroup" + else + die "Cannot create group: neither groupadd nor addgroup found" + fi + fi + + if ! 
user_exists telemt; then + if command -v useradd >/dev/null 2>&1; then + $SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt || die "Failed to create user via useradd" + elif command -v adduser >/dev/null 2>&1; then + $SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt || die "Failed to create user via adduser" + else + die "Cannot create user: neither useradd nor adduser found" + fi + fi +} + +setup_dirs() { + say "Setting up directories..." + $SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" || die "Failed to create directories" + $SUDO chown telemt:telemt "$WORK_DIR" || die "Failed to set owner on WORK_DIR" + $SUDO chmod 750 "$WORK_DIR" || die "Failed to set permissions on WORK_DIR" +} + +stop_service() { + say "Stopping service if running..." + if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then + $SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true + elif command -v rc-service >/dev/null 2>&1; then + $SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true fi } install_binary() { - src="$1" - dst="$2" + bin_src="$1" + bin_dst="$2" - if [ -w "$INSTALL_DIR" ] || { [ ! -e "$INSTALL_DIR" ] && [ -w "$(dirname "$INSTALL_DIR")" ]; }; then - mkdir -p "$INSTALL_DIR" - install -m 0755 "$src" "$dst" - elif command -v sudo >/dev/null 2>&1; then - sudo mkdir -p "$INSTALL_DIR" - sudo install -m 0755 "$src" "$dst" + $SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" + if command -v install >/dev/null 2>&1; then + $SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" else - die "cannot write to $INSTALL_DIR and sudo is not available" + $SUDO rm -f "$bin_dst" + $SUDO cp "$bin_src" "$bin_dst" || die "Failed to copy binary" + $SUDO chmod 0755 "$bin_dst" || die "Failed to set permissions" + fi + + if [ ! -x "$bin_dst" ]; then + die "Failed to install binary or it is not executable: $bin_dst" + fi + + say "Granting network bind capabilities to bind port 443..." 
+ if ! $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null; then + say "[WARNING] Failed to apply setcap. The service will NOT be able to open port 443!" + say "[WARNING] This usually happens inside unprivileged Docker/LXC containers." fi } -need_cmd uname -need_cmd tar -need_cmd mktemp -need_cmd grep -need_cmd install +generate_secret() { + if command -v openssl >/dev/null 2>&1; then + secret="$(openssl rand -hex 16 2>/dev/null)" && [ -n "$secret" ] && { echo "$secret"; return 0; } + fi + if command -v xxd >/dev/null 2>&1; then + secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | xxd -p | tr -d '\n')" && [ -n "$secret" ] && { echo "$secret"; return 0; } + fi + secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | od -An -tx1 | tr -d ' \n')" && [ -n "$secret" ] && { echo "$secret"; return 0; } + return 1 +} -ARCH="$(detect_arch)" -OS="$(detect_os)" +generate_config_content() { + cat </dev/null && config_exists=1 || true + else + [ -f "$CONFIG_FILE" ] && config_exists=1 || true + fi + + if [ "$config_exists" -eq 1 ]; then + say "Config already exists, skipping generation." + return 0 + fi + + toml_secret="$(generate_secret)" || die "Failed to generate secret" + say "Creating config at $CONFIG_FILE..." + + tmp_conf="$(mktemp "${TEMP_DIR:-/tmp}/telemt_conf.XXXXXX")" || die "Failed to create temp config" + generate_config_content "$toml_secret" > "$tmp_conf" || die "Failed to write temp config" + + $SUDO mv "$tmp_conf" "$CONFIG_FILE" || die "Failed to install config file" + $SUDO chown root:telemt "$CONFIG_FILE" || die "Failed to set owner" + $SUDO chmod 640 "$CONFIG_FILE" || die "Failed to set config permissions" + + say "Secret for user 'hello': $toml_secret" +} + +generate_systemd_content() { + cat </dev/null 2>&1 && [ -d /run/systemd/system ]; then + say "Installing systemd service..." 
+ tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.service.XXXXXX")" || die "Failed to create temp service" + generate_systemd_content > "$tmp_svc" || die "Failed to generate service content" + + $SUDO mv "$tmp_svc" "/etc/systemd/system/${SERVICE_NAME}.service" || die "Failed to move service file" + $SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" + $SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service" + + $SUDO systemctl daemon-reload || die "Failed to reload systemd" + $SUDO systemctl enable "$SERVICE_NAME" || die "Failed to enable service" + $SUDO systemctl start "$SERVICE_NAME" || die "Failed to start service" + + elif command -v rc-update >/dev/null 2>&1; then + say "Installing OpenRC service..." + tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.init.XXXXXX")" || die "Failed to create temp file" + generate_openrc_content > "$tmp_svc" || die "Failed to generate init content" + + $SUDO mv "$tmp_svc" "/etc/init.d/${SERVICE_NAME}" || die "Failed to move service file" + $SUDO chown root:root "/etc/init.d/${SERVICE_NAME}" + $SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}" + + $SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || die "Failed to register service" + $SUDO rc-service "$SERVICE_NAME" start 2>/dev/null || die "Failed to start OpenRC service" + else + say "No service manager found. You can start it manually with:" + if [ -n "$SUDO" ]; then + say " sudo -u telemt ${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}" + else + say " su -s /bin/sh telemt -c '${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}'" + fi + fi +} + +kill_user_procs() { + say "Ensuring $BIN_NAME processes are killed..." 
+ + if pkill_cmd="$(command -v pkill 2>/dev/null)"; then + $SUDO "$pkill_cmd" -u telemt "$BIN_NAME" 2>/dev/null || true + sleep 1 + $SUDO "$pkill_cmd" -9 -u telemt "$BIN_NAME" 2>/dev/null || true + elif killall_cmd="$(command -v killall 2>/dev/null)"; then + $SUDO "$killall_cmd" "$BIN_NAME" 2>/dev/null || true + sleep 1 + $SUDO "$killall_cmd" -9 "$BIN_NAME" 2>/dev/null || true + fi +} + +uninstall() { + purge_data=0 + [ "$ACTION" = "purge" ] && purge_data=1 + + say "Uninstalling $BIN_NAME..." + stop_service + + if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then + $SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true + $SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service" + $SUDO systemctl daemon-reload || true + elif command -v rc-update >/dev/null 2>&1; then + $SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true + $SUDO rm -f "/etc/init.d/${SERVICE_NAME}" + fi + + kill_user_procs + + $SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}" + + $SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true + $SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true + + if [ "$purge_data" -eq 1 ]; then + say "Purging configuration and data..." + $SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR" + else + say "Note: Configuration in $CONFIG_DIR was kept. Run with '--purge' to remove it." + fi + + say "Uninstallation complete." + exit 0 +} + +# ============================================================================ +# Main Entry Point +# ============================================================================ + +case "$ACTION" in + help) + show_help ;; - *) - URL="https://github.com/$REPO/releases/download/${VERSION}/${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz" + uninstall|purge) + verify_common + uninstall + ;; + install) + say "Starting installation..." 
+ verify_common + verify_install_deps + + ARCH="$(detect_arch)" + LIBC="$(detect_libc)" + say "Detected system: $ARCH-linux-$LIBC" + + FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz" + FILE_NAME="$(printf '%s' "$FILE_NAME" | tr -d ' \t\n\r')" + + if [ "$TARGET_VERSION" = "latest" ]; then + DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}" + else + DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}" + fi + + TEMP_DIR="$(mktemp -d)" || die "Failed to create temp directory" + if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then + die "Temp directory creation failed" + fi + + say "Downloading from $DL_URL..." + fetch_file "$DL_URL" "${TEMP_DIR}/archive.tar.gz" || die "Download failed (check version or network)" + + gzip -dc "${TEMP_DIR}/archive.tar.gz" | tar -xf - -C "$TEMP_DIR" || die "Extraction failed" + + EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1)" + [ -z "$EXTRACTED_BIN" ] && die "Binary '$BIN_NAME' not found in archive" + + ensure_user_group + setup_dirs + stop_service + + say "Installing binary..." + install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}" + + install_config + install_service + + say "" + say "=============================================" + say "Installation complete!" 
+ say "=============================================" + if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then + say "To check the logs, run:" + say " journalctl -u $SERVICE_NAME -f" + say "" + fi + say "To get user connection links, run:" + if command -v jq >/dev/null 2>&1; then + say " curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | \"User: \\(.username)\\n\\(.links.tls[0] // empty)\"'" + else + say " curl -s http://127.0.0.1:9091/v1/users" + say " (Note: Install 'jq' package to see the links nicely formatted)" + fi ;; esac - -TMPDIR="$(mktemp -d)" -trap 'rm -rf "$TMPDIR"' EXIT INT TERM - -say "Installing $BIN_NAME ($VERSION) for $ARCH-linux-$LIBC..." -fetch_to_stdout "$URL" | tar -xzf - -C "$TMPDIR" - -[ -f "$TMPDIR/$BIN_NAME" ] || die "archive did not contain $BIN_NAME" - -install_binary "$TMPDIR/$BIN_NAME" "$INSTALL_DIR/$BIN_NAME" - -say "Installed: $INSTALL_DIR/$BIN_NAME" -"$INSTALL_DIR/$BIN_NAME" --version 2>/dev/null || true From bdac6e348003cb739dbb413183dc4517f00ceb60 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 00:59:37 +0300 Subject: [PATCH 07/11] Create CODE_OF_CONDUCT.md --- CODE_OF_CONDUCT.md | 163 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 CODE_OF_CONDUCT.md diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..6d10cb0 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,163 @@ +# TELEMT Code of Conduct + +## 1. Purpose + +Telemt exists to solve technical problems. + +It is not a platform for ideology, politics, or personal agendas. + +All interaction here is defined by systems, constraints, and outcomes. + +Technology has consequences. +Responsibility is inherent. + +> **Zweck bestimmt die Form.** +> Purpose defines form. + +--- + +## 2. Principles + +* **Technical over emotional** + Arguments are grounded in data, logs, reproducible cases, or clear reasoning. 
+ +* **Clarity over noise** + Communication is structured, concise, and relevant. + +* **Independence** + Telemt does not represent any state, ideology, or organization. + +* **Open participation** + Access is open. Standards are not. + +* **Responsibility over capability** + Capability does not justify careless use. + +* **Cooperation over friction** + Progress is achieved through coordination and mutual support. + +> **Fakten sind nicht verhandelbar.** +> Facts are not negotiable. + +--- + +## 3. Expected Behavior + +Participants are expected to: + +* Communicate directly and respectfully +* Support claims with evidence +* Stay within technical scope +* Accept critique and provide it constructively +* Reduce noise, duplication, and ambiguity +* Help others reach correct and reproducible outcomes +* Act in a way that improves the system as a whole + +> **Wer behauptet, belegt.** +> Whoever claims, proves. + +--- + +## 4. Unacceptable Behavior + +The following is not allowed: + +* Personal attacks, insults, harassment, intimidation +* Political discourse, propaganda, ideological conflict +* Off-topic or disruptive discussion +* Spam, flooding, or repeated low-quality input +* Misinformation presented as fact +* Attempts to degrade or destabilize Telemt +* Use of Telemt or its space to enable harm + +> **Störung ist kein Beitrag.** +> Disruption is not contribution. + +--- + +## 5. Security and Misuse + +Telemt is intended for lawful and responsible use. + +* Do not use it to plan, coordinate, or execute harm +* Do not publish vulnerabilities without responsible disclosure +* Report security issues privately where possible + +Security is both technical and behavioral. + +> **Verantwortung endet nicht am Code.** +> Responsibility does not end at the code. + +--- + +## 6. 
Scope + +This Code of Conduct applies to all official spaces: + +* Source repositories (issues, pull requests, discussions) +* Documentation +* Communication channels associated with Telemt + +--- + +## 7. Enforcement + +Maintainers may act to preserve the integrity of Telemt: + +* Remove content +* Lock discussions +* Reject contributions +* Restrict or ban participants + +Actions are taken to maintain function, continuity, and signal quality. + +> **Ordnung ist Voraussetzung der Funktion.** +> Order is the precondition of function. + +--- + +## 8. Maintainer Authority + +Maintainers have final authority in interpretation and enforcement. + +Authority exists to ensure continuity, consistency, and technical direction. + +--- + +## 9. Final + +Telemt is built on discipline, structure, and shared intent. + +Signal over noise. +Facts over opinion. +Systems over rhetoric. + +Work here is collective. +Outcomes are shared. +Responsibility is distributed. + +> **Ordnung ist Voraussetzung der Freiheit.** + +If you contribute — contribute with precision. +If you speak — speak with substance. +If you engage — engage constructively. + +--- + +## 10. After All + +Systems outlive intentions. + +What is built will be used. +What is released will propagate. +What is maintained will define the future state. + +There is no neutral infrastructure. + +> **Jedes System trägt Verantwortung.** +> Every system carries responsibility. + +Stability requires discipline. +Freedom requires structure. + +In the end, the system reflects its contributors. 
From c8ffc23cf7dd909e96d31e56aa74fe95f7c69f79 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 01:18:02 +0300 Subject: [PATCH 08/11] Update CODE_OF_CONDUCT.md --- CODE_OF_CONDUCT.md | 113 +++++++++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 34 deletions(-) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 6d10cb0..84c5f77 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -1,17 +1,19 @@ -# TELEMT Code of Conduct +# Code of Conduct ## 1. Purpose Telemt exists to solve technical problems. -It is not a platform for ideology, politics, or personal agendas. +Telemt is open to contributors who want to learn, improve and build meaningful systems together. -All interaction here is defined by systems, constraints, and outcomes. +It is a place for building, testing, reasoning, documenting, and improving systems. -Technology has consequences. -Responsibility is inherent. +Discussions that advance this work are in scope. Discussions that divert it are not. + +Technology has consequences. Responsibility is inherent. > **Zweck bestimmt die Form.** + > Purpose defines form. --- @@ -24,20 +26,24 @@ Responsibility is inherent. * **Clarity over noise** Communication is structured, concise, and relevant. -* **Independence** - Telemt does not represent any state, ideology, or organization. +* **Openness with standards** + Participation is open. The work remains disciplined. -* **Open participation** - Access is open. Standards are not. +* **Independence of judgment** + Claims are evaluated on technical merit, not affiliation or posture. * **Responsibility over capability** Capability does not justify careless use. * **Cooperation over friction** - Progress is achieved through coordination and mutual support. + Progress depends on coordination, mutual support, and honest review. -> **Fakten sind nicht verhandelbar.** -> Facts are not negotiable. 
+* **Good intent, rigorous method** + Assume good intent, but require rigor. + +> **Aussagen gelten nach ihrer Begründung.** + +> Claims are weighed by evidence. --- @@ -53,7 +59,12 @@ Participants are expected to: * Help others reach correct and reproducible outcomes * Act in a way that improves the system as a whole +Precision is learned. + +New contributors are welcome. They are expected to grow into these standards. Existing contributors are expected to make that growth possible. + > **Wer behauptet, belegt.** + > Whoever claims, proves. --- @@ -62,22 +73,25 @@ Participants are expected to: The following is not allowed: -* Personal attacks, insults, harassment, intimidation -* Political discourse, propaganda, ideological conflict -* Off-topic or disruptive discussion +* Personal attacks, insults, harassment, or intimidation +* Repeatedly derailing discussion away from Telemt’s purpose * Spam, flooding, or repeated low-quality input * Misinformation presented as fact -* Attempts to degrade or destabilize Telemt -* Use of Telemt or its space to enable harm +* Attempts to degrade, destabilize, or exhaust Telemt or its participants +* Use of Telemt or its spaces to enable harm + +Telemt is not a venue for disputes that displace technical work. +Such discussions may be closed, removed, or redirected. > **Störung ist kein Beitrag.** + > Disruption is not contribution. --- ## 5. Security and Misuse -Telemt is intended for lawful and responsible use. +Telemt is intended for responsible use. * Do not use it to plan, coordinate, or execute harm * Do not publish vulnerabilities without responsible disclosure @@ -86,11 +100,24 @@ Telemt is intended for lawful and responsible use. Security is both technical and behavioral. > **Verantwortung endet nicht am Code.** + > Responsibility does not end at the code. --- -## 6. Scope +## 6. Openness + +Telemt is open to contributors of different backgrounds, experience levels, and working styles. 
+ +Standards are public, legible, and applied to the work itself. + +Questions are welcome. Careful disagreement is welcome. Honest correction is welcome. + +Gatekeeping by obscurity, status signaling, or hostility is not. + +--- + +## 7. Scope This Code of Conduct applies to all official spaces: @@ -100,31 +127,43 @@ This Code of Conduct applies to all official spaces: --- -## 7. Enforcement +## 8. Maintainer Stewardship -Maintainers may act to preserve the integrity of Telemt: +Maintainers are responsible for final decisions in matters of conduct, scope, and direction. -* Remove content -* Lock discussions -* Reject contributions -* Restrict or ban participants +This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others. -Actions are taken to maintain function, continuity, and signal quality. +Judgment should be exercised with restraint, consistency, and institutional responsibility. + +Not every decision requires extended debate. +Not every intervention requires public explanation. + +All decisions are expected to serve the durability, clarity, and integrity of Telemt. > **Ordnung ist Voraussetzung der Funktion.** + > Order is the precondition of function. --- -## 8. Maintainer Authority +## 9. Enforcement -Maintainers have final authority in interpretation and enforcement. +Maintainers may act to preserve the integrity of Telemt, including by: -Authority exists to ensure continuity, consistency, and technical direction. +* Removing content +* Locking discussions +* Rejecting contributions +* Restricting or banning participants + +Actions are taken to maintain function, continuity, and signal quality. + +Where possible, correction is preferred to exclusion. + +Where necessary, exclusion is preferred to decay. --- -## 9. Final +## 10. Final Telemt is built on discipline, structure, and shared intent. @@ -132,19 +171,23 @@ Signal over noise. Facts over opinion. Systems over rhetoric. 
-Work here is collective. +Work is collective. Outcomes are shared. Responsibility is distributed. +Precision is learned. +Rigor is expected. +Help is part of the work. + > **Ordnung ist Voraussetzung der Freiheit.** -If you contribute — contribute with precision. +If you contribute — contribute with care. If you speak — speak with substance. If you engage — engage constructively. --- -## 10. After All +## 11. After All Systems outlive intentions. @@ -152,12 +195,14 @@ What is built will be used. What is released will propagate. What is maintained will define the future state. -There is no neutral infrastructure. +There is no neutral infrastructure, only infrastructure shaped well or poorly. > **Jedes System trägt Verantwortung.** + > Every system carries responsibility. Stability requires discipline. Freedom requires structure. +Trust requires honesty. In the end, the system reflects its contributors. From 6f9aef7bb458d5447acc29d494151f183602e43f Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 13:08:35 +0300 Subject: [PATCH 09/11] ME Writer stuck-up in draining-state fixes Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/metrics.rs | 58 +++++++++++++ src/stats/mod.rs | 32 +++++++ src/transport/middle_proxy/health.rs | 4 + .../middle_proxy/health_regression_tests.rs | 84 +++++++++++++++++++ src/transport/middle_proxy/pool_writer.rs | 26 +++++- 5 files changed, 203 insertions(+), 1 deletion(-) diff --git a/src/metrics.rs b/src/metrics.rs index 3de9896..4f7f4b6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1692,6 +1692,57 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter"); + let _ = writeln!( + out, + 
"telemt_me_writer_close_signal_drop_total {}", + if me_allows_normal { + stats.get_me_writer_close_signal_drop_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_close_signal_channel_full_total counter" + ); + let _ = writeln!( + out, + "telemt_me_writer_close_signal_channel_full_total {}", + if me_allows_normal { + stats.get_me_writer_close_signal_channel_full_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_draining_writers_reap_progress_total counter" + ); + let _ = writeln!( + out, + "telemt_me_draining_writers_reap_progress_total {}", + if me_allows_normal { + stats.get_me_draining_writers_reap_progress_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); let _ = writeln!( @@ -2124,6 +2175,13 @@ mod tests { assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter")); + assert!(output.contains( + "# TYPE telemt_me_writer_close_signal_channel_full_total counter" + )); + assert!(output.contains( + "# TYPE telemt_me_draining_writers_reap_progress_total counter" + )); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains( diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 
83cd03d..ad1d16b 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -123,6 +123,9 @@ pub struct Stats { pool_drain_soft_evict_total: AtomicU64, pool_drain_soft_evict_writer_total: AtomicU64, pool_stale_pick_total: AtomicU64, + me_writer_close_signal_drop_total: AtomicU64, + me_writer_close_signal_channel_full_total: AtomicU64, + me_draining_writers_reap_progress_total: AtomicU64, me_writer_removed_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64, me_refill_triggered_total: AtomicU64, @@ -734,6 +737,24 @@ impl Stats { self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_writer_close_signal_drop_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_close_signal_drop_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_close_signal_channel_full_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_close_signal_channel_full_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_draining_writers_reap_progress_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_draining_writers_reap_progress_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_writer_removed_total(&self) { if self.telemetry_me_allows_debug() { self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); @@ -1259,6 +1280,17 @@ impl Stats { pub fn get_pool_stale_pick_total(&self) -> u64 { self.pool_stale_pick_total.load(Ordering::Relaxed) } + pub fn get_me_writer_close_signal_drop_total(&self) -> u64 { + self.me_writer_close_signal_drop_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 { + self.me_writer_close_signal_channel_full_total + .load(Ordering::Relaxed) + } + pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 { + self.me_draining_writers_reap_progress_total + .load(Ordering::Relaxed) + } pub fn get_me_writer_removed_total(&self) -> u64 { 
self.me_writer_removed_total.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 0b9b749..6d0af64 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -314,6 +314,8 @@ pub(super) async fn reap_draining_writers( } pool.stats.increment_pool_force_close_total(); pool.remove_writer_and_close_clients(writer_id).await; + pool.stats + .increment_me_draining_writers_reap_progress_total(); closed_total = closed_total.saturating_add(1); } for writer_id in empty_writer_ids { @@ -324,6 +326,8 @@ pub(super) async fn reap_draining_writers( continue; } pool.remove_writer_and_close_clients(writer_id).await; + pool.stats + .increment_me_draining_writers_reap_progress_total(); closed_total = closed_total.saturating_add(1); } diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 606f7e5..565ac74 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::time::{Duration, Instant}; +use bytes::Bytes; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -209,6 +210,89 @@ async fn reap_draining_writers_removes_empty_draining_writers() { assert_eq!(current_writer_ids(&pool).await, vec![3]); } +#[tokio::test] +async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + + let (blocked_tx, blocked_rx) = mpsc::channel::(1); + assert!( + blocked_tx + .try_send(WriterCommand::Data(Bytes::from_static(b"stuck"))) + .is_ok() + ); + let blocked_rx_guard = tokio::spawn(async move { + let _hold_rx = blocked_rx; + tokio::time::sleep(Duration::from_secs(30)).await; + }); + + let blocked_writer_id = 90u64; + let 
blocked_writer = MeWriter { + id: blocked_writer_id, + addr: SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 4500 + blocked_writer_id as u16, + ), + source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), + writer_dc: 2, + generation: 1, + contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())), + created_at: Instant::now() - Duration::from_secs(blocked_writer_id), + tx: blocked_tx.clone(), + cancel: CancellationToken::new(), + degraded: Arc::new(AtomicBool::new(false)), + rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), + draining: Arc::new(AtomicBool::new(true)), + draining_started_at_epoch_secs: Arc::new(AtomicU64::new( + now_epoch_secs.saturating_sub(120), + )), + drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)), + allow_drain_fallback: Arc::new(AtomicBool::new(false)), + }; + pool.writers.write().await.push(blocked_writer); + pool.registry + .register_writer(blocked_writer_id, blocked_tx) + .await; + pool.conn_count.fetch_add(1, Ordering::Relaxed); + + insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await; + + let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); + + let reap_res = tokio::time::timeout( + Duration::from_millis(500), + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed), + ) + .await; + blocked_rx_guard.abort(); + + assert!(reap_res.is_ok(), "reap should not block on close signal"); + assert!(current_writer_ids(&pool).await.is_empty()); + assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2); + assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1); + assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2); + let activity = pool.registry.writer_activity_snapshot().await; + assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id)); + assert!(!activity.bound_clients_by_writer.contains_key(&91)); + let (probe_conn_id, _rx) = pool.registry.register().await; + assert!( + 
!pool.registry + .bind_writer( + probe_conn_id, + blocked_writer_id, + ConnMeta { + target_dc: 2, + client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400), + our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443), + proto_flags: 0, + }, + ) + .await + ); + let _ = pool.registry.unregister(probe_conn_id).await; +} + #[tokio::test] async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { let pool = make_pool(2).await; diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 4035111..a6186b6 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use bytes::BytesMut; use rand::Rng; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; @@ -525,6 +526,11 @@ impl MePool { self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + // State invariant: + // - writer is removed from `self.writers` (pool visibility), + // - writer is removed from registry routing/binding maps via `writer_lost`. + // The close command below is only a best-effort accelerator for task shutdown. + // Cleanup progress must never depend on command-channel availability. 
let conns = self.registry.writer_lost(writer_id).await; { let mut tracker = self.ping_tracker.lock().await; @@ -532,7 +538,25 @@ impl MePool { } self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { - let _ = tx.send(WriterCommand::Close).await; + match tx.try_send(WriterCommand::Close) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + self.stats.increment_me_writer_close_signal_drop_total(); + self.stats + .increment_me_writer_close_signal_channel_full_total(); + debug!( + writer_id, + "Skipping close signal for removed writer: command channel is full" + ); + } + Err(TrySendError::Closed(_)) => { + self.stats.increment_me_writer_close_signal_drop_total(); + debug!( + writer_id, + "Skipping close signal for removed writer: command channel is closed" + ); + } + } } if trigger_refill && let Some(addr) = removed_addr From 3279f6d46a5744b754741abf828e6b4d50c3a3ac Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:07:20 +0300 Subject: [PATCH 10/11] Cleanup-path as non-blocking Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/load.rs | 51 ++++++++++++++ src/transport/middle_proxy/pool_writer.rs | 8 +-- src/transport/middle_proxy/reader.rs | 32 ++++++--- src/transport/middle_proxy/registry.rs | 85 +++++++++++++++++------ src/transport/middle_proxy/send.rs | 28 +++++--- 5 files changed, 158 insertions(+), 46 deletions(-) diff --git a/src/config/load.rs b/src/config/load.rs index 0635f80..c296697 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -612,6 +612,11 @@ impl ProxyConfig { "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(), )); } + if config.general.me_route_backpressure_base_timeout_ms > 5000 { + return Err(ProxyError::Config( + "general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(), + )); + } if config.general.me_route_backpressure_high_timeout_ms < 
config.general.me_route_backpressure_base_timeout_ms @@ -620,6 +625,11 @@ impl ProxyConfig { "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(), )); } + if config.general.me_route_backpressure_high_timeout_ms > 5000 { + return Err(ProxyError::Config( + "general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(), + )); + } if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) { return Err(ProxyError::Config( @@ -1624,6 +1634,47 @@ mod tests { let _ = std::fs::remove_file(path_valid); } + #[test] + fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() { + let toml = r#" + [general] + me_route_backpressure_base_timeout_ms = 5001 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() { + let toml = r#" + [general] + me_route_backpressure_base_timeout_ms = 100 + me_route_backpressure_high_timeout_ms = 5001 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]")); + let _ = std::fs::remove_file(path); + } + #[test] fn 
me_route_no_writer_wait_ms_out_of_range_is_rejected() { let toml = r#" diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index a6186b6..7d78b84 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -492,11 +492,9 @@ impl MePool { } pub(crate) async fn remove_writer_and_close_clients(self: &Arc, writer_id: u64) { - let conns = self.remove_writer_only(writer_id).await; - for bound in conns { - let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await; - let _ = self.registry.unregister(bound.conn_id).await; - } + // Full client cleanup now happens inside `registry.writer_lost` to keep + // writer reap/remove paths strictly non-blocking per connection. + let _ = self.remove_writer_only(writer_id).await; } async fn remove_writer_only(self: &Arc, writer_id: u64) -> Vec { diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 785bc2c..8b15fc1 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut}; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; use tokio::sync::{Mutex, mpsc}; +use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; @@ -173,12 +174,12 @@ pub(crate) async fn reader_loop( } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); debug!(cid, "RPC_CLOSE_EXT from ME"); - reg.route(cid, MeResponse::Close).await; + let _ = reg.route_nowait(cid, MeResponse::Close).await; reg.unregister(cid).await; } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 { let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); debug!(cid, "RPC_CLOSE_CONN from ME"); - reg.route(cid, MeResponse::Close).await; + let _ = reg.route_nowait(cid, MeResponse::Close).await; reg.unregister(cid).await; } else if pt == 
RPC_PING_U32 && body.len() >= 8 { let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); @@ -186,13 +187,15 @@ pub(crate) async fn reader_loop( let mut pong = Vec::with_capacity(12); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes()); - if tx - .send(WriterCommand::DataAndFlush(Bytes::from(pong))) - .await - .is_err() - { - warn!("PONG send failed"); - break; + match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + debug!(ping_id, "PONG dropped: writer command channel is full"); + } + Err(TrySendError::Closed(_)) => { + warn!("PONG send failed: writer channel closed"); + break; + } } } else if pt == RPC_PONG_U32 && body.len() >= 8 { let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); @@ -232,6 +235,13 @@ async fn send_close_conn(tx: &mpsc::Sender, conn_id: u64) { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); - - let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await; + match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + debug!(conn_id, "ME close_conn signal skipped: writer command channel is full"); + } + Err(TrySendError::Closed(_)) => { + debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed"); + } + } } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index b8a926e..2ee55c1 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -169,6 +169,7 @@ impl ConnRegistry { None } + #[allow(dead_code)] pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { let tx = { let inner = self.inner.read().await; @@ -445,30 +446,38 @@ impl ConnRegistry { } pub async fn writer_lost(&self, writer_id: u64) -> Vec { - let mut inner = self.inner.write().await; 
- inner.writers.remove(&writer_id); - inner.last_meta_for_writer.remove(&writer_id); - inner.writer_idle_since_epoch_secs.remove(&writer_id); - let conns = inner - .conns_for_writer - .remove(&writer_id) - .unwrap_or_default() - .into_iter() - .collect::>(); - + let mut close_txs = Vec::>::new(); let mut out = Vec::new(); - for conn_id in conns { - if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { - continue; - } - inner.writer_for_conn.remove(&conn_id); - if let Some(m) = inner.meta.get(&conn_id) { - out.push(BoundConn { - conn_id, - meta: m.clone(), - }); + { + let mut inner = self.inner.write().await; + inner.writers.remove(&writer_id); + inner.last_meta_for_writer.remove(&writer_id); + inner.writer_idle_since_epoch_secs.remove(&writer_id); + let conns = inner + .conns_for_writer + .remove(&writer_id) + .unwrap_or_default() + .into_iter() + .collect::>(); + + for conn_id in conns { + if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { + continue; + } + inner.writer_for_conn.remove(&conn_id); + if let Some(client_tx) = inner.map.remove(&conn_id) { + close_txs.push(client_tx); + } + if let Some(meta) = inner.meta.remove(&conn_id) { + out.push(BoundConn { conn_id, meta }); + } } } + + for client_tx in close_txs { + let _ = client_tx.try_send(MeResponse::Close); + } + out } @@ -491,6 +500,7 @@ impl ConnRegistry { #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::time::Duration; use super::ConnMeta; use super::ConnRegistry; @@ -663,6 +673,39 @@ mod tests { assert!(registry.is_writer_empty(20).await); } + #[tokio::test] + async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() { + let registry = ConnRegistry::new(); + let (conn_id, mut rx) = registry.register().await; + let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx).await; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + + assert!( + registry + .bind_writer( 
+ conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + + let lost = registry.writer_lost(10).await; + assert_eq!(lost.len(), 1); + assert_eq!(lost[0].conn_id, conn_id); + assert!(registry.get_writer(conn_id).await.is_none()); + assert!(registry.get_meta(conn_id).await.is_none()); + assert_eq!(registry.unregister(conn_id).await, None); + let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await; + assert!(matches!(close, Ok(Some(MeResponse::Close)))); + } + #[tokio::test] async fn bind_writer_rejects_unregistered_writer() { let registry = ConnRegistry::new(); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 1c255ef..6791064 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -643,13 +643,19 @@ impl MePool { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); - if w.tx - .send(WriterCommand::DataAndFlush(Bytes::from(p))) - .await - .is_err() - { - debug!("ME close write failed"); - self.remove_writer_and_close_clients(w.writer_id).await; + match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + debug!( + conn_id, + writer_id = w.writer_id, + "ME close skipped: writer command channel is full" + ); + } + Err(TrySendError::Closed(_)) => { + debug!("ME close write failed"); + self.remove_writer_and_close_clients(w.writer_id).await; + } } } else { debug!(conn_id, "ME close skipped (writer missing)"); @@ -666,8 +672,12 @@ impl MePool { p.extend_from_slice(&conn_id.to_le_bytes()); match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { Ok(()) => {} - Err(TrySendError::Full(cmd)) => { - let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await; + Err(TrySendError::Full(_)) => { + debug!( + conn_id, + writer_id = w.writer_id, + "ME 
close_conn skipped: writer command channel is full" + ); } Err(TrySendError::Closed(_)) => { debug!(conn_id, "ME close_conn skipped: writer channel closed"); From ae3ced8e7c41f20ac2ed9943f275f8769372ad7e Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:42:59 +0300 Subject: [PATCH 11/11] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b289231..d4ef990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.23" +version = "3.3.24" edition = "2024" [dependencies]