Compare commits

..

No commits in common. "dd8ef4d996f155ac6478d032ddca56c90416ebb2" and "062464175effb3567552894984056bdebba294cd" have entirely different histories.

23 changed files with 449 additions and 1761 deletions

View File

@ -1,208 +0,0 @@
# Code of Conduct
## 1. Purpose
Telemt exists to solve technical problems.
Telemt is open to contributors who want to learn, improve and build meaningful systems together.
It is a place for building, testing, reasoning, documenting, and improving systems.
Discussions that advance this work are in scope. Discussions that divert it are not.
Technology has consequences. Responsibility is inherent.
> **Zweck bestimmt die Form.**
> Purpose defines form.
---
## 2. Principles
* **Technical over emotional**
Arguments are grounded in data, logs, reproducible cases, or clear reasoning.
* **Clarity over noise**
Communication is structured, concise, and relevant.
* **Openness with standards**
Participation is open. The work remains disciplined.
* **Independence of judgment**
Claims are evaluated on technical merit, not affiliation or posture.
* **Responsibility over capability**
Capability does not justify careless use.
* **Cooperation over friction**
Progress depends on coordination, mutual support, and honest review.
* **Good intent, rigorous method**
Assume good intent, but require rigor.
> **Aussagen gelten nach ihrer Begründung.**
> Claims are weighed by evidence.
---
## 3. Expected Behavior
Participants are expected to:
* Communicate directly and respectfully
* Support claims with evidence
* Stay within technical scope
* Accept critique and provide it constructively
* Reduce noise, duplication, and ambiguity
* Help others reach correct and reproducible outcomes
* Act in a way that improves the system as a whole
Precision is learned.
New contributors are welcome. They are expected to grow into these standards. Existing contributors are expected to make that growth possible.
> **Wer behauptet, belegt.**
> Whoever claims, proves.
---
## 4. Unacceptable Behavior
The following is not allowed:
* Personal attacks, insults, harassment, or intimidation
* Repeatedly derailing discussion away from Telemts purpose
* Spam, flooding, or repeated low-quality input
* Misinformation presented as fact
* Attempts to degrade, destabilize, or exhaust Telemt or its participants
* Use of Telemt or its spaces to enable harm
Telemt is not a venue for disputes that displace technical work.
Such discussions may be closed, removed, or redirected.
> **Störung ist kein Beitrag.**
> Disruption is not contribution.
---
## 5. Security and Misuse
Telemt is intended for responsible use.
* Do not use it to plan, coordinate, or execute harm
* Do not publish vulnerabilities without responsible disclosure
* Report security issues privately where possible
Security is both technical and behavioral.
> **Verantwortung endet nicht am Code.**
> Responsibility does not end at the code.
---
## 6. Openness
Telemt is open to contributors of different backgrounds, experience levels, and working styles.
Standards are public, legible, and applied to the work itself.
Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
Gatekeeping by obscurity, status signaling, or hostility is not.
---
## 7. Scope
This Code of Conduct applies to all official spaces:
* Source repositories (issues, pull requests, discussions)
* Documentation
* Communication channels associated with Telemt
---
## 8. Maintainer Stewardship
Maintainers are responsible for final decisions in matters of conduct, scope, and direction.
This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others.
Judgment should be exercised with restraint, consistency, and institutional responsibility.
Not every decision requires extended debate.
Not every intervention requires public explanation.
All decisions are expected to serve the durability, clarity, and integrity of Telemt.
> **Ordnung ist Voraussetzung der Funktion.**
> Order is the precondition of function.
---
## 9. Enforcement
Maintainers may act to preserve the integrity of Telemt, including by:
* Removing content
* Locking discussions
* Rejecting contributions
* Restricting or banning participants
Actions are taken to maintain function, continuity, and signal quality.
Where possible, correction is preferred to exclusion.
Where necessary, exclusion is preferred to decay.
---
## 10. Final
Telemt is built on discipline, structure, and shared intent.
Signal over noise.
Facts over opinion.
Systems over rhetoric.
Work is collective.
Outcomes are shared.
Responsibility is distributed.
Precision is learned.
Rigor is expected.
Help is part of the work.
> **Ordnung ist Voraussetzung der Freiheit.**
If you contribute — contribute with care.
If you speak — speak with substance.
If you engage — engage constructively.
---
## 11. After All
Systems outlive intentions.
What is built will be used.
What is released will propagate.
What is maintained will define the future state.
There is no neutral infrastructure, only infrastructure shaped well or poorly.
> **Jedes System trägt Verantwortung.**
> Every system carries responsibility.
Stability requires discipline.
Freedom requires structure.
Trust requires honesty.
In the end, the system reflects its contributors.

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.24" version = "3.3.21"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -8,287 +8,282 @@ This document lists all configuration keys accepted by `config.toml`.
## Top-level keys ## Top-level keys
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| include | `String` (special directive) | `null` | — | Includes another TOML file with `include = "relative/or/absolute/path.toml"`; includes are processed recursively before parsing. | | include | `String` (special directive) | Includes another TOML file with `include = "relative/or/absolute/path.toml"`; includes are processed recursively before parsing. |
| show_link | `"*" \| String[]` | `[]` (`ShowLink::None`) | — | Legacy top-level link visibility selector (`"*"` for all users or explicit usernames list). | | show_link | `"*" \| String[]` | Legacy top-level link visibility selector (`"*"` for all users or explicit usernames list). |
| dc_overrides | `Map<String, String[]>` | `{}` | — | Overrides DC endpoints for non-standard DCs; key is DC id string, value is `ip:port` list. | | dc_overrides | `Map<String, String[]>` | Overrides DC endpoints for non-standard DCs; key is DC id string, value is `ip:port` list. |
| default_dc | `u8 \| null` | `null` (effective fallback: `2` in ME routing) | — | Default DC index used for unmapped non-standard DCs. | | default_dc | `u8` | Default DC index used for unmapped non-standard DCs. |
## [general] ## [general]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| data_path | `String \| null` | `null` | — | Optional runtime data directory path. | | data_path | `String` | Optional runtime data directory path. |
| prefer_ipv6 | `bool` | `false` | — | Prefer IPv6 where applicable in runtime logic. | | prefer_ipv6 | `bool` | Prefer IPv6 where applicable in runtime logic. |
| fast_mode | `bool` | `true` | — | Enables fast-path optimizations for traffic processing. | | fast_mode | `bool` | Enables fast-path optimizations for traffic processing. |
| use_middle_proxy | `bool` | `true` | none | Enables ME transport mode; if `false`, runtime falls back to direct DC routing. | | use_middle_proxy | `bool` | Enables Middle Proxy mode. |
| proxy_secret_path | `String \| null` | `"proxy-secret"` | Path may be `null`. | Path to Telegram infrastructure proxy-secret file used by ME handshake logic. | | proxy_secret_path | `String` | Path to proxy secret binary; can be auto-downloaded if absent. |
| proxy_config_v4_cache_path | `String \| null` | `"cache/proxy-config-v4.txt"` | — | Optional cache path for raw `getProxyConfig` (IPv4) snapshot. | | proxy_config_v4_cache_path | `String` | Optional cache path for raw `getProxyConfig` (IPv4) snapshot. |
| proxy_config_v6_cache_path | `String \| null` | `"cache/proxy-config-v6.txt"` | — | Optional cache path for raw `getProxyConfigV6` (IPv6) snapshot. | | proxy_config_v6_cache_path | `String` | Optional cache path for raw `getProxyConfigV6` (IPv6) snapshot. |
| ad_tag | `String \| null` | `null` | — | Global fallback ad tag (32 hex characters). | | ad_tag | `String` | Global fallback ad tag (32 hex characters). |
| middle_proxy_nat_ip | `IpAddr \| null` | `null` | Must be a valid IP when set. | Manual public NAT IP override used as ME address material when set. | | middle_proxy_nat_ip | `IpAddr` | Explicit public IP override for NAT environments. |
| middle_proxy_nat_probe | `bool` | `true` | Auto-forced to `true` when `use_middle_proxy = true`. | Enables ME NAT probing; runtime may force it on when ME mode is active. | | middle_proxy_nat_probe | `bool` | Enables NAT probing for Middle Proxy KDF/public address discovery. |
| middle_proxy_nat_stun | `String \| null` | `null` | Deprecated. Use `network.stun_servers`. | Deprecated legacy single STUN server for NAT probing. | | middle_proxy_nat_stun | `String` | Deprecated legacy single STUN server for NAT probing. |
| middle_proxy_nat_stun_servers | `String[]` | `[]` | Deprecated. Use `network.stun_servers`. | Deprecated legacy STUN list for NAT probing fallback. | | middle_proxy_nat_stun_servers | `String[]` | Deprecated legacy STUN list for NAT probing fallback. |
| stun_nat_probe_concurrency | `usize` | `8` | Must be `> 0`. | Maximum number of parallel STUN probes during NAT/public endpoint discovery. | | stun_nat_probe_concurrency | `usize` | Maximum concurrent STUN probes during NAT detection. |
| middle_proxy_pool_size | `usize` | `8` | none | Target size of active ME writer pool. | | middle_proxy_pool_size | `usize` | Target size of active Middle Proxy writer pool. |
| middle_proxy_warm_standby | `usize` | `16` | none | Reserved compatibility field in current runtime revision. | | middle_proxy_warm_standby | `usize` | Number of warm standby Middle-End connections. |
| me_init_retry_attempts | `u32` | `0` | `0..=1_000_000`. | Startup retries for ME pool initialization (`0` means unlimited). | | me_init_retry_attempts | `u32` | Startup retries for ME pool initialization (`0` means unlimited). |
| me2dc_fallback | `bool` | `true` | — | Allows fallback from ME mode to direct DC when ME startup fails. | | me2dc_fallback | `bool` | Allows fallback from ME mode to direct DC when ME startup fails. |
| me_keepalive_enabled | `bool` | `true` | none | Enables periodic ME keepalive/ping traffic. | | me_keepalive_enabled | `bool` | Enables ME keepalive padding frames. |
| me_keepalive_interval_secs | `u64` | `8` | none | Base ME keepalive interval in seconds. | | me_keepalive_interval_secs | `u64` | Keepalive interval in seconds. |
| me_keepalive_jitter_secs | `u64` | `2` | none | Keepalive jitter in seconds to reduce synchronized bursts. | | me_keepalive_jitter_secs | `u64` | Keepalive jitter in seconds. |
| me_keepalive_payload_random | `bool` | `true` | none | Randomizes keepalive payload bytes instead of fixed zero payload. | | me_keepalive_payload_random | `bool` | Randomizes keepalive payload bytes instead of zero payload. |
| rpc_proxy_req_every | `u64` | `0` | `0` or `10..=300`. | Interval for service `RPC_PROXY_REQ` activity signals (`0` disables). | | rpc_proxy_req_every | `u64` | Interval for service `RPC_PROXY_REQ` activity signals (`0` disables). |
| me_writer_cmd_channel_capacity | `usize` | `4096` | Must be `> 0`. | Capacity of per-writer command channel. | | me_writer_cmd_channel_capacity | `usize` | Capacity of per-writer command channel. |
| me_route_channel_capacity | `usize` | `768` | Must be `> 0`. | Capacity of per-connection ME response route channel. | | me_route_channel_capacity | `usize` | Capacity of per-connection ME response route channel. |
| me_c2me_channel_capacity | `usize` | `1024` | Must be `> 0`. | Capacity of per-client command queue (client reader -> ME sender). | | me_c2me_channel_capacity | `usize` | Capacity of per-client command queue (client reader -> ME sender). |
| me_reader_route_data_wait_ms | `u64` | `2` | `0..=20`. | Bounded wait for routing ME DATA to per-connection queue (`0` = no wait). | | me_reader_route_data_wait_ms | `u64` | Bounded wait for routing ME DATA to per-connection queue (`0` = no wait). |
| me_d2c_flush_batch_max_frames | `usize` | `32` | `1..=512`. | Max ME->client frames coalesced before flush. | | me_d2c_flush_batch_max_frames | `usize` | Max ME->client frames coalesced before flush. |
| me_d2c_flush_batch_max_bytes | `usize` | `131072` | `4096..=2_097_152`. | Max ME->client payload bytes coalesced before flush. | | me_d2c_flush_batch_max_bytes | `usize` | Max ME->client payload bytes coalesced before flush. |
| me_d2c_flush_batch_max_delay_us | `u64` | `500` | `0..=5000`. | Max microsecond wait for coalescing more ME->client frames (`0` disables timed coalescing). | | me_d2c_flush_batch_max_delay_us | `u64` | Max microsecond wait for coalescing more ME->client frames (`0` disables timed coalescing). |
| me_d2c_ack_flush_immediate | `bool` | `true` | — | Flushes client writer immediately after quick-ack write. | | me_d2c_ack_flush_immediate | `bool` | Flushes client writer immediately after quick-ack write. |
| direct_relay_copy_buf_c2s_bytes | `usize` | `65536` | `4096..=1_048_576`. | Copy buffer size for client->DC direction in direct relay. | | direct_relay_copy_buf_c2s_bytes | `usize` | Copy buffer size for client->DC direction in direct relay. |
| direct_relay_copy_buf_s2c_bytes | `usize` | `262144` | `8192..=2_097_152`. | Copy buffer size for DC->client direction in direct relay. | | direct_relay_copy_buf_s2c_bytes | `usize` | Copy buffer size for DC->client direction in direct relay. |
| crypto_pending_buffer | `usize` | `262144` | — | Max pending ciphertext buffer per client writer (bytes). | | crypto_pending_buffer | `usize` | Max pending ciphertext buffer per client writer (bytes). |
| max_client_frame | `usize` | `16777216` | — | Maximum allowed client MTProto frame size (bytes). | | max_client_frame | `usize` | Maximum allowed client MTProto frame size (bytes). |
| desync_all_full | `bool` | `false` | — | Emits full crypto-desync forensic logs for every event. | | desync_all_full | `bool` | Emits full crypto-desync forensic logs for every event. |
| beobachten | `bool` | `true` | — | Enables per-IP forensic observation buckets. | | beobachten | `bool` | Enables per-IP forensic observation buckets. |
| beobachten_minutes | `u64` | `10` | Must be `> 0`. | Retention window (minutes) for per-IP observation buckets. | | beobachten_minutes | `u64` | Retention window (minutes) for per-IP observation buckets. |
| beobachten_flush_secs | `u64` | `15` | Must be `> 0`. | Snapshot flush interval (seconds) for observation output file. | | beobachten_flush_secs | `u64` | Snapshot flush interval (seconds) for observation output file. |
| beobachten_file | `String` | `"cache/beobachten.txt"` | — | Observation snapshot output file path. | | beobachten_file | `String` | Observation snapshot output file path. |
| hardswap | `bool` | `true` | none | Enables generation-based ME hardswap strategy. | | hardswap | `bool` | Enables hard-swap generation switching for ME pool updates. |
| me_warmup_stagger_enabled | `bool` | `true` | none | Staggers extra ME warmup dials to avoid connection spikes. | | me_warmup_stagger_enabled | `bool` | Enables staggered warmup for extra ME writers. |
| me_warmup_step_delay_ms | `u64` | `500` | none | Base delay in milliseconds between warmup dial steps. | | me_warmup_step_delay_ms | `u64` | Base delay between warmup connections (ms). |
| me_warmup_step_jitter_ms | `u64` | `300` | none | Additional random delay in milliseconds for warmup steps. | | me_warmup_step_jitter_ms | `u64` | Jitter for warmup delay (ms). |
| me_reconnect_max_concurrent_per_dc | `u32` | `8` | none | Limits concurrent reconnect workers per DC during health recovery. | | me_reconnect_max_concurrent_per_dc | `u32` | Max concurrent reconnect attempts per DC. |
| me_reconnect_backoff_base_ms | `u64` | `500` | none | Initial reconnect backoff in milliseconds. | | me_reconnect_backoff_base_ms | `u64` | Base reconnect backoff in ms. |
| me_reconnect_backoff_cap_ms | `u64` | `30000` | none | Maximum reconnect backoff cap in milliseconds. | | me_reconnect_backoff_cap_ms | `u64` | Cap reconnect backoff in ms. |
| me_reconnect_fast_retry_count | `u32` | `16` | none | Immediate retry budget before long backoff behavior applies. | | me_reconnect_fast_retry_count | `u32` | Number of fast retry attempts before backoff. |
| me_single_endpoint_shadow_writers | `u8` | `2` | `0..=32`. | Additional reserve writers for one-endpoint DC groups. | | me_single_endpoint_shadow_writers | `u8` | Additional reserve writers for one-endpoint DC groups. |
| me_single_endpoint_outage_mode_enabled | `bool` | `true` | — | Enables aggressive outage recovery for one-endpoint DC groups. | | me_single_endpoint_outage_mode_enabled | `bool` | Enables aggressive outage recovery for one-endpoint DC groups. |
| me_single_endpoint_outage_disable_quarantine | `bool` | `true` | — | Ignores endpoint quarantine in one-endpoint outage mode. | | me_single_endpoint_outage_disable_quarantine | `bool` | Ignores endpoint quarantine in one-endpoint outage mode. |
| me_single_endpoint_outage_backoff_min_ms | `u64` | `250` | Must be `> 0`; also `<= me_single_endpoint_outage_backoff_max_ms`. | Minimum reconnect backoff in outage mode (ms). | | me_single_endpoint_outage_backoff_min_ms | `u64` | Minimum reconnect backoff in outage mode (ms). |
| me_single_endpoint_outage_backoff_max_ms | `u64` | `3000` | Must be `> 0`; also `>= me_single_endpoint_outage_backoff_min_ms`. | Maximum reconnect backoff in outage mode (ms). | | me_single_endpoint_outage_backoff_max_ms | `u64` | Maximum reconnect backoff in outage mode (ms). |
| me_single_endpoint_shadow_rotate_every_secs | `u64` | `900` | — | Periodic shadow writer rotation interval (`0` disables). | | me_single_endpoint_shadow_rotate_every_secs | `u64` | Periodic shadow writer rotation interval (`0` disables). |
| me_floor_mode | `"static" \| "adaptive"` | `"adaptive"` | — | Writer floor policy mode. | | me_floor_mode | `"static" \| "adaptive"` | Writer floor policy mode. |
| me_adaptive_floor_idle_secs | `u64` | `90` | — | Idle time before adaptive floor may reduce one-endpoint target. | | me_adaptive_floor_idle_secs | `u64` | Idle time before adaptive floor may reduce one-endpoint target. |
| me_adaptive_floor_min_writers_single_endpoint | `u8` | `1` | `1..=32`. | Minimum adaptive writer target for one-endpoint DC groups. | | me_adaptive_floor_min_writers_single_endpoint | `u8` | Minimum adaptive writer target for one-endpoint DC groups. |
| me_adaptive_floor_min_writers_multi_endpoint | `u8` | `1` | `1..=32`. | Minimum adaptive writer target for multi-endpoint DC groups. | | me_adaptive_floor_min_writers_multi_endpoint | `u8` | Minimum adaptive writer target for multi-endpoint DC groups. |
| me_adaptive_floor_recover_grace_secs | `u64` | `180` | — | Grace period to hold static floor after activity. | | me_adaptive_floor_recover_grace_secs | `u64` | Grace period to hold static floor after activity. |
| me_adaptive_floor_writers_per_core_total | `u16` | `48` | Must be `> 0`. | Global writer budget per logical CPU core in adaptive mode. | | me_adaptive_floor_writers_per_core_total | `u16` | Global writer budget per logical CPU core in adaptive mode. |
| me_adaptive_floor_cpu_cores_override | `u16` | `0` | — | Manual CPU core count override (`0` uses auto-detection). | | me_adaptive_floor_cpu_cores_override | `u16` | Manual CPU core count override (`0` uses auto-detection). |
| me_adaptive_floor_max_extra_writers_single_per_core | `u16` | `1` | — | Per-core max extra writers above base floor for one-endpoint DCs. | | me_adaptive_floor_max_extra_writers_single_per_core | `u16` | Per-core max extra writers above base floor for one-endpoint DCs. |
| me_adaptive_floor_max_extra_writers_multi_per_core | `u16` | `2` | — | Per-core max extra writers above base floor for multi-endpoint DCs. | | me_adaptive_floor_max_extra_writers_multi_per_core | `u16` | Per-core max extra writers above base floor for multi-endpoint DCs. |
| me_adaptive_floor_max_active_writers_per_core | `u16` | `64` | Must be `> 0`. | Hard cap for active ME writers per logical CPU core. | | me_adaptive_floor_max_active_writers_per_core | `u16` | Hard cap for active ME writers per logical CPU core. |
| me_adaptive_floor_max_warm_writers_per_core | `u16` | `64` | Must be `> 0`. | Hard cap for warm ME writers per logical CPU core. | | me_adaptive_floor_max_warm_writers_per_core | `u16` | Hard cap for warm ME writers per logical CPU core. |
| me_adaptive_floor_max_active_writers_global | `u32` | `256` | Must be `> 0`. | Hard global cap for active ME writers. | | me_adaptive_floor_max_active_writers_global | `u32` | Hard global cap for active ME writers. |
| me_adaptive_floor_max_warm_writers_global | `u32` | `256` | Must be `> 0`. | Hard global cap for warm ME writers. | | me_adaptive_floor_max_warm_writers_global | `u32` | Hard global cap for warm ME writers. |
| upstream_connect_retry_attempts | `u32` | `2` | Must be `> 0`. | Connect attempts for selected upstream before error/fallback. | | upstream_connect_retry_attempts | `u32` | Connect attempts for selected upstream before error/fallback. |
| upstream_connect_retry_backoff_ms | `u64` | `100` | — | Delay between upstream connect attempts (ms). | | upstream_connect_retry_backoff_ms | `u64` | Delay between upstream connect attempts (ms). |
| upstream_connect_budget_ms | `u64` | `3000` | Must be `> 0`. | Total wall-clock budget for one upstream connect request (ms). | | upstream_connect_budget_ms | `u64` | Total wall-clock budget for one upstream connect request (ms). |
| upstream_unhealthy_fail_threshold | `u32` | `5` | Must be `> 0`. | Consecutive failed requests before upstream is marked unhealthy. | | upstream_unhealthy_fail_threshold | `u32` | Consecutive failed requests before upstream is marked unhealthy. |
| upstream_connect_failfast_hard_errors | `bool` | `false` | — | Skips additional retries for hard non-transient connect errors. | | upstream_connect_failfast_hard_errors | `bool` | Skips additional retries for hard non-transient connect errors. |
| stun_iface_mismatch_ignore | `bool` | `false` | none | Reserved compatibility flag in current runtime revision. | | stun_iface_mismatch_ignore | `bool` | Ignores STUN/interface mismatch and keeps Middle Proxy mode. |
| unknown_dc_log_path | `String \| null` | `"unknown-dc.txt"` | — | File path for unknown-DC request logging (`null` disables file path). | | unknown_dc_log_path | `String` | File path for unknown-DC request logging (`null` disables file path). |
| unknown_dc_file_log_enabled | `bool` | `false` | — | Enables unknown-DC file logging. | | unknown_dc_file_log_enabled | `bool` | Enables unknown-DC file logging. |
| log_level | `"debug" \| "verbose" \| "normal" \| "silent"` | `"normal"` | — | Runtime logging verbosity. | | log_level | `"debug" \| "verbose" \| "normal" \| "silent"` | Runtime logging verbosity. |
| disable_colors | `bool` | `false` | — | Disables ANSI colors in logs. | | disable_colors | `bool` | Disables ANSI colors in logs. |
| me_socks_kdf_policy | `"strict" \| "compat"` | `"strict"` | — | SOCKS-bound KDF fallback policy for ME handshake. | | me_socks_kdf_policy | `"strict" \| "compat"` | SOCKS-bound KDF fallback policy for ME handshake. |
| me_route_backpressure_base_timeout_ms | `u64` | `25` | Must be `> 0`. | Base backpressure timeout for route-channel send (ms). | | me_route_backpressure_base_timeout_ms | `u64` | Base backpressure timeout for route-channel send (ms). |
| me_route_backpressure_high_timeout_ms | `u64` | `120` | Must be `>= me_route_backpressure_base_timeout_ms`. | High backpressure timeout when queue occupancy exceeds watermark (ms). | | me_route_backpressure_high_timeout_ms | `u64` | High backpressure timeout when queue occupancy exceeds watermark (ms). |
| me_route_backpressure_high_watermark_pct | `u8` | `80` | `1..=100`. | Queue occupancy threshold (%) for high timeout mode. | | me_route_backpressure_high_watermark_pct | `u8` | Queue occupancy threshold (%) for high timeout mode. |
| me_health_interval_ms_unhealthy | `u64` | `1000` | Must be `> 0`. | Health monitor interval while writer coverage is degraded (ms). | | me_health_interval_ms_unhealthy | `u64` | Health monitor interval while writer coverage is degraded (ms). |
| me_health_interval_ms_healthy | `u64` | `3000` | Must be `> 0`. | Health monitor interval while writer coverage is healthy (ms). | | me_health_interval_ms_healthy | `u64` | Health monitor interval while writer coverage is healthy (ms). |
| me_admission_poll_ms | `u64` | `1000` | Must be `> 0`. | Poll interval for conditional-admission checks (ms). | | me_admission_poll_ms | `u64` | Poll interval for conditional-admission checks (ms). |
| me_warn_rate_limit_ms | `u64` | `5000` | Must be `> 0`. | Cooldown for repetitive ME warning logs (ms). | | me_warn_rate_limit_ms | `u64` | Cooldown for repetitive ME warning logs (ms). |
| me_route_no_writer_mode | `"async_recovery_failfast" \| "inline_recovery_legacy" \| "hybrid_async_persistent"` | `"hybrid_async_persistent"` | — | Route behavior when no writer is immediately available. | | me_route_no_writer_mode | `"async_recovery_failfast" \| "inline_recovery_legacy" \| "hybrid_async_persistent"` | Route behavior when no writer is immediately available. |
| me_route_no_writer_wait_ms | `u64` | `250` | `10..=5000`. | Max wait in async-recovery failfast mode (ms). | | me_route_no_writer_wait_ms | `u64` | Max wait in async-recovery failfast mode (ms). |
| me_route_inline_recovery_attempts | `u32` | `3` | Must be `> 0`. | Inline recovery attempts in legacy mode. | | me_route_inline_recovery_attempts | `u32` | Inline recovery attempts in legacy mode. |
| me_route_inline_recovery_wait_ms | `u64` | `3000` | `10..=30000`. | Max inline recovery wait in legacy mode (ms). | | me_route_inline_recovery_wait_ms | `u64` | Max inline recovery wait in legacy mode (ms). |
| fast_mode_min_tls_record | `usize` | `0` | — | Minimum TLS record size when fast-mode coalescing is enabled (`0` disables). | | fast_mode_min_tls_record | `usize` | Minimum TLS record size when fast-mode coalescing is enabled (`0` disables). |
| update_every | `u64 \| null` | `300` | If set: must be `> 0`; if `null`: legacy fallback path is used. | Unified refresh interval for ME config and proxy-secret updater tasks. | | update_every | `u64` | Unified interval for config/secret updater tasks. |
| me_reinit_every_secs | `u64` | `900` | Must be `> 0`. | Periodic interval for zero-downtime ME reinit cycle. | | me_reinit_every_secs | `u64` | Periodic ME pool reinitialization interval (seconds). |
| me_hardswap_warmup_delay_min_ms | `u64` | `1000` | Must be `<= me_hardswap_warmup_delay_max_ms`. | Lower bound for hardswap warmup dial spacing. | | me_hardswap_warmup_delay_min_ms | `u64` | Minimum delay between hardswap warmup connects (ms). |
| me_hardswap_warmup_delay_max_ms | `u64` | `2000` | Must be `> 0`. | Upper bound for hardswap warmup dial spacing. | | me_hardswap_warmup_delay_max_ms | `u64` | Maximum delay between hardswap warmup connects (ms). |
| me_hardswap_warmup_extra_passes | `u8` | `3` | Must be within `[0, 10]`. | Additional warmup passes after the base pass in one hardswap cycle. | | me_hardswap_warmup_extra_passes | `u8` | Additional warmup passes per hardswap cycle. |
| me_hardswap_warmup_pass_backoff_base_ms | `u64` | `500` | Must be `> 0`. | Base backoff between extra hardswap warmup passes. | | me_hardswap_warmup_pass_backoff_base_ms | `u64` | Base backoff between hardswap warmup passes (ms). |
| me_config_stable_snapshots | `u8` | `2` | Must be `> 0`. | Number of identical ME config snapshots required before apply. | | me_config_stable_snapshots | `u8` | Number of identical config snapshots required before apply. |
| me_config_apply_cooldown_secs | `u64` | `300` | none | Cooldown between applied ME endpoint-map updates. | | me_config_apply_cooldown_secs | `u64` | Cooldown between applied ME map updates (seconds). |
| me_snapshot_require_http_2xx | `bool` | `true` | — | Requires 2xx HTTP responses for applying config snapshots. | | me_snapshot_require_http_2xx | `bool` | Requires 2xx HTTP responses for applying config snapshots. |
| me_snapshot_reject_empty_map | `bool` | `true` | — | Rejects empty config snapshots. | | me_snapshot_reject_empty_map | `bool` | Rejects empty config snapshots. |
| me_snapshot_min_proxy_for_lines | `u32` | `1` | Must be `> 0`. | Minimum parsed `proxy_for` rows required to accept snapshot. | | me_snapshot_min_proxy_for_lines | `u32` | Minimum parsed `proxy_for` rows required to accept snapshot. |
| proxy_secret_stable_snapshots | `u8` | `2` | Must be `> 0`. | Number of identical proxy-secret snapshots required before rotation. | | proxy_secret_stable_snapshots | `u8` | Number of identical secret snapshots required before runtime rotation. |
| proxy_secret_rotate_runtime | `bool` | `true` | none | Enables runtime proxy-secret rotation from updater snapshots. | | proxy_secret_rotate_runtime | `bool` | Enables runtime proxy-secret rotation from remote source. |
| me_secret_atomic_snapshot | `bool` | `true` | — | Keeps selector and secret bytes from the same snapshot atomically. | | me_secret_atomic_snapshot | `bool` | Keeps selector and secret bytes from the same snapshot atomically. |
| proxy_secret_len_max | `usize` | `256` | Must be within `[32, 4096]`. | Upper length limit for accepted proxy-secret bytes. | | proxy_secret_len_max | `usize` | Maximum allowed proxy-secret length (bytes). |
| me_pool_drain_ttl_secs | `u64` | `90` | none | Time window where stale writers remain fallback-eligible after map change. | | me_pool_drain_ttl_secs | `u64` | Drain TTL for stale ME writers after endpoint-map changes (seconds). |
| me_pool_drain_threshold | `u64` | `128` | — | Max draining stale writers before batch force-close (`0` disables threshold cleanup). | | me_pool_drain_threshold | `u64` | Max draining stale writers before batch force-close (`0` disables threshold cleanup). |
| me_pool_drain_soft_evict_enabled | `bool` | `true` | — | Enables gradual soft-eviction of stale writers during drain/reinit instead of immediate hard close. | | me_bind_stale_mode | `"never" \| "ttl" \| "always"` | Policy for new binds on stale draining writers. |
| me_pool_drain_soft_evict_grace_secs | `u64` | `30` | `0..=3600`. | Grace period before stale writers become soft-evict candidates. | | me_bind_stale_ttl_secs | `u64` | TTL for stale bind allowance when stale mode is `ttl`. |
| me_pool_drain_soft_evict_per_writer | `u8` | `1` | `1..=16`. | Maximum stale routes soft-evicted per writer in one eviction pass. | | me_pool_min_fresh_ratio | `f32` | Minimum desired-DC fresh coverage ratio before draining stale writers. |
| me_pool_drain_soft_evict_budget_per_core | `u16` | `8` | `1..=64`. | Per-core budget limiting aggregate soft-eviction work per pass. | | me_reinit_drain_timeout_secs | `u64` | Force-close timeout for stale writers after endpoint-map changes (`0` disables force-close). |
| me_pool_drain_soft_evict_cooldown_ms | `u64` | `5000` | Must be `> 0`. | Cooldown between consecutive soft-eviction passes (ms). | | proxy_secret_auto_reload_secs | `u64` | Deprecated legacy secret reload interval (fallback when `update_every` is not set). |
| me_bind_stale_mode | `"never" \| "ttl" \| "always"` | `"ttl"` | — | Policy for new binds on stale draining writers. | | proxy_config_auto_reload_secs | `u64` | Deprecated legacy config reload interval (fallback when `update_every` is not set). |
| me_bind_stale_ttl_secs | `u64` | `90` | — | TTL for stale bind allowance when stale mode is `ttl`. | | me_reinit_singleflight | `bool` | Serializes ME reinit cycles across trigger sources. |
| me_pool_min_fresh_ratio | `f32` | `0.8` | Must be within `[0.0, 1.0]`. | Minimum fresh desired-DC coverage ratio before stale writers are drained. | | me_reinit_trigger_channel | `usize` | Trigger queue capacity for reinit scheduler. |
| me_reinit_drain_timeout_secs | `u64` | `120` | `0` disables force-close; if `> 0` and `< me_pool_drain_ttl_secs`, runtime bumps it to TTL. | Force-close timeout for draining stale writers (`0` keeps indefinite draining). | | me_reinit_coalesce_window_ms | `u64` | Trigger coalescing window before starting reinit (ms). |
| proxy_secret_auto_reload_secs | `u64` | `3600` | Deprecated. Use `general.update_every`. | Deprecated legacy secret reload interval (fallback when `update_every` is not set). | | me_deterministic_writer_sort | `bool` | Enables deterministic candidate sort for writer binding path. |
| proxy_config_auto_reload_secs | `u64` | `3600` | Deprecated. Use `general.update_every`. | Deprecated legacy config reload interval (fallback when `update_every` is not set). | | me_writer_pick_mode | `"sorted_rr" \| "p2c"` | Writer selection mode for route bind path. |
| me_reinit_singleflight | `bool` | `true` | — | Serializes ME reinit cycles across trigger sources. | | me_writer_pick_sample_size | `u8` | Number of candidates sampled by picker in `p2c` mode. |
| me_reinit_trigger_channel | `usize` | `64` | Must be `> 0`. | Trigger queue capacity for reinit scheduler. | | ntp_check | `bool` | Enables NTP drift check at startup. |
| me_reinit_coalesce_window_ms | `u64` | `200` | — | Trigger coalescing window before starting reinit (ms). | | ntp_servers | `String[]` | NTP servers used for drift check. |
| me_deterministic_writer_sort | `bool` | `true` | — | Enables deterministic candidate sort for writer binding path. | | auto_degradation_enabled | `bool` | Enables automatic degradation from ME to direct DC. |
| me_writer_pick_mode | `"sorted_rr" \| "p2c"` | `"p2c"` | — | Writer selection mode for route bind path. | | degradation_min_unavailable_dc_groups | `u8` | Minimum unavailable ME DC groups required before degrading. |
| me_writer_pick_sample_size | `u8` | `3` | `2..=4`. | Number of candidates sampled by picker in `p2c` mode. |
| ntp_check | `bool` | `true` | — | Enables NTP drift check at startup. |
| ntp_servers | `String[]` | `["pool.ntp.org"]` | — | NTP servers used for drift check. |
| auto_degradation_enabled | `bool` | `true` | none | Reserved compatibility flag in current runtime revision. |
| degradation_min_unavailable_dc_groups | `u8` | `2` | none | Reserved compatibility threshold in current runtime revision. |
## [general.modes] ## [general.modes]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| classic | `bool` | `false` | — | Enables classic MTProxy mode. | | classic | `bool` | Enables classic MTProxy mode. |
| secure | `bool` | `false` | — | Enables secure mode. | | secure | `bool` | Enables secure mode. |
| tls | `bool` | `true` | — | Enables TLS mode. | | tls | `bool` | Enables TLS mode. |
## [general.links] ## [general.links]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| show | `"*" \| String[]` | `"*"` | — | Selects users whose tg:// links are shown at startup. | | show | `"*" \| String[]` | Selects users whose tg:// links are shown at startup. |
| public_host | `String \| null` | `null` | — | Public hostname/IP override for generated tg:// links. | | public_host | `String` | Public hostname/IP override for generated tg:// links. |
| public_port | `u16 \| null` | `null` | — | Public port override for generated tg:// links. | | public_port | `u16` | Public port override for generated tg:// links. |
## [general.telemetry] ## [general.telemetry]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| core_enabled | `bool` | `true` | — | Enables core hot-path telemetry counters. | | core_enabled | `bool` | Enables core hot-path telemetry counters. |
| user_enabled | `bool` | `true` | — | Enables per-user telemetry counters. | | user_enabled | `bool` | Enables per-user telemetry counters. |
| me_level | `"silent" \| "normal" \| "debug"` | `"normal"` | — | Middle-End telemetry verbosity level. | | me_level | `"silent" \| "normal" \| "debug"` | Middle-End telemetry verbosity level. |
## [network] ## [network]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| ipv4 | `bool` | `true` | — | Enables IPv4 networking. | | ipv4 | `bool` | Enables IPv4 networking. |
| ipv6 | `bool` | `false` | — | Enables/disables IPv6 when set | | ipv6 | `bool` | Enables/disables IPv6 (`null` = auto-detect availability). |
| prefer | `u8` | `4` | Must be `4` or `6`. | Preferred IP family for selection (`4` or `6`). | | prefer | `u8` | Preferred IP family for selection (`4` or `6`). |
| multipath | `bool` | `false` | — | Enables multipath behavior where supported. | | multipath | `bool` | Enables multipath behavior where supported. |
| stun_use | `bool` | `true` | none | Global STUN switch; when `false`, STUN probing path is disabled. | | stun_use | `bool` | Global switch for STUN probing. |
| stun_servers | `String[]` | Built-in STUN list (13 hosts) | Deduplicated; empty values are removed. | Primary STUN server list for NAT/public endpoint discovery. | | stun_servers | `String[]` | STUN server list for public IP detection. |
| stun_tcp_fallback | `bool` | `true` | none | Enables TCP fallback for STUN when UDP path is blocked. | | stun_tcp_fallback | `bool` | Enables TCP STUN fallback when UDP STUN is blocked. |
| http_ip_detect_urls | `String[]` | `["https://ifconfig.me/ip", "https://api.ipify.org"]` | none | HTTP fallback endpoints for public IP detection when STUN is unavailable. | | http_ip_detect_urls | `String[]` | HTTP endpoints used as fallback public IP detectors. |
| cache_public_ip_path | `String` | `"cache/public_ip.txt"` | — | File path for caching detected public IP. | | cache_public_ip_path | `String` | File path for caching detected public IP. |
| dns_overrides | `String[]` | `[]` | Must match `host:port:ip`; IPv6 must be bracketed. | Runtime DNS overrides in `host:port:ip` format. | | dns_overrides | `String[]` | Runtime DNS overrides in `host:port:ip` format. |
## [server] ## [server]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| port | `u16` | `443` | — | Main proxy listen port. | | port | `u16` | Main proxy listen port. |
| listen_addr_ipv4 | `String \| null` | `"0.0.0.0"` | — | IPv4 bind address for TCP listener. | | listen_addr_ipv4 | `String` | IPv4 bind address for TCP listener. |
| listen_addr_ipv6 | `String \| null` | `"::"` | — | IPv6 bind address for TCP listener. | | listen_addr_ipv6 | `String` | IPv6 bind address for TCP listener. |
| listen_unix_sock | `String \| null` | `null` | — | Unix socket path for listener. | | listen_unix_sock | `String` | Unix socket path for listener. |
| listen_unix_sock_perm | `String \| null` | `null` | — | Unix socket permissions in octal string (e.g., `"0666"`). | | listen_unix_sock_perm | `String` | Unix socket permissions in octal string (e.g., `"0666"`). |
| listen_tcp | `bool \| null` | `null` (auto) | — | Explicit TCP listener enable/disable override. | | listen_tcp | `bool` | Explicit TCP listener enable/disable override. |
| proxy_protocol | `bool` | `false` | — | Enables HAProxy PROXY protocol parsing on incoming client connections. | | proxy_protocol | `bool` | Enables HAProxy PROXY protocol parsing on incoming client connections. |
| proxy_protocol_header_timeout_ms | `u64` | `500` | Must be `> 0`. | Timeout for PROXY protocol header read/parse (ms). | | proxy_protocol_header_timeout_ms | `u64` | Timeout for PROXY protocol header read/parse (ms). |
| metrics_port | `u16 \| null` | `null` | — | Metrics endpoint port (enables metrics listener). | | metrics_port | `u16` | Metrics endpoint port (enables metrics listener). |
| metrics_listen | `String \| null` | `null` | — | Full metrics bind address (`IP:PORT`), overrides `metrics_port`. | | metrics_listen | `String` | Full metrics bind address (`IP:PORT`), overrides `metrics_port`. |
| metrics_whitelist | `IpNetwork[]` | `["127.0.0.1/32", "::1/128"]` | — | CIDR whitelist for metrics endpoint access. | | metrics_whitelist | `IpNetwork[]` | CIDR whitelist for metrics endpoint access. |
| max_connections | `u32` | `10000` | — | Max concurrent client connections (`0` = unlimited). | | max_connections | `u32` | Max concurrent client connections (`0` = unlimited). |
## [server.api] ## [server.api]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| enabled | `bool` | `true` | — | Enables control-plane REST API. | | enabled | `bool` | Enables control-plane REST API. |
| listen | `String` | `"0.0.0.0:9091"` | Must be valid `IP:PORT`. | API bind address in `IP:PORT` format. | | listen | `String` | API bind address in `IP:PORT` format. |
| whitelist | `IpNetwork[]` | `["127.0.0.0/8"]` | — | CIDR whitelist allowed to access API. | | whitelist | `IpNetwork[]` | CIDR whitelist allowed to access API. |
| auth_header | `String` | `""` | — | Exact expected `Authorization` header value (empty = disabled). | | auth_header | `String` | Exact expected `Authorization` header value (empty = disabled). |
| request_body_limit_bytes | `usize` | `65536` | Must be `> 0`. | Maximum accepted HTTP request body size. | | request_body_limit_bytes | `usize` | Maximum accepted HTTP request body size. |
| minimal_runtime_enabled | `bool` | `true` | — | Enables minimal runtime snapshots endpoint logic. | | minimal_runtime_enabled | `bool` | Enables minimal runtime snapshots endpoint logic. |
| minimal_runtime_cache_ttl_ms | `u64` | `1000` | `0..=60000`. | Cache TTL for minimal runtime snapshots (ms; `0` disables cache). | | minimal_runtime_cache_ttl_ms | `u64` | Cache TTL for minimal runtime snapshots (ms; `0` disables cache). |
| runtime_edge_enabled | `bool` | `false` | — | Enables runtime edge endpoints. | | runtime_edge_enabled | `bool` | Enables runtime edge endpoints. |
| runtime_edge_cache_ttl_ms | `u64` | `1000` | `0..=60000`. | Cache TTL for runtime edge aggregation payloads (ms). | | runtime_edge_cache_ttl_ms | `u64` | Cache TTL for runtime edge aggregation payloads (ms). |
| runtime_edge_top_n | `usize` | `10` | `1..=1000`. | Top-N size for edge connection leaderboard. | | runtime_edge_top_n | `usize` | Top-N size for edge connection leaderboard. |
| runtime_edge_events_capacity | `usize` | `256` | `16..=4096`. | Ring-buffer capacity for runtime edge events. | | runtime_edge_events_capacity | `usize` | Ring-buffer capacity for runtime edge events. |
| read_only | `bool` | `false` | — | Rejects mutating API endpoints when enabled. | | read_only | `bool` | Rejects mutating API endpoints when enabled. |
## [[server.listeners]] ## [[server.listeners]]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| ip | `IpAddr` | — | — | Listener bind IP. | | ip | `IpAddr` | Listener bind IP. |
| announce | `String \| null` | — | — | Public IP/domain announced in proxy links (priority over `announce_ip`). | | announce | `String` | Public IP/domain announced in proxy links (priority over `announce_ip`). |
| announce_ip | `IpAddr \| null` | — | — | Deprecated legacy announce IP (migrated to `announce` if needed). | | announce_ip | `IpAddr` | Deprecated legacy announce IP (migrated to `announce` if needed). |
| proxy_protocol | `bool \| null` | `null` | — | Per-listener override for PROXY protocol enable flag. | | proxy_protocol | `bool` | Per-listener override for PROXY protocol enable flag. |
| reuse_allow | `bool` | `false` | — | Enables `SO_REUSEPORT` for multi-instance bind sharing. | | reuse_allow | `bool` | Enables `SO_REUSEPORT` for multi-instance bind sharing. |
## [timeouts] ## [timeouts]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| client_handshake | `u64` | `30` | — | Client handshake timeout. | | client_handshake | `u64` | Client handshake timeout. |
| tg_connect | `u64` | `10` | — | Upstream Telegram connect timeout. | | tg_connect | `u64` | Upstream Telegram connect timeout. |
| client_keepalive | `u64` | `15` | — | Client keepalive timeout. | | client_keepalive | `u64` | Client keepalive timeout. |
| client_ack | `u64` | `90` | — | Client ACK timeout. | | client_ack | `u64` | Client ACK timeout. |
| me_one_retry | `u8` | `12` | none | Fast reconnect attempts budget for single-endpoint DC scenarios. | | me_one_retry | `u8` | Quick ME reconnect attempts for single-address DC. |
| me_one_timeout_ms | `u64` | `1200` | none | Timeout in milliseconds for each quick single-endpoint reconnect attempt. | | me_one_timeout_ms | `u64` | Timeout per quick attempt for single-address DC (ms). |
## [censorship] ## [censorship]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| tls_domain | `String` | `"petrovich.ru"` | — | Primary TLS domain used in fake TLS handshake profile. | | tls_domain | `String` | Primary TLS domain used in fake TLS handshake profile. |
| tls_domains | `String[]` | `[]` | — | Additional TLS domains for generating multiple links. | | tls_domains | `String[]` | Additional TLS domains for generating multiple links. |
| mask | `bool` | `true` | — | Enables masking/fronting relay mode. | | mask | `bool` | Enables masking/fronting relay mode. |
| mask_host | `String \| null` | `null` | — | Upstream mask host for TLS fronting relay. | | mask_host | `String` | Upstream mask host for TLS fronting relay. |
| mask_port | `u16` | `443` | — | Upstream mask port for TLS fronting relay. | | mask_port | `u16` | Upstream mask port for TLS fronting relay. |
| mask_unix_sock | `String \| null` | `null` | — | Unix socket path for mask backend instead of TCP host/port. | | mask_unix_sock | `String` | Unix socket path for mask backend instead of TCP host/port. |
| fake_cert_len | `usize` | `2048` | — | Length of synthetic certificate payload when emulation data is unavailable. | | fake_cert_len | `usize` | Length of synthetic certificate payload when emulation data is unavailable. |
| tls_emulation | `bool` | `true` | — | Enables certificate/TLS behavior emulation from cached real fronts. | | tls_emulation | `bool` | Enables certificate/TLS behavior emulation from cached real fronts. |
| tls_front_dir | `String` | `"tlsfront"` | — | Directory path for TLS front cache storage. | | tls_front_dir | `String` | Directory path for TLS front cache storage. |
| server_hello_delay_min_ms | `u64` | `0` | — | Minimum server_hello delay for anti-fingerprint behavior (ms). | | server_hello_delay_min_ms | `u64` | Minimum server_hello delay for anti-fingerprint behavior (ms). |
| server_hello_delay_max_ms | `u64` | `0` | — | Maximum server_hello delay for anti-fingerprint behavior (ms). | | server_hello_delay_max_ms | `u64` | Maximum server_hello delay for anti-fingerprint behavior (ms). |
| tls_new_session_tickets | `u8` | `0` | — | Number of `NewSessionTicket` messages to emit after handshake. | | tls_new_session_tickets | `u8` | Number of `NewSessionTicket` messages to emit after handshake. |
| tls_full_cert_ttl_secs | `u64` | `90` | — | TTL for sending full cert payload per (domain, client IP) tuple. | | tls_full_cert_ttl_secs | `u64` | TTL for sending full cert payload per (domain, client IP) tuple. |
| alpn_enforce | `bool` | `true` | — | Enforces ALPN echo behavior based on client preference. | | alpn_enforce | `bool` | Enforces ALPN echo behavior based on client preference. |
| mask_proxy_protocol | `u8` | `0` | — | PROXY protocol mode for mask backend (`0` disabled, `1` v1, `2` v2). | | mask_proxy_protocol | `u8` | PROXY protocol mode for mask backend (`0` disabled, `1` v1, `2` v2). |
## [access] ## [access]
| Parameter | Type | Default | Constraints / validation | TOML shape example | Description | | Parameter | Type | Description |
|---|---|---|---|---|---| |---|---|---|
| users | `Map<String, String>` | `{"default": "000…000"}` | Secret must be 32 hex characters. | `[access.users]`<br>`user = "32-hex secret"`<br>`user2 = "32-hex secret"` | User credentials map used for client authentication. | | users | `Map<String, String>` | Username -> 32-hex secret mapping. |
| user_ad_tags | `Map<String, String>` | `{}` | Every value must be exactly 32 hex characters. | `[access.user_ad_tags]`<br>`user = "32-hex ad_tag"` | Per-user ad tags used as override over `general.ad_tag`. | | user_ad_tags | `Map<String, String>` | Per-user ad tags (32 hex chars). |
| user_max_tcp_conns | `Map<String, usize>` | `{}` | — | `[access.user_max_tcp_conns]`<br>`user = 500` | Per-user maximum concurrent TCP connections. | | user_max_tcp_conns | `Map<String, usize>` | Per-user maximum concurrent TCP connections. |
| user_expirations | `Map<String, DateTime<Utc>>` | `{}` | Timestamp must be valid RFC3339/ISO-8601 datetime. | `[access.user_expirations]`<br>`user = "2026-12-31T23:59:59Z"` | Per-user account expiration timestamps. | | user_expirations | `Map<String, DateTime<Utc>>` | Per-user account expiration timestamps. |
| user_data_quota | `Map<String, u64>` | `{}` | — | `[access.user_data_quota]`<br>`user = 1073741824` | Per-user traffic quota in bytes. | | user_data_quota | `Map<String, u64>` | Per-user data quota limits. |
| user_max_unique_ips | `Map<String, usize>` | `{}` | — | `[access.user_max_unique_ips]`<br>`user = 16` | Per-user unique source IP limits. | | user_max_unique_ips | `Map<String, usize>` | Per-user unique source IP limits. |
| user_max_unique_ips_global_each | `usize` | `0` | — | `user_max_unique_ips_global_each = 0` | Global fallback used when `[access.user_max_unique_ips]` has no per-user override. | | user_max_unique_ips_global_each | `usize` | Global fallback per-user unique IP limit when no per-user override exists. |
| user_max_unique_ips_mode | `"active_window" \| "time_window" \| "combined"` | `"active_window"` | — | `user_max_unique_ips_mode = "active_window"` | Unique source IP limit accounting mode. | | user_max_unique_ips_mode | `"active_window" \| "time_window" \| "combined"` | Unique source IP limit accounting mode. |
| user_max_unique_ips_window_secs | `u64` | `30` | Must be `> 0`. | `user_max_unique_ips_window_secs = 30` | Window size (seconds) used by unique-IP accounting modes that use time windows. | | user_max_unique_ips_window_secs | `u64` | Recent-window size for unique IP accounting (seconds). |
| replay_check_len | `usize` | `65536` | — | `replay_check_len = 65536` | Replay-protection storage length. | | replay_check_len | `usize` | Replay check storage length. |
| replay_window_secs | `u64` | `1800` | — | `replay_window_secs = 1800` | Replay-protection window in seconds. | | replay_window_secs | `u64` | Replay protection time window in seconds. |
| ignore_time_skew | `bool` | `false` | — | `ignore_time_skew = false` | Disables client/server timestamp skew checks in replay validation when enabled. | | ignore_time_skew | `bool` | Ignores client/server timestamp skew in replay validation. |
## [[upstreams]] ## [[upstreams]]
| Parameter | Type | Default | Constraints / validation | Description | | Parameter | Type | Description |
|---|---|---|---|---| |---|---|---|
| type | `"direct" \| "socks4" \| "socks5"` | — | Required field. | Upstream transport type selector. | | type | `"direct" \| "socks4" \| "socks5"` | Upstream transport type selector. |
| weight | `u16` | `1` | none | Base weight used by weighted-random upstream selection. | | weight | `u16` | Weighted selection coefficient for this upstream. |
| enabled | `bool` | `true` | none | Disabled entries are excluded from upstream selection at runtime. | | enabled | `bool` | Enables/disables this upstream entry. |
| scopes | `String` | `""` | none | Comma-separated scope tags used for request-level upstream filtering. | | scopes | `String` | Comma-separated scope tags for routing. |
| interface | `String \| null` | `null` | Optional; type-specific runtime rules apply. | Optional outbound interface/local bind hint (supported with type-specific rules). | | interface | `String` | Optional outgoing interface name (`direct`, `socks4`, `socks5`). |
| bind_addresses | `String[] \| null` | `null` | Applies to `type = "direct"`. | Optional explicit local source bind addresses for `type = "direct"`. | | bind_addresses | `String[]` | Optional source bind addresses for `direct` upstream. |
| address | `String` | — | Required for `type = "socks4"` and `type = "socks5"`. | SOCKS server endpoint (`host:port` or `ip:port`) for SOCKS upstream types. | | address | `String` | Upstream proxy address (`host:port`) for SOCKS upstreams. |
| user_id | `String \| null` | `null` | Only for `type = "socks4"`. | SOCKS4 CONNECT user ID (`type = "socks4"` only). | | user_id | `String` | SOCKS4 user ID (only for `type = "socks4"`). |
| username | `String \| null` | `null` | Only for `type = "socks5"`. | SOCKS5 username (`type = "socks5"` only). | | username | `String` | SOCKS5 username (only for `type = "socks5"`). |
| password | `String \| null` | `null` | Only for `type = "socks5"`. | SOCKS5 password (`type = "socks5"` only). | | password | `String` | SOCKS5 password (only for `type = "socks5"`). |

View File

@ -1,525 +1,115 @@
#!/bin/sh #!/bin/sh
set -eu set -eu
# --- Global Configurations ---
REPO="${REPO:-telemt/telemt}" REPO="${REPO:-telemt/telemt}"
BIN_NAME="${BIN_NAME:-telemt}" BIN_NAME="${BIN_NAME:-telemt}"
INSTALL_DIR="${INSTALL_DIR:-/bin}" VERSION="${1:-${VERSION:-latest}}"
CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}" INSTALL_DIR="${INSTALL_DIR:-/usr/local/bin}"
CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}"
WORK_DIR="${WORK_DIR:-/opt/telemt}"
SERVICE_NAME="telemt"
TEMP_DIR=""
SUDO=""
# --- Argument Parsing --- say() {
ACTION="install" printf '%s\n' "$*"
TARGET_VERSION="${VERSION:-latest}" }
while [ $# -gt 0 ]; do die() {
case "$1" in printf 'Error: %s\n' "$*" >&2
-h|--help)
ACTION="help"
shift
;;
uninstall|--uninstall)
[ "$ACTION" != "purge" ] && ACTION="uninstall"
shift
;;
--purge)
ACTION="purge"
shift
;;
install|--install)
ACTION="install"
shift
;;
-*)
printf '[ERROR] Unknown option: %s\n' "$1" >&2
exit 1 exit 1
;; }
*)
if [ "$ACTION" = "install" ]; then need_cmd() {
TARGET_VERSION="$1" command -v "$1" >/dev/null 2>&1 || die "required command not found: $1"
fi }
shift
;; detect_os() {
os="$(uname -s)"
case "$os" in
Linux) printf 'linux\n' ;;
OpenBSD) printf 'openbsd\n' ;;
*) printf '%s\n' "$os" ;;
esac esac
done
# --- Core Functions ---
say() { printf '[INFO] %s\n' "$*"; }
die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; }
cleanup() {
if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then
rm -rf -- "$TEMP_DIR"
fi
}
trap cleanup EXIT INT TERM
show_help() {
say "Usage: $0 [version | install | uninstall | --purge | --help]"
say " version Install specific version (e.g. 1.0.0, default: latest)"
say " uninstall Remove the binary and service (keeps config)"
say " --purge Remove everything including configuration"
exit 0
}
user_exists() {
if command -v getent >/dev/null 2>&1; then
getent passwd "$1" >/dev/null 2>&1
else
grep -q "^${1}:" /etc/passwd 2>/dev/null
fi
}
group_exists() {
if command -v getent >/dev/null 2>&1; then
getent group "$1" >/dev/null 2>&1
else
grep -q "^${1}:" /etc/group 2>/dev/null
fi
}
verify_common() {
[ -z "$BIN_NAME" ] && die "BIN_NAME cannot be empty."
[ -z "$INSTALL_DIR" ] && die "INSTALL_DIR cannot be empty."
[ -z "$CONFIG_DIR" ] && die "CONFIG_DIR cannot be empty."
if [ "$(id -u)" -eq 0 ]; then
SUDO=""
else
if ! command -v sudo >/dev/null 2>&1; then
die "This script requires root or sudo. Neither found."
fi
SUDO="sudo"
say "sudo is available. Caching credentials..."
if ! sudo -v; then
die "Failed to cache sudo credentials"
fi
fi
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}" in
*[!a-zA-Z0-9_./-]*)
die "Invalid characters in path variables. Only alphanumeric, _, ., -, and / are allowed."
;;
esac
case "$BIN_NAME" in
*[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME: $BIN_NAME" ;;
esac
for path in "$CONFIG_DIR" "$WORK_DIR"; do
check_path="$path"
while [ "$check_path" != "/" ] && [ "${check_path%"/"}" != "$check_path" ]; do
check_path="${check_path%"/"}"
done
[ -z "$check_path" ] && check_path="/"
case "$check_path" in
/|/bin|/sbin|/usr|/usr/bin|/usr/local|/etc|/opt|/var|/home|/root|/tmp)
die "Safety check failed: '$path' is a critical system directory."
;;
esac
done
for cmd in uname grep find rm chown chmod mv head mktemp; do
command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd"
done
}
verify_install_deps() {
if ! command -v curl >/dev/null 2>&1 && ! command -v wget >/dev/null 2>&1; then
die "Neither curl nor wget is installed."
fi
command -v tar >/dev/null 2>&1 || die "Required command not found: tar"
command -v gzip >/dev/null 2>&1 || die "Required command not found: gzip"
command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install"
if ! command -v setcap >/dev/null 2>&1; then
say "setcap is missing. Installing required capability tools..."
if command -v apk >/dev/null 2>&1; then
$SUDO apk add --no-cache libcap || die "Failed to install libcap"
elif command -v apt-get >/dev/null 2>&1; then
$SUDO apt-get update -qq && $SUDO apt-get install -y -qq libcap2-bin || die "Failed to install libcap2-bin"
elif command -v dnf >/dev/null 2>&1 || command -v yum >/dev/null 2>&1; then
$SUDO ${YUM_CMD:-yum} install -y -q libcap || die "Failed to install libcap"
else
die "Cannot install 'setcap'. Package manager not found. Please install libcap manually."
fi
fi
} }
detect_arch() { detect_arch() {
sys_arch="$(uname -m)" arch="$(uname -m)"
case "$sys_arch" in case "$arch" in
x86_64|amd64) echo "x86_64" ;; x86_64|amd64) printf 'x86_64\n' ;;
aarch64|arm64) echo "aarch64" ;; aarch64|arm64) printf 'aarch64\n' ;;
*) die "Unsupported architecture: $sys_arch" ;; *) die "unsupported architecture: $arch" ;;
esac esac
} }
detect_libc() { detect_libc() {
if command -v ldd >/dev/null 2>&1 && ldd --version 2>&1 | grep -qi musl; then case "$(ldd --version 2>&1 || true)" in
echo "musl"; return 0 *musl*) printf 'musl\n' ;;
fi *) printf 'gnu\n' ;;
esac
if grep -q '^ID=alpine' /etc/os-release 2>/dev/null || grep -q '^ID="alpine"' /etc/os-release 2>/dev/null; then
echo "musl"; return 0
fi
for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do
if [ -e "$f" ]; then
echo "musl"; return 0
fi
done
echo "gnu"
} }
fetch_file() { fetch_to_stdout() {
fetch_url="$1" url="$1"
fetch_out="$2"
if command -v curl >/dev/null 2>&1; then if command -v curl >/dev/null 2>&1; then
curl -fsSL "$fetch_url" -o "$fetch_out" || return 1 curl -fsSL "$url"
elif command -v wget >/dev/null 2>&1; then elif command -v wget >/dev/null 2>&1; then
wget -qO "$fetch_out" "$fetch_url" || return 1 wget -qO- "$url"
else else
die "curl or wget required" die "neither curl nor wget is installed"
fi
}
ensure_user_group() {
nologin_bin="/bin/false"
cmd_nologin="$(command -v nologin 2>/dev/null || true)"
if [ -n "$cmd_nologin" ] && [ -x "$cmd_nologin" ]; then
nologin_bin="$cmd_nologin"
else
for bin in /sbin/nologin /usr/sbin/nologin; do
if [ -x "$bin" ]; then
nologin_bin="$bin"
break
fi
done
fi
if ! group_exists telemt; then
if command -v groupadd >/dev/null 2>&1; then
$SUDO groupadd -r telemt || die "Failed to create group via groupadd"
elif command -v addgroup >/dev/null 2>&1; then
$SUDO addgroup -S telemt || die "Failed to create group via addgroup"
else
die "Cannot create group: neither groupadd nor addgroup found"
fi
fi
if ! user_exists telemt; then
if command -v useradd >/dev/null 2>&1; then
$SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt || die "Failed to create user via useradd"
elif command -v adduser >/dev/null 2>&1; then
$SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt || die "Failed to create user via adduser"
else
die "Cannot create user: neither useradd nor adduser found"
fi
fi
}
setup_dirs() {
say "Setting up directories..."
$SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" || die "Failed to create directories"
$SUDO chown telemt:telemt "$WORK_DIR" || die "Failed to set owner on WORK_DIR"
$SUDO chmod 750 "$WORK_DIR" || die "Failed to set permissions on WORK_DIR"
}
stop_service() {
say "Stopping service if running..."
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then
$SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true
elif command -v rc-service >/dev/null 2>&1; then
$SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true
fi fi
} }
install_binary() { install_binary() {
bin_src="$1" src="$1"
bin_dst="$2" dst="$2"
$SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" if [ -w "$INSTALL_DIR" ] || { [ ! -e "$INSTALL_DIR" ] && [ -w "$(dirname "$INSTALL_DIR")" ]; }; then
if command -v install >/dev/null 2>&1; then mkdir -p "$INSTALL_DIR"
$SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" install -m 0755 "$src" "$dst"
elif command -v sudo >/dev/null 2>&1; then
sudo mkdir -p "$INSTALL_DIR"
sudo install -m 0755 "$src" "$dst"
else else
$SUDO rm -f "$bin_dst" die "cannot write to $INSTALL_DIR and sudo is not available"
$SUDO cp "$bin_src" "$bin_dst" || die "Failed to copy binary"
$SUDO chmod 0755 "$bin_dst" || die "Failed to set permissions"
fi
if [ ! -x "$bin_dst" ]; then
die "Failed to install binary or it is not executable: $bin_dst"
fi
say "Granting network bind capabilities to bind port 443..."
if ! $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null; then
say "[WARNING] Failed to apply setcap. The service will NOT be able to open port 443!"
say "[WARNING] This usually happens inside unprivileged Docker/LXC containers."
fi fi
} }
generate_secret() { need_cmd uname
if command -v openssl >/dev/null 2>&1; then need_cmd tar
secret="$(openssl rand -hex 16 2>/dev/null)" && [ -n "$secret" ] && { echo "$secret"; return 0; } need_cmd mktemp
fi need_cmd grep
if command -v xxd >/dev/null 2>&1; then need_cmd install
secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | xxd -p | tr -d '\n')" && [ -n "$secret" ] && { echo "$secret"; return 0; }
fi
secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | od -An -tx1 | tr -d ' \n')" && [ -n "$secret" ] && { echo "$secret"; return 0; }
return 1
}
generate_config_content() {
cat <<EOF
[general]
use_middle_proxy = false
[general.modes]
classic = false
secure = false
tls = true
[server]
port = 443
[server.api]
enabled = true
listen = "127.0.0.1:9091"
whitelist = ["127.0.0.1/32"]
[censorship]
tls_domain = "petrovich.ru"
[access.users]
hello = "$1"
EOF
}
install_config() {
config_exists=0
if [ -n "$SUDO" ]; then
$SUDO sh -c "[ -f '$CONFIG_FILE' ]" 2>/dev/null && config_exists=1 || true
else
[ -f "$CONFIG_FILE" ] && config_exists=1 || true
fi
if [ "$config_exists" -eq 1 ]; then
say "Config already exists, skipping generation."
return 0
fi
toml_secret="$(generate_secret)" || die "Failed to generate secret"
say "Creating config at $CONFIG_FILE..."
tmp_conf="$(mktemp "${TEMP_DIR:-/tmp}/telemt_conf.XXXXXX")" || die "Failed to create temp config"
generate_config_content "$toml_secret" > "$tmp_conf" || die "Failed to write temp config"
$SUDO mv "$tmp_conf" "$CONFIG_FILE" || die "Failed to install config file"
$SUDO chown root:telemt "$CONFIG_FILE" || die "Failed to set owner"
$SUDO chmod 640 "$CONFIG_FILE" || die "Failed to set config permissions"
say "Secret for user 'hello': $toml_secret"
}
generate_systemd_content() {
cat <<EOF
[Unit]
Description=Telemt Proxy Service
After=network-online.target
[Service]
Type=simple
User=telemt
Group=telemt
WorkingDirectory=$WORK_DIR
ExecStart=${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}
Restart=on-failure
LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
[Install]
WantedBy=multi-user.target
EOF
}
generate_openrc_content() {
cat <<EOF
#!/sbin/openrc-run
name="$SERVICE_NAME"
description="Telemt Proxy Service"
command="${INSTALL_DIR}/${BIN_NAME}"
command_args="${CONFIG_FILE}"
command_background=true
command_user="telemt:telemt"
pidfile="/run/\${RC_SVCNAME}.pid"
directory="${WORK_DIR}"
rc_ulimit="-n 65536"
depend() { need net; use logger; }
EOF
}
install_service() {
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then
say "Installing systemd service..."
tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.service.XXXXXX")" || die "Failed to create temp service"
generate_systemd_content > "$tmp_svc" || die "Failed to generate service content"
$SUDO mv "$tmp_svc" "/etc/systemd/system/${SERVICE_NAME}.service" || die "Failed to move service file"
$SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO systemctl daemon-reload || die "Failed to reload systemd"
$SUDO systemctl enable "$SERVICE_NAME" || die "Failed to enable service"
$SUDO systemctl start "$SERVICE_NAME" || die "Failed to start service"
elif command -v rc-update >/dev/null 2>&1; then
say "Installing OpenRC service..."
tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.init.XXXXXX")" || die "Failed to create temp file"
generate_openrc_content > "$tmp_svc" || die "Failed to generate init content"
$SUDO mv "$tmp_svc" "/etc/init.d/${SERVICE_NAME}" || die "Failed to move service file"
$SUDO chown root:root "/etc/init.d/${SERVICE_NAME}"
$SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}"
$SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || die "Failed to register service"
$SUDO rc-service "$SERVICE_NAME" start 2>/dev/null || die "Failed to start OpenRC service"
else
say "No service manager found. You can start it manually with:"
if [ -n "$SUDO" ]; then
say " sudo -u telemt ${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}"
else
say " su -s /bin/sh telemt -c '${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}'"
fi
fi
}
kill_user_procs() {
say "Ensuring $BIN_NAME processes are killed..."
if pkill_cmd="$(command -v pkill 2>/dev/null)"; then
$SUDO "$pkill_cmd" -u telemt "$BIN_NAME" 2>/dev/null || true
sleep 1
$SUDO "$pkill_cmd" -9 -u telemt "$BIN_NAME" 2>/dev/null || true
elif killall_cmd="$(command -v killall 2>/dev/null)"; then
$SUDO "$killall_cmd" "$BIN_NAME" 2>/dev/null || true
sleep 1
$SUDO "$killall_cmd" -9 "$BIN_NAME" 2>/dev/null || true
fi
}
uninstall() {
purge_data=0
[ "$ACTION" = "purge" ] && purge_data=1
say "Uninstalling $BIN_NAME..."
stop_service
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then
$SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true
$SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO systemctl daemon-reload || true
elif command -v rc-update >/dev/null 2>&1; then
$SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true
$SUDO rm -f "/etc/init.d/${SERVICE_NAME}"
fi
kill_user_procs
$SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}"
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
if [ "$purge_data" -eq 1 ]; then
say "Purging configuration and data..."
$SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR"
else
say "Note: Configuration in $CONFIG_DIR was kept. Run with '--purge' to remove it."
fi
say "Uninstallation complete."
exit 0
}
# ============================================================================
# Main Entry Point
# ============================================================================
case "$ACTION" in
help)
show_help
;;
uninstall|purge)
verify_common
uninstall
;;
install)
say "Starting installation..."
verify_common
verify_install_deps
ARCH="$(detect_arch)" ARCH="$(detect_arch)"
LIBC="$(detect_libc)" OS="$(detect_os)"
say "Detected system: $ARCH-linux-$LIBC"
FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz" if [ "$OS" != "linux" ]; then
FILE_NAME="$(printf '%s' "$FILE_NAME" | tr -d ' \t\n\r')" case "$OS" in
openbsd)
if [ "$TARGET_VERSION" = "latest" ]; then die "install.sh installs only Linux release artifacts. On OpenBSD, build from source (see docs/OPENBSD.en.md)."
DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}" ;;
else *)
DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}" die "unsupported operating system for install.sh: $OS"
fi
TEMP_DIR="$(mktemp -d)" || die "Failed to create temp directory"
if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then
die "Temp directory creation failed"
fi
say "Downloading from $DL_URL..."
fetch_file "$DL_URL" "${TEMP_DIR}/archive.tar.gz" || die "Download failed (check version or network)"
gzip -dc "${TEMP_DIR}/archive.tar.gz" | tar -xf - -C "$TEMP_DIR" || die "Extraction failed"
EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1)"
[ -z "$EXTRACTED_BIN" ] && die "Binary '$BIN_NAME' not found in archive"
ensure_user_group
setup_dirs
stop_service
say "Installing binary..."
install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}"
install_config
install_service
say ""
say "============================================="
say "Installation complete!"
say "============================================="
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then
say "To check the logs, run:"
say " journalctl -u $SERVICE_NAME -f"
say ""
fi
say "To get user connection links, run:"
if command -v jq >/dev/null 2>&1; then
say " curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | \"User: \\(.username)\\n\\(.links.tls[0] // empty)\"'"
else
say " curl -s http://127.0.0.1:9091/v1/users"
say " (Note: Install 'jq' package to see the links nicely formatted)"
fi
;; ;;
esac esac
fi
LIBC="$(detect_libc)"
case "$VERSION" in
latest)
URL="https://github.com/$REPO/releases/latest/download/${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
;;
*)
URL="https://github.com/$REPO/releases/download/${VERSION}/${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
;;
esac
TMPDIR="$(mktemp -d)"
trap 'rm -rf "$TMPDIR"' EXIT INT TERM
say "Installing $BIN_NAME ($VERSION) for $ARCH-linux-$LIBC..."
fetch_to_stdout "$URL" | tar -xzf - -C "$TMPDIR"
[ -f "$TMPDIR/$BIN_NAME" ] || die "archive did not contain $BIN_NAME"
install_binary "$TMPDIR/$BIN_NAME" "$INSTALL_DIR/$BIN_NAME"
say "Installed: $INSTALL_DIR/$BIN_NAME"
"$INSTALL_DIR/$BIN_NAME" --version 2>/dev/null || true

View File

@ -36,16 +36,12 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000; const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000; const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
@ -160,10 +156,6 @@ pub(crate) fn default_server_max_connections() -> u32 {
10_000 10_000
} }
pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
}
pub(crate) fn default_prefer_4() -> u8 { pub(crate) fn default_prefer_4() -> u8 {
4 4
} }
@ -388,18 +380,6 @@ pub(crate) fn default_me_warn_rate_limit_ms() -> u64 {
DEFAULT_ME_WARN_RATE_LIMIT_MS DEFAULT_ME_WARN_RATE_LIMIT_MS
} }
pub(crate) fn default_me_route_hybrid_max_wait_ms() -> u64 {
DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS
}
pub(crate) fn default_me_route_blocking_send_timeout_ms() -> u64 {
DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS
}
pub(crate) fn default_me_c2me_send_timeout_ms() -> u64 {
DEFAULT_ME_C2ME_SEND_TIMEOUT_MS
}
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
} }

View File

@ -39,7 +39,6 @@ use super::load::{LoadedConfig, ProxyConfig};
const HOT_RELOAD_STABLE_SNAPSHOTS: u8 = 2; const HOT_RELOAD_STABLE_SNAPSHOTS: u8 = 2;
const HOT_RELOAD_DEBOUNCE: Duration = Duration::from_millis(50); const HOT_RELOAD_DEBOUNCE: Duration = Duration::from_millis(50);
const HOT_RELOAD_STABLE_RECHECK: Duration = Duration::from_millis(75);
// ── Hot fields ──────────────────────────────────────────────────────────────── // ── Hot fields ────────────────────────────────────────────────────────────────
@ -380,14 +379,6 @@ impl ReloadState {
self.applied_snapshot_hash = Some(hash); self.applied_snapshot_hash = Some(hash);
self.reset_candidate(); self.reset_candidate();
} }
fn pending_candidate(&self) -> Option<(u64, u8)> {
let hash = self.candidate_snapshot_hash?;
if self.candidate_hits < HOT_RELOAD_STABLE_SNAPSHOTS {
return Some((hash, self.candidate_hits));
}
None
}
} }
fn normalize_watch_path(path: &Path) -> PathBuf { fn normalize_watch_path(path: &Path) -> PathBuf {
@ -612,8 +603,6 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|| old.server.listen_tcp != new.server.listen_tcp || old.server.listen_tcp != new.server.listen_tcp
|| old.server.listen_unix_sock != new.server.listen_unix_sock || old.server.listen_unix_sock != new.server.listen_unix_sock
|| old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm || old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm
|| old.server.max_connections != new.server.max_connections
|| old.server.accept_permit_timeout_ms != new.server.accept_permit_timeout_ms
{ {
warned = true; warned = true;
warn!("config reload: server listener settings changed; restart required"); warn!("config reload: server listener settings changed; restart required");
@ -673,9 +662,6 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
} }
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|| old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms
|| old.general.me_route_blocking_send_timeout_ms
!= new.general.me_route_blocking_send_timeout_ms
|| old.general.me_route_inline_recovery_attempts || old.general.me_route_inline_recovery_attempts
!= new.general.me_route_inline_recovery_attempts != new.general.me_route_inline_recovery_attempts
|| old.general.me_route_inline_recovery_wait_ms || old.general.me_route_inline_recovery_wait_ms
@ -684,10 +670,6 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true; warned = true;
warn!("config reload: general.me_route_no_writer_* changed; restart required"); warn!("config reload: general.me_route_no_writer_* changed; restart required");
} }
if old.general.me_c2me_send_timeout_ms != new.general.me_c2me_send_timeout_ms {
warned = true;
warn!("config reload: general.me_c2me_send_timeout_ms changed; restart required");
}
if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
|| old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled || old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
{ {
@ -1271,73 +1253,6 @@ fn reload_config(
Some(next_manifest) Some(next_manifest)
} }
async fn reload_with_internal_stable_rechecks(
config_path: &PathBuf,
config_tx: &watch::Sender<Arc<ProxyConfig>>,
log_tx: &watch::Sender<LogLevel>,
detected_ip_v4: Option<IpAddr>,
detected_ip_v6: Option<IpAddr>,
reload_state: &mut ReloadState,
) -> Option<WatchManifest> {
let mut next_manifest = reload_config(
config_path,
config_tx,
log_tx,
detected_ip_v4,
detected_ip_v6,
reload_state,
);
let mut rechecks_left = HOT_RELOAD_STABLE_SNAPSHOTS.saturating_sub(1);
while rechecks_left > 0 {
let Some((snapshot_hash, candidate_hits)) = reload_state.pending_candidate() else {
break;
};
info!(
snapshot_hash,
candidate_hits,
required_hits = HOT_RELOAD_STABLE_SNAPSHOTS,
rechecks_left,
recheck_delay_ms = HOT_RELOAD_STABLE_RECHECK.as_millis(),
"config reload: scheduling internal stable recheck"
);
tokio::time::sleep(HOT_RELOAD_STABLE_RECHECK).await;
let recheck_manifest = reload_config(
config_path,
config_tx,
log_tx,
detected_ip_v4,
detected_ip_v6,
reload_state,
);
if recheck_manifest.is_some() {
next_manifest = recheck_manifest;
}
if reload_state.is_applied(snapshot_hash) {
info!(
snapshot_hash,
"config reload: applied after internal stable recheck"
);
break;
}
if reload_state.pending_candidate().is_none() {
info!(
snapshot_hash,
"config reload: internal stable recheck aborted"
);
break;
}
rechecks_left = rechecks_left.saturating_sub(1);
}
next_manifest
}
// ── Public API ──────────────────────────────────────────────────────────────── // ── Public API ────────────────────────────────────────────────────────────────
/// Spawn the hot-reload watcher task. /// Spawn the hot-reload watcher task.
@ -1461,16 +1376,14 @@ pub fn spawn_config_watcher(
tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await; tokio::time::sleep(HOT_RELOAD_DEBOUNCE).await;
while notify_rx.try_recv().is_ok() {} while notify_rx.try_recv().is_ok() {}
if let Some(next_manifest) = reload_with_internal_stable_rechecks( if let Some(next_manifest) = reload_config(
&config_path, &config_path,
&config_tx, &config_tx,
&log_tx, &log_tx,
detected_ip_v4, detected_ip_v4,
detected_ip_v6, detected_ip_v6,
&mut reload_state, &mut reload_state,
) ) {
.await
{
apply_watch_manifest( apply_watch_manifest(
inotify_watcher.as_mut(), inotify_watcher.as_mut(),
poll_watcher.as_mut(), poll_watcher.as_mut(),
@ -1627,35 +1540,6 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[tokio::test]
async fn reload_cycle_applies_after_single_external_event() {
let initial_tag = "10101010101010101010101010101010";
let final_tag = "20202020202020202020202020202020";
let path = temp_config_path("telemt_hot_reload_single_event");
write_reload_config(&path, Some(initial_tag), None);
let initial_cfg = Arc::new(ProxyConfig::load(&path).unwrap());
let initial_hash = ProxyConfig::load_with_metadata(&path).unwrap().rendered_hash;
let (config_tx, _config_rx) = watch::channel(initial_cfg.clone());
let (log_tx, _log_rx) = watch::channel(initial_cfg.general.log_level.clone());
let mut reload_state = ReloadState::new(Some(initial_hash));
write_reload_config(&path, Some(final_tag), None);
reload_with_internal_stable_rechecks(
&path,
&config_tx,
&log_tx,
None,
None,
&mut reload_state,
)
.await
.unwrap();
assert_eq!(config_tx.borrow().general.ad_tag.as_deref(), Some(final_tag));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn reload_keeps_hot_apply_when_non_hot_fields_change() { fn reload_keeps_hot_apply_when_non_hot_fields_change() {
let initial_tag = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; let initial_tag = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";

View File

@ -378,12 +378,6 @@ impl ProxyConfig {
)); ));
} }
if config.general.me_c2me_send_timeout_ms > 60_000 {
return Err(ProxyError::Config(
"general.me_c2me_send_timeout_ms must be within [0, 60000]".to_string(),
));
}
if config.general.me_reader_route_data_wait_ms > 20 { if config.general.me_reader_route_data_wait_ms > 20 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(), "general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
@ -646,11 +640,6 @@ impl ProxyConfig {
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(), "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
)); ));
} }
if config.general.me_route_backpressure_base_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms if config.general.me_route_backpressure_high_timeout_ms
< config.general.me_route_backpressure_base_timeout_ms < config.general.me_route_backpressure_base_timeout_ms
@ -659,11 +648,6 @@ impl ProxyConfig {
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(), "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
)); ));
} }
if config.general.me_route_backpressure_high_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
));
}
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) { if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@ -678,18 +662,6 @@ impl ProxyConfig {
)); ));
} }
if !(50..=60_000).contains(&config.general.me_route_hybrid_max_wait_ms) {
return Err(ProxyError::Config(
"general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string(),
));
}
if config.general.me_route_blocking_send_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
));
}
if !(2..=4).contains(&config.general.me_writer_pick_sample_size) { if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.me_writer_pick_sample_size must be within [2, 4]".to_string(), "general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
@ -750,12 +722,6 @@ impl ProxyConfig {
)); ));
} }
if config.server.accept_permit_timeout_ms > 60_000 {
return Err(ProxyError::Config(
"server.accept_permit_timeout_ms must be within [0, 60000]".to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0 if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs() && config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs < config.general.me_pool_drain_ttl_secs
@ -1678,47 +1644,6 @@ mod tests {
let _ = std::fs::remove_file(path_valid); let _ = std::fs::remove_file(path_valid);
} }
#[test]
fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 100
me_route_backpressure_high_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() { fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
let toml = r#" let toml = r#"

View File

@ -462,11 +462,6 @@ pub struct GeneralConfig {
#[serde(default = "default_me_c2me_channel_capacity")] #[serde(default = "default_me_c2me_channel_capacity")]
pub me_c2me_channel_capacity: usize, pub me_c2me_channel_capacity: usize,
/// Maximum wait in milliseconds for enqueueing C2ME commands when the queue is full.
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_me_c2me_send_timeout_ms")]
pub me_c2me_send_timeout_ms: u64,
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue. /// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
/// `0` keeps legacy no-wait behavior. /// `0` keeps legacy no-wait behavior.
#[serde(default = "default_me_reader_route_data_wait_ms")] #[serde(default = "default_me_reader_route_data_wait_ms")]
@ -721,15 +716,6 @@ pub struct GeneralConfig {
#[serde(default = "default_me_route_no_writer_wait_ms")] #[serde(default = "default_me_route_no_writer_wait_ms")]
pub me_route_no_writer_wait_ms: u64, pub me_route_no_writer_wait_ms: u64,
/// Maximum cumulative wait in milliseconds for hybrid no-writer mode before failfast.
#[serde(default = "default_me_route_hybrid_max_wait_ms")]
pub me_route_hybrid_max_wait_ms: u64,
/// Maximum wait in milliseconds for blocking ME writer channel send fallback.
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_me_route_blocking_send_timeout_ms")]
pub me_route_blocking_send_timeout_ms: u64,
/// Number of inline recovery attempts in legacy mode. /// Number of inline recovery attempts in legacy mode.
#[serde(default = "default_me_route_inline_recovery_attempts")] #[serde(default = "default_me_route_inline_recovery_attempts")]
pub me_route_inline_recovery_attempts: u32, pub me_route_inline_recovery_attempts: u32,
@ -935,7 +921,6 @@ impl Default for GeneralConfig {
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(), me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
me_route_channel_capacity: default_me_route_channel_capacity(), me_route_channel_capacity: default_me_route_channel_capacity(),
me_c2me_channel_capacity: default_me_c2me_channel_capacity(), me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
me_c2me_send_timeout_ms: default_me_c2me_send_timeout_ms(),
me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(), me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(), me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(), me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
@ -1005,8 +990,6 @@ impl Default for GeneralConfig {
me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(), me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
me_route_no_writer_mode: MeRouteNoWriterMode::default(), me_route_no_writer_mode: MeRouteNoWriterMode::default(),
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(), me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
me_route_hybrid_max_wait_ms: default_me_route_hybrid_max_wait_ms(),
me_route_blocking_send_timeout_ms: default_me_route_blocking_send_timeout_ms(),
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(), me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(), me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
links: LinksConfig::default(), links: LinksConfig::default(),
@ -1242,11 +1225,6 @@ pub struct ServerConfig {
/// 0 means unlimited. /// 0 means unlimited.
#[serde(default = "default_server_max_connections")] #[serde(default = "default_server_max_connections")]
pub max_connections: u32, pub max_connections: u32,
/// Maximum wait in milliseconds while acquiring a connection slot permit.
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_accept_permit_timeout_ms")]
pub accept_permit_timeout_ms: u64,
} }
impl Default for ServerConfig { impl Default for ServerConfig {
@ -1266,7 +1244,6 @@ impl Default for ServerConfig {
api: ApiConfig::default(), api: ApiConfig::default(),
listeners: Vec::new(), listeners: Vec::new(),
max_connections: default_server_max_connections(), max_connections: default_server_max_connections(),
accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
} }
} }
} }

View File

@ -205,7 +205,6 @@ pub(crate) fn format_uptime(total_secs: u64) -> String {
format!("{} / {} seconds", parts.join(", "), total_secs) format!("{} / {} seconds", parts.join(", "), total_secs)
} }
#[allow(dead_code)]
pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool { pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
loop { loop {
if *admission_rx.borrow() { if *admission_rx.borrow() {

View File

@ -24,7 +24,7 @@ use crate::transport::{
ListenOptions, UpstreamManager, create_listener, find_listener_processes, ListenOptions, UpstreamManager, create_listener, find_listener_processes,
}; };
use super::helpers::{is_expected_handshake_eof, print_proxy_links}; use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open};
pub(crate) struct BoundListeners { pub(crate) struct BoundListeners {
pub(crate) listeners: Vec<(TcpListener, bool)>, pub(crate) listeners: Vec<(TcpListener, bool)>,
@ -195,7 +195,7 @@ pub(crate) async fn bind_listeners(
has_unix_listener = true; has_unix_listener = true;
let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone(); let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let admission_rx_unix = admission_rx.clone(); let mut admission_rx_unix = admission_rx.clone();
let stats = stats.clone(); let stats = stats.clone();
let upstream_manager = upstream_manager.clone(); let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone(); let replay_checker = replay_checker.clone();
@ -212,45 +212,18 @@ pub(crate) async fn bind_listeners(
let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1)); let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1));
loop { loop {
if !wait_until_admission_open(&mut admission_rx_unix).await {
warn!("Conditional-admission gate channel closed for unix listener");
break;
}
match unix_listener.accept().await { match unix_listener.accept().await {
Ok((stream, _)) => { Ok((stream, _)) => {
if !*admission_rx_unix.borrow() { let permit = match max_connections_unix.clone().acquire_owned().await {
drop(stream);
continue;
}
let accept_permit_timeout_ms = config_rx_unix
.borrow()
.server
.accept_permit_timeout_ms;
let permit = if accept_permit_timeout_ms == 0 {
match max_connections_unix.clone().acquire_owned().await {
Ok(permit) => permit, Ok(permit) => permit,
Err(_) => { Err(_) => {
error!("Connection limiter is closed"); error!("Connection limiter is closed");
break; break;
} }
}
} else {
match tokio::time::timeout(
Duration::from_millis(accept_permit_timeout_ms),
max_connections_unix.clone().acquire_owned(),
)
.await
{
Ok(Ok(permit)) => permit,
Ok(Err(_)) => {
error!("Connection limiter is closed");
break;
}
Err(_) => {
debug!(
timeout_ms = accept_permit_timeout_ms,
"Dropping accepted unix connection: permit wait timeout"
);
drop(stream);
continue;
}
}
}; };
let conn_id = let conn_id =
unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@ -339,7 +312,7 @@ pub(crate) fn spawn_tcp_accept_loops(
) { ) {
for (listener, listener_proxy_protocol) in listeners { for (listener, listener_proxy_protocol) in listeners {
let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone(); let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let admission_rx_tcp = admission_rx.clone(); let mut admission_rx_tcp = admission_rx.clone();
let stats = stats.clone(); let stats = stats.clone();
let upstream_manager = upstream_manager.clone(); let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone(); let replay_checker = replay_checker.clone();
@ -354,47 +327,18 @@ pub(crate) fn spawn_tcp_accept_loops(
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if !wait_until_admission_open(&mut admission_rx_tcp).await {
warn!("Conditional-admission gate channel closed for tcp listener");
break;
}
match listener.accept().await { match listener.accept().await {
Ok((stream, peer_addr)) => { Ok((stream, peer_addr)) => {
if !*admission_rx_tcp.borrow() { let permit = match max_connections_tcp.clone().acquire_owned().await {
debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
drop(stream);
continue;
}
let accept_permit_timeout_ms = config_rx
.borrow()
.server
.accept_permit_timeout_ms;
let permit = if accept_permit_timeout_ms == 0 {
match max_connections_tcp.clone().acquire_owned().await {
Ok(permit) => permit, Ok(permit) => permit,
Err(_) => { Err(_) => {
error!("Connection limiter is closed"); error!("Connection limiter is closed");
break; break;
} }
}
} else {
match tokio::time::timeout(
Duration::from_millis(accept_permit_timeout_ms),
max_connections_tcp.clone().acquire_owned(),
)
.await
{
Ok(Ok(permit)) => permit,
Ok(Err(_)) => {
error!("Connection limiter is closed");
break;
}
Err(_) => {
debug!(
peer = %peer_addr,
timeout_ms = accept_permit_timeout_ms,
"Dropping accepted connection: permit wait timeout"
);
drop(stream);
continue;
}
}
}; };
let config = config_rx.borrow_and_update().clone(); let config = config_rx.borrow_and_update().clone();
let stats = stats.clone(); let stats = stats.clone();

View File

@ -267,8 +267,6 @@ pub(crate) async fn initialize_me_pool(
config.general.me_warn_rate_limit_ms, config.general.me_warn_rate_limit_ms,
config.general.me_route_no_writer_mode, config.general.me_route_no_writer_mode,
config.general.me_route_no_writer_wait_ms, config.general.me_route_no_writer_wait_ms,
config.general.me_route_hybrid_max_wait_ms,
config.general.me_route_blocking_send_timeout_ms,
config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_attempts,
config.general.me_route_inline_recovery_wait_ms, config.general.me_route_inline_recovery_wait_ms,
); );

View File

@ -1692,57 +1692,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_close_signal_drop_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_close_signal_channel_full_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_channel_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
);
let _ = writeln!(
out,
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
);
let _ = writeln!(
out,
"telemt_me_draining_writers_reap_progress_total {}",
if me_allows_normal {
stats.get_me_draining_writers_reap_progress_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!( let _ = writeln!(
@ -2175,13 +2124,6 @@ mod tests {
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
));
assert!(output.contains(
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
assert!(output.contains( assert!(output.contains(

View File

@ -222,7 +222,6 @@ fn should_yield_c2me_sender(sent_since_yield: usize, has_backlog: bool) -> bool
async fn enqueue_c2me_command( async fn enqueue_c2me_command(
tx: &mpsc::Sender<C2MeCommand>, tx: &mpsc::Sender<C2MeCommand>,
cmd: C2MeCommand, cmd: C2MeCommand,
send_timeout: Duration,
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> { ) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
match tx.try_send(cmd) { match tx.try_send(cmd) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
@ -232,17 +231,7 @@ async fn enqueue_c2me_command(
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS { if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
if send_timeout.is_zero() { tx.send(cmd).await
return tx.send(cmd).await;
}
match tokio::time::timeout(send_timeout, tx.reserve()).await {
Ok(Ok(permit)) => {
permit.send(cmd);
Ok(())
}
Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
Err(_) => Err(mpsc::error::SendError(cmd)),
}
} }
} }
} }
@ -366,7 +355,6 @@ where
.general .general
.me_c2me_channel_capacity .me_c2me_channel_capacity
.max(C2ME_CHANNEL_CAPACITY_FALLBACK); .max(C2ME_CHANNEL_CAPACITY_FALLBACK);
let c2me_send_timeout = Duration::from_millis(config.general.me_c2me_send_timeout_ms);
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity); let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
let me_pool_c2me = me_pool.clone(); let me_pool_c2me = me_pool.clone();
let effective_tag = effective_tag; let effective_tag = effective_tag;
@ -375,21 +363,6 @@ where
while let Some(cmd) = c2me_rx.recv().await { while let Some(cmd) = c2me_rx.recv().await {
match cmd { match cmd {
C2MeCommand::Data { payload, flags } => { C2MeCommand::Data { payload, flags } => {
if c2me_send_timeout.is_zero() {
me_pool_c2me
.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
payload.as_ref(),
flags,
effective_tag.as_deref(),
)
.await?;
} else {
match tokio::time::timeout(
c2me_send_timeout,
me_pool_c2me.send_proxy_req( me_pool_c2me.send_proxy_req(
conn_id, conn_id,
success.dc_idx, success.dc_idx,
@ -398,19 +371,7 @@ where
payload.as_ref(), payload.as_ref(),
flags, flags,
effective_tag.as_deref(), effective_tag.as_deref(),
), ).await?;
)
.await
{
Ok(send_result) => send_result?,
Err(_) => {
return Err(ProxyError::Proxy(format!(
"ME send timeout after {}ms",
c2me_send_timeout.as_millis()
)));
}
}
}
sent_since_yield = sent_since_yield.saturating_add(1); sent_since_yield = sent_since_yield.saturating_add(1);
if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) { if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) {
sent_since_yield = 0; sent_since_yield = 0;
@ -594,7 +555,7 @@ where
loop { loop {
if session_lease.is_stale() { if session_lease.is_stale() {
stats.increment_reconnect_stale_close_total(); stats.increment_reconnect_stale_close_total();
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string())); main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
break; break;
} }
@ -612,7 +573,7 @@ where
"Cutover affected middle session, closing client connection" "Cutover affected middle session, closing client connection"
); );
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
break; break;
} }
@ -646,11 +607,7 @@ where
flags |= RPC_FLAG_NOT_ENCRYPTED; flags |= RPC_FLAG_NOT_ENCRYPTED;
} }
// Keep client read loop lightweight: route heavy ME send path via a dedicated task. // Keep client read loop lightweight: route heavy ME send path via a dedicated task.
if enqueue_c2me_command( if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
&c2me_tx,
C2MeCommand::Data { payload, flags },
c2me_send_timeout,
)
.await .await
.is_err() .is_err()
{ {
@ -661,12 +618,7 @@ where
Ok(None) => { Ok(None) => {
debug!(conn_id, "Client EOF"); debug!(conn_id, "Client EOF");
client_closed = true; client_closed = true;
let _ = enqueue_c2me_command( let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
&c2me_tx,
C2MeCommand::Close,
c2me_send_timeout,
)
.await;
break; break;
} }
Err(e) => { Err(e) => {
@ -1041,7 +993,6 @@ mod tests {
payload: Bytes::from_static(&[1, 2, 3]), payload: Bytes::from_static(&[1, 2, 3]),
flags: 0, flags: 0,
}, },
TokioDuration::from_millis(50),
) )
.await .await
.unwrap(); .unwrap();
@ -1077,7 +1028,6 @@ mod tests {
payload: Bytes::from_static(&[7, 7]), payload: Bytes::from_static(&[7, 7]),
flags: 7, flags: 7,
}, },
TokioDuration::from_millis(100),
) )
.await .await
.unwrap(); .unwrap();

View File

@ -123,9 +123,6 @@ pub struct Stats {
pool_drain_soft_evict_total: AtomicU64, pool_drain_soft_evict_total: AtomicU64,
pool_drain_soft_evict_writer_total: AtomicU64, pool_drain_soft_evict_writer_total: AtomicU64,
pool_stale_pick_total: AtomicU64, pool_stale_pick_total: AtomicU64,
me_writer_close_signal_drop_total: AtomicU64,
me_writer_close_signal_channel_full_total: AtomicU64,
me_draining_writers_reap_progress_total: AtomicU64,
me_writer_removed_total: AtomicU64, me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64,
me_refill_triggered_total: AtomicU64, me_refill_triggered_total: AtomicU64,
@ -737,24 +734,6 @@ impl Stats {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_writer_close_signal_drop_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_drop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_close_signal_channel_full_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_channel_full_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_draining_writers_reap_progress_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_draining_writers_reap_progress_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_removed_total(&self) { pub fn increment_me_writer_removed_total(&self) {
if self.telemetry_me_allows_debug() { if self.telemetry_me_allows_debug() {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
@ -1280,17 +1259,6 @@ impl Stats {
pub fn get_pool_stale_pick_total(&self) -> u64 { pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed) self.pool_stale_pick_total.load(Ordering::Relaxed)
} }
pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
self.me_writer_close_signal_channel_full_total
.load(Ordering::Relaxed)
}
pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
self.me_draining_writers_reap_progress_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_total(&self) -> u64 { pub fn get_me_writer_removed_total(&self) -> u64 {
self.me_writer_removed_total.load(Ordering::Relaxed) self.me_writer_removed_total.load(Ordering::Relaxed)
} }

View File

@ -314,8 +314,6 @@ pub(super) async fn reap_draining_writers(
} }
pool.stats.increment_pool_force_close_total(); pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
} }
for writer_id in empty_writer_ids { for writer_id in empty_writer_ids {
@ -326,8 +324,6 @@ pub(super) async fn reap_draining_writers(
continue; continue;
} }
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
} }
@ -1578,8 +1574,6 @@ mod tests {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
) )

View File

@ -111,8 +111,6 @@ async fn make_pool(
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
); );

View File

@ -110,8 +110,6 @@ async fn make_pool(
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
); );

View File

@ -4,7 +4,6 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -104,8 +103,6 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
) )
@ -210,89 +207,6 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
assert_eq!(current_writer_ids(&pool).await, vec![3]); assert_eq!(current_writer_ids(&pool).await, vec![3]);
} }
#[tokio::test]
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
assert!(
blocked_tx
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
.is_ok()
);
let blocked_rx_guard = tokio::spawn(async move {
let _hold_rx = blocked_rx;
tokio::time::sleep(Duration::from_secs(30)).await;
});
let blocked_writer_id = 90u64;
let blocked_writer = MeWriter {
id: blocked_writer_id,
addr: SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
4500 + blocked_writer_id as u16,
),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
writer_dc: 2,
generation: 1,
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
tx: blocked_tx.clone(),
cancel: CancellationToken::new(),
degraded: Arc::new(AtomicBool::new(false)),
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
draining: Arc::new(AtomicBool::new(true)),
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
now_epoch_secs.saturating_sub(120),
)),
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
};
pool.writers.write().await.push(blocked_writer);
pool.registry
.register_writer(blocked_writer_id, blocked_tx)
.await;
pool.conn_count.fetch_add(1, Ordering::Relaxed);
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let reap_res = tokio::time::timeout(
Duration::from_millis(500),
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
)
.await;
blocked_rx_guard.abort();
assert!(reap_res.is_ok(), "reap should not block on close signal");
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
let activity = pool.registry.writer_activity_snapshot().await;
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
assert!(!activity.bound_clients_by_writer.contains_key(&91));
let (probe_conn_id, _rx) = pool.registry.register().await;
assert!(
!pool.registry
.bind_writer(
probe_conn_id,
blocked_writer_id,
ConnMeta {
target_dc: 2,
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
proto_flags: 0,
},
)
.await
);
let _ = pool.registry.unregister(probe_conn_id).await;
}
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
let pool = make_pool(2).await; let pool = make_pool(2).await;

View File

@ -193,8 +193,6 @@ pub struct MePool {
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>, pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
pub(super) me_route_no_writer_mode: AtomicU8, pub(super) me_route_no_writer_mode: AtomicU8,
pub(super) me_route_no_writer_wait: Duration, pub(super) me_route_no_writer_wait: Duration,
pub(super) me_route_hybrid_max_wait: Duration,
pub(super) me_route_blocking_send_timeout: Duration,
pub(super) me_route_inline_recovery_attempts: u32, pub(super) me_route_inline_recovery_attempts: u32,
pub(super) me_route_inline_recovery_wait: Duration, pub(super) me_route_inline_recovery_wait: Duration,
pub(super) me_health_interval_ms_unhealthy: AtomicU64, pub(super) me_health_interval_ms_unhealthy: AtomicU64,
@ -309,8 +307,6 @@ impl MePool {
me_warn_rate_limit_ms: u64, me_warn_rate_limit_ms: u64,
me_route_no_writer_mode: MeRouteNoWriterMode, me_route_no_writer_mode: MeRouteNoWriterMode,
me_route_no_writer_wait_ms: u64, me_route_no_writer_wait_ms: u64,
me_route_hybrid_max_wait_ms: u64,
me_route_blocking_send_timeout_ms: u64,
me_route_inline_recovery_attempts: u32, me_route_inline_recovery_attempts: u32,
me_route_inline_recovery_wait_ms: u64, me_route_inline_recovery_wait_ms: u64,
) -> Arc<Self> { ) -> Arc<Self> {
@ -494,10 +490,6 @@ impl MePool {
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms),
me_route_blocking_send_timeout: Duration::from_millis(
me_route_blocking_send_timeout_ms,
),
me_route_inline_recovery_attempts, me_route_inline_recovery_attempts,
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),

View File

@ -8,7 +8,6 @@ use bytes::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use rand::Rng; use rand::Rng;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
@ -313,6 +312,8 @@ impl MePool {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_PING_U32.to_le_bytes()); p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
p.extend_from_slice(&sent_id.to_le_bytes()); p.extend_from_slice(&sent_id.to_le_bytes());
{
let mut tracker = ping_tracker_ping.lock().await;
let now_epoch_ms = std::time::SystemTime::now() let now_epoch_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() .unwrap_or_default()
@ -336,6 +337,17 @@ impl MePool {
run_cleanup = true; run_cleanup = true;
} }
} }
if run_cleanup {
let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
}
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
ping_id = ping_id.wrapping_add(1); ping_id = ping_id.wrapping_add(1);
stats_ping.increment_me_keepalive_sent(); stats_ping.increment_me_keepalive_sent();
if tx_ping if tx_ping
@ -355,16 +367,6 @@ impl MePool {
} }
break; break;
} }
let mut tracker = ping_tracker_ping.lock().await;
if run_cleanup {
let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
}
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
} }
}); });
@ -492,9 +494,11 @@ impl MePool {
} }
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) { pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
// Full client cleanup now happens inside `registry.writer_lost` to keep let conns = self.remove_writer_only(writer_id).await;
// writer reap/remove paths strictly non-blocking per connection. for bound in conns {
let _ = self.remove_writer_only(writer_id).await; let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
} }
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> { async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
@ -524,11 +528,6 @@ impl MePool {
self.conn_count.fetch_sub(1, Ordering::Relaxed); self.conn_count.fetch_sub(1, Ordering::Relaxed);
} }
} }
// State invariant:
// - writer is removed from `self.writers` (pool visibility),
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await; let conns = self.registry.writer_lost(writer_id).await;
{ {
let mut tracker = self.ping_tracker.lock().await; let mut tracker = self.ping_tracker.lock().await;
@ -536,25 +535,7 @@ impl MePool {
} }
self.rtt_stats.lock().await.remove(&writer_id); self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx { if let Some(tx) = close_tx {
match tx.try_send(WriterCommand::Close) { let _ = tx.send(WriterCommand::Close).await;
Ok(()) => {}
Err(TrySendError::Full(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
);
}
}
} }
if trigger_refill if trigger_refill
&& let Some(addr) = removed_addr && let Some(addr) = removed_addr

View File

@ -8,7 +8,6 @@ use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::{Mutex, mpsc}; use tokio::sync::{Mutex, mpsc};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
@ -174,12 +173,12 @@ pub(crate) async fn reader_loop(
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_EXT from ME"); debug!(cid, "RPC_CLOSE_EXT from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await; reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_CONN from ME"); debug!(cid, "RPC_CLOSE_CONN from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await; reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_PING_U32 && body.len() >= 8 { } else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@ -187,16 +186,14 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12); let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) { if tx
Ok(()) => {} .send(WriterCommand::DataAndFlush(Bytes::from(pong)))
Err(TrySendError::Full(_)) => { .await
debug!(ping_id, "PONG dropped: writer command channel is full"); .is_err()
} {
Err(TrySendError::Closed(_)) => { warn!("PONG send failed");
warn!("PONG send failed: writer channel closed");
break; break;
} }
}
} else if pt == RPC_PONG_U32 && body.len() >= 8 { } else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
stats.increment_me_keepalive_pong(); stats.increment_me_keepalive_pong();
@ -235,13 +232,6 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {} let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
Err(TrySendError::Full(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
}
}
} }

View File

@ -169,7 +169,6 @@ impl ConnRegistry {
None None
} }
#[allow(dead_code)]
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
let tx = { let tx = {
let inner = self.inner.read().await; let inner = self.inner.read().await;
@ -446,9 +445,6 @@ impl ConnRegistry {
} }
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
let mut out = Vec::new();
{
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id); inner.writers.remove(&writer_id);
inner.last_meta_for_writer.remove(&writer_id); inner.last_meta_for_writer.remove(&writer_id);
@ -460,24 +456,19 @@ impl ConnRegistry {
.into_iter() .into_iter()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut out = Vec::new();
for conn_id in conns { for conn_id in conns {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue; continue;
} }
inner.writer_for_conn.remove(&conn_id); inner.writer_for_conn.remove(&conn_id);
if let Some(client_tx) = inner.map.remove(&conn_id) { if let Some(m) = inner.meta.get(&conn_id) {
close_txs.push(client_tx); out.push(BoundConn {
} conn_id,
if let Some(meta) = inner.meta.remove(&conn_id) { meta: m.clone(),
out.push(BoundConn { conn_id, meta }); });
} }
} }
}
for client_tx in close_txs {
let _ = client_tx.try_send(MeResponse::Close);
}
out out
} }
@ -500,7 +491,6 @@ impl ConnRegistry {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use super::ConnMeta; use super::ConnMeta;
use super::ConnRegistry; use super::ConnRegistry;
@ -673,39 +663,6 @@ mod tests {
assert!(registry.is_writer_empty(20).await); assert!(registry.is_writer_empty(20).await);
} }
#[tokio::test]
async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
let registry = ConnRegistry::new();
let (conn_id, mut rx) = registry.register().await;
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
let lost = registry.writer_lost(10).await;
assert_eq!(lost.len(), 1);
assert_eq!(lost[0].conn_id, conn_id);
assert!(registry.get_writer(conn_id).await.is_none());
assert!(registry.get_meta(conn_id).await.is_none());
assert_eq!(registry.unregister(conn_id).await, None);
let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
assert!(matches!(close, Ok(Some(MeResponse::Close))));
}
#[tokio::test] #[tokio::test]
async fn bind_writer_rejects_unregistered_writer() { async fn bind_writer_rejects_unregistered_writer() {
let registry = ConnRegistry::new(); let registry = ConnRegistry::new();

View File

@ -6,7 +6,6 @@ use std::sync::atomic::Ordering;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use bytes::Bytes; use bytes::Bytes;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, warn}; use tracing::{debug, warn};
@ -30,29 +29,6 @@ const PICK_PENALTY_DRAINING: u64 = 600;
const PICK_PENALTY_STALE: u64 = 300; const PICK_PENALTY_STALE: u64 = 300;
const PICK_PENALTY_DEGRADED: u64 = 250; const PICK_PENALTY_DEGRADED: u64 = 250;
enum TimedSendError<T> {
Closed(T),
Timeout(T),
}
async fn send_writer_command_with_timeout(
tx: &mpsc::Sender<WriterCommand>,
cmd: WriterCommand,
timeout: Duration,
) -> std::result::Result<(), TimedSendError<WriterCommand>> {
if timeout.is_zero() {
return tx.send(cmd).await.map_err(|err| TimedSendError::Closed(err.0));
}
match tokio::time::timeout(timeout, tx.reserve()).await {
Ok(Ok(permit)) => {
permit.send(cmd);
Ok(())
}
Ok(Err(_)) => Err(TimedSendError::Closed(cmd)),
Err(_) => Err(TimedSendError::Timeout(cmd)),
}
}
impl MePool { impl MePool {
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
pub async fn send_proxy_req( pub async fn send_proxy_req(
@ -102,18 +78,8 @@ impl MePool {
let mut hybrid_last_recovery_at: Option<Instant> = None; let mut hybrid_last_recovery_at: Option<Instant> = None;
let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50)); let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
let mut hybrid_wait_current = hybrid_wait_step; let mut hybrid_wait_current = hybrid_wait_step;
let hybrid_deadline = Instant::now() + self.me_route_hybrid_max_wait;
loop { loop {
if matches!(no_writer_mode, MeRouteNoWriterMode::HybridAsyncPersistent)
&& Instant::now() >= hybrid_deadline
{
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy(
"No ME writer available in hybrid wait window".into(),
));
}
let mut skip_writer_id: Option<u64> = None;
let current_meta = self let current_meta = self
.registry .registry
.get_meta(conn_id) .get_meta(conn_id)
@ -124,31 +90,13 @@ impl MePool {
match current.tx.try_send(WriterCommand::Data(current_payload.clone())) { match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
Ok(()) => return Ok(()), Ok(()) => return Ok(()),
Err(TrySendError::Full(cmd)) => { Err(TrySendError::Full(cmd)) => {
match send_writer_command_with_timeout( if current.tx.send(cmd).await.is_ok() {
&current.tx, return Ok(());
cmd, }
self.me_route_blocking_send_timeout,
)
.await
{
Ok(()) => return Ok(()),
Err(TimedSendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed"); warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await; self.remove_writer_and_close_clients(current.writer_id).await;
continue; continue;
} }
Err(TimedSendError::Timeout(_)) => {
debug!(
conn_id,
writer_id = current.writer_id,
timeout_ms = self.me_route_blocking_send_timeout.as_millis()
as u64,
"ME writer send timed out for bound writer, trying reroute"
);
skip_writer_id = Some(current.writer_id);
}
}
}
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed"); warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await; self.remove_writer_and_close_clients(current.writer_id).await;
@ -252,9 +200,6 @@ impl MePool {
.candidate_indices_for_dc(&writers_snapshot, routed_dc, true) .candidate_indices_for_dc(&writers_snapshot, routed_dc, true)
.await; .await;
} }
if let Some(skip_writer_id) = skip_writer_id {
candidate_indices.retain(|idx| writers_snapshot[*idx].id != skip_writer_id);
}
if candidate_indices.is_empty() { if candidate_indices.is_empty() {
let pick_mode = self.writer_pick_mode(); let pick_mode = self.writer_pick_mode();
match no_writer_mode { match no_writer_mode {
@ -477,13 +422,7 @@ impl MePool {
self.stats.increment_me_writer_pick_blocking_fallback_total(); self.stats.increment_me_writer_pick_blocking_fallback_total();
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr); let (payload, meta) = build_routed_payload(effective_our_addr);
match send_writer_command_with_timeout( match w.tx.send(WriterCommand::Data(payload.clone())).await {
&w.tx,
WriterCommand::Data(payload.clone()),
self.me_route_blocking_send_timeout,
)
.await
{
Ok(()) => { Ok(()) => {
self.stats self.stats
.increment_me_writer_pick_success_fallback_total(pick_mode); .increment_me_writer_pick_success_fallback_total(pick_mode);
@ -500,20 +439,11 @@ impl MePool {
} }
return Ok(()); return Ok(());
} }
Err(TimedSendError::Closed(_)) => { Err(_) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode); self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed (blocking)"); warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await; self.remove_writer_and_close_clients(w.id).await;
} }
Err(TimedSendError::Timeout(_)) => {
self.stats.increment_me_writer_pick_full_total(pick_mode);
debug!(
conn_id,
writer_id = w.id,
timeout_ms = self.me_route_blocking_send_timeout.as_millis() as u64,
"ME writer blocking fallback send timed out"
);
}
} }
} }
} }
@ -643,20 +573,14 @@ impl MePool {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { if w.tx
Ok(()) => {} .send(WriterCommand::DataAndFlush(Bytes::from(p)))
Err(TrySendError::Full(_)) => { .await
debug!( .is_err()
conn_id, {
writer_id = w.writer_id,
"ME close skipped: writer command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
debug!("ME close write failed"); debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await; self.remove_writer_and_close_clients(w.writer_id).await;
} }
}
} else { } else {
debug!(conn_id, "ME close skipped (writer missing)"); debug!(conn_id, "ME close skipped (writer missing)");
} }
@ -672,12 +596,8 @@ impl MePool {
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {} Ok(()) => {}
Err(TrySendError::Full(_)) => { Err(TrySendError::Full(cmd)) => {
debug!( let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
conn_id,
writer_id = w.writer_id,
"ME close_conn skipped: writer command channel is full"
);
} }
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn skipped: writer channel closed"); debug!(conn_id, "ME close_conn skipped: writer channel closed");