Compare commits

..

15 Commits
3.3.2 ... 3.3.4

Author SHA1 Message Date
Alexey
dd12997744 Merge pull request #338 from telemt/flow-api
API Zero + API Docs
2026-03-06 13:08:12 +03:00
Alexey
fc160913bf Update API.md 2026-03-06 13:07:31 +03:00
Alexey
92c22ef16d API Zero
Added new endpoints:
- GET /v1/system/info
- GET /v1/runtime/gates
- GET /v1/limits/effective
- GET /v1/security/posture

Added API runtime state without impacting the hot path:
- config_reload_count
- last_config_reload_epoch_secs
- admission_open
- process_started_at_epoch_secs

Added background watcher tasks in api::serve:
- configuration reload tracking
- admission gate state tracking
2026-03-06 13:06:57 +03:00
Alexey
aff22d0855 Merge pull request #337 from telemt/readme
Update README.md
2026-03-06 12:47:06 +03:00
Alexey
b3d3bca15a Update README.md 2026-03-06 12:46:51 +03:00
Alexey
92f38392eb Merge pull request #336 from telemt/bump
Update Cargo.toml
2026-03-06 12:45:47 +03:00
Alexey
30ef8df1b3 Update Cargo.toml 2026-03-06 12:44:40 +03:00
Alexey
2e174adf16 Merge pull request #335 from telemt/flow-stunae
Update load.rs
2026-03-06 12:39:28 +03:00
Alexey
4e803b1412 Update load.rs 2026-03-06 12:08:43 +03:00
Alexey
9b174318ce Runtime Model: merge pull request #334 from telemt/docs
Runtime Model
2026-03-06 11:12:16 +03:00
Alexey
99edcbe818 Runtime Model 2026-03-06 11:11:44 +03:00
Alexey
ef7dc2b80f Merge pull request #332 from telemt/bump
Update Cargo.toml
2026-03-06 04:05:46 +03:00
Alexey
691607f269 Update Cargo.toml 2026-03-06 04:05:35 +03:00
Alexey
55561a23bc ME NoWait Routing + Upstream Connbudget + another fixes: merge pull request #331 from telemt/flow-hp
ME NoWait Routing + Upstream Connbudget + another fixes
2026-03-06 04:05:04 +03:00
Alexey
f32c34f126 ME NoWait Routing + Upstream Connbudget + PROXY Header t/o + allocation cuts 2026-03-06 03:58:08 +03:00
22 changed files with 1261 additions and 58 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.3.2"
version = "3.3.4"
edition = "2024"
[dependencies]

View File

@@ -3,7 +3,7 @@
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
- ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
- [Full-covered API w/ management](https://github.com/telemt/telemt/blob/main/docs/API.md)
- Anti-Replay on Sliding Window
- Prometheus-format Metrics

View File

@@ -76,6 +76,10 @@ Notes:
| Method | Path | Body | Success | `data` contract |
| --- | --- | --- | --- | --- |
| `GET` | `/v1/health` | none | `200` | `HealthData` |
| `GET` | `/v1/system/info` | none | `200` | `SystemInfoData` |
| `GET` | `/v1/runtime/gates` | none | `200` | `RuntimeGatesData` |
| `GET` | `/v1/limits/effective` | none | `200` | `EffectiveLimitsData` |
| `GET` | `/v1/security/posture` | none | `200` | `SecurityPostureData` |
| `GET` | `/v1/stats/summary` | none | `200` | `SummaryData` |
| `GET` | `/v1/stats/zero/all` | none | `200` | `ZeroAllData` |
| `GET` | `/v1/stats/upstreams` | none | `200` | `UpstreamsData` |
@@ -176,6 +180,94 @@ Note: the request contract is defined, but the corresponding route currently ret
| `handshake_timeouts_total` | `u64` | Handshake timeout count. |
| `configured_users` | `usize` | Number of configured users in config. |
### `SystemInfoData`
| Field | Type | Description |
| --- | --- | --- |
| `version` | `string` | Binary version (`CARGO_PKG_VERSION`). |
| `target_arch` | `string` | Target architecture (`std::env::consts::ARCH`). |
| `target_os` | `string` | Target OS (`std::env::consts::OS`). |
| `build_profile` | `string` | Build profile (`PROFILE` env when available). |
| `git_commit` | `string?` | Optional commit hash from build env metadata. |
| `build_time_utc` | `string?` | Optional build timestamp from build env metadata. |
| `rustc_version` | `string?` | Optional compiler version from build env metadata. |
| `process_started_at_epoch_secs` | `u64` | Process start time as Unix epoch seconds. |
| `uptime_seconds` | `f64` | Process uptime in seconds. |
| `config_path` | `string` | Active config file path used by runtime. |
| `config_hash` | `string` | SHA-256 hash of current config content (same value as envelope `revision`). |
| `config_reload_count` | `u64` | Number of successfully observed config updates since process start. |
| `last_config_reload_epoch_secs` | `u64?` | Unix epoch seconds of the latest observed config reload; null/absent before first reload. |
### `RuntimeGatesData`
| Field | Type | Description |
| --- | --- | --- |
| `accepting_new_connections` | `bool` | Current admission-gate state for new listener accepts. |
| `conditional_cast_enabled` | `bool` | Whether conditional ME admission logic is enabled (`general.use_middle_proxy`). |
| `me_runtime_ready` | `bool` | Current ME runtime readiness status used for conditional gate decisions. |
| `me2dc_fallback_enabled` | `bool` | Whether ME -> direct fallback is enabled. |
| `use_middle_proxy` | `bool` | Current transport mode preference. |
### `EffectiveLimitsData`
| Field | Type | Description |
| --- | --- | --- |
| `update_every_secs` | `u64` | Effective unified updater interval. |
| `me_reinit_every_secs` | `u64` | Effective ME periodic reinit interval. |
| `me_pool_force_close_secs` | `u64` | Effective stale-writer force-close timeout. |
| `timeouts` | `EffectiveTimeoutLimits` | Effective timeout policy snapshot. |
| `upstream` | `EffectiveUpstreamLimits` | Effective upstream connect/retry limits. |
| `middle_proxy` | `EffectiveMiddleProxyLimits` | Effective ME pool/floor/reconnect limits. |
| `user_ip_policy` | `EffectiveUserIpPolicyLimits` | Effective unique-IP policy mode/window. |
#### `EffectiveTimeoutLimits`
| Field | Type | Description |
| --- | --- | --- |
| `client_handshake_secs` | `u64` | Client handshake timeout. |
| `tg_connect_secs` | `u64` | Upstream Telegram connect timeout. |
| `client_keepalive_secs` | `u64` | Client keepalive interval. |
| `client_ack_secs` | `u64` | ACK timeout. |
| `me_one_retry` | `u8` | Fast retry count for single-endpoint ME DC. |
| `me_one_timeout_ms` | `u64` | Fast retry timeout per attempt for single-endpoint ME DC. |
#### `EffectiveUpstreamLimits`
| Field | Type | Description |
| --- | --- | --- |
| `connect_retry_attempts` | `u32` | Upstream connect retry attempts. |
| `connect_retry_backoff_ms` | `u64` | Upstream retry backoff delay. |
| `connect_budget_ms` | `u64` | Total connect wall-clock budget across retries. |
| `unhealthy_fail_threshold` | `u32` | Consecutive fail threshold for unhealthy marking. |
| `connect_failfast_hard_errors` | `bool` | Whether hard errors skip additional retries. |
#### `EffectiveMiddleProxyLimits`
| Field | Type | Description |
| --- | --- | --- |
| `floor_mode` | `string` | Effective floor mode (`static` or `adaptive`). |
| `adaptive_floor_idle_secs` | `u64` | Adaptive floor idle threshold. |
| `adaptive_floor_min_writers_single_endpoint` | `u8` | Adaptive floor minimum for single-endpoint DCs. |
| `adaptive_floor_recover_grace_secs` | `u64` | Adaptive floor recovery grace period. |
| `reconnect_max_concurrent_per_dc` | `u32` | Max concurrent reconnects per DC. |
| `reconnect_backoff_base_ms` | `u64` | Reconnect base backoff. |
| `reconnect_backoff_cap_ms` | `u64` | Reconnect backoff cap. |
| `reconnect_fast_retry_count` | `u32` | Number of fast retries before standard backoff strategy. |
| `me2dc_fallback` | `bool` | Effective ME -> direct fallback flag. |
#### `EffectiveUserIpPolicyLimits`
| Field | Type | Description |
| --- | --- | --- |
| `mode` | `string` | Unique-IP policy mode (`active_window`, `time_window`, `combined`). |
| `window_secs` | `u64` | Time window length used by unique-IP policy. |
### `SecurityPostureData`
| Field | Type | Description |
| --- | --- | --- |
| `api_read_only` | `bool` | Current API read-only state. |
| `api_whitelist_enabled` | `bool` | Whether whitelist filtering is active. |
| `api_whitelist_entries` | `usize` | Number of configured whitelist CIDRs. |
| `api_auth_header_enabled` | `bool` | Whether `Authorization` header validation is active. |
| `proxy_protocol_enabled` | `bool` | Global PROXY protocol accept setting. |
| `log_level` | `string` | Effective log level (`debug`, `verbose`, `normal`, `silent`). |
| `telemetry_core_enabled` | `bool` | Core telemetry toggle. |
| `telemetry_user_enabled` | `bool` | Per-user telemetry toggle. |
| `telemetry_me_level` | `string` | ME telemetry level (`silent`, `normal`, `debug`). |
### `ZeroAllData`
| Field | Type | Description |
| --- | --- | --- |

285
docs/model/MODEL.en.md Normal file
View File

@@ -0,0 +1,285 @@
# Telemt Runtime Model
## Scope
This document defines runtime concepts used by the Middle-End (ME) transport pipeline and the orchestration logic around it.
It focuses on:
- `ME Pool / Reader / Writer / Refill / Registry`
- `Adaptive Floor`
- `Trio-State`
- `Generation Lifecycle`
## Core Entities
### ME Pool
`ME Pool` is the runtime orchestrator for all Middle-End writers.
Responsibilities:
- Holds writer inventory by DC/family/endpoint.
- Maintains routing primitives and writer selection policy.
- Tracks generation state (`active`, `warm`, `draining` context).
- Applies runtime policies (floor mode, refill, reconnect, reinit, fallback behavior).
- Exposes readiness gates used by admission logic (for conditional accept/cast behavior).
Non-goals:
- It does not own client protocol decoding.
- It does not own per-client business policy (quotas/limits).
### ME Writer
`ME Writer` is a long-lived ME RPC tunnel bound to one concrete ME endpoint (`ip:port`), with:
- Outbound command channel (send path).
- Associated reader loop (inbound path).
- Health/degraded flags.
- Contour/state and generation metadata.
A writer is the actual data plane carrier for client sessions once bound.
### ME Reader
`ME Reader` is the inbound parser/dispatcher for one writer:
- Reads/decrypts ME RPC frames.
- Validates sequence/checksum.
- Routes payloads to client-connection channels via `Registry`.
- Emits close/ack/data events and updates telemetry.
Design intent:
- Reader must stay non-blocking as much as possible.
- Backpressure on a single client route must not stall the whole writer stream.
### Refill
`Refill` is the recovery mechanism that restores writer coverage when capacity drops:
- Per-endpoint restore (same endpoint first).
- Per-DC restore to satisfy required floor.
- Optional outage-mode/shadow behavior for fragile single-endpoint DCs.
Refill works asynchronously and should not block hot routing paths.
### Registry
`Registry` is the routing index between ME and client sessions:
- `conn_id -> client response channel`
- `conn_id <-> writer_id` binding map
- writer activity snapshots and idle tracking
Main invariants:
- A `conn_id` routes to at most one active response channel.
- Writer loss triggers safe unbind/cleanup and close propagation.
- Registry state is the source of truth for active ME-bound session mapping.
## Adaptive Floor
### What it is
`Adaptive Floor` is a runtime policy that changes target writer count per DC based on observed activity, instead of always holding static peak floor.
### Why it exists
Goals:
- Reduce idle writer churn under low traffic.
- Keep enough warm capacity to avoid client-visible stalls on burst recovery.
- Limit needless reconnect storms on unstable endpoints.
### Behavioral model
- Under activity: floor converges toward configured static requirement.
- Under prolonged idle: floor can shrink to a safe minimum.
- Recovery/grace windows prevent aggressive oscillation.
### Safety constraints
- Never violate minimal survivability floor for a DC group.
- Refill must still restore quickly on demand.
- Floor adaptation must not force-drop already bound healthy sessions.
## Trio-State
`Trio-State` is writer contouring:
- `Warm`
- `Active`
- `Draining`
### State semantics
- `Warm`: connected and validated, not primary for new binds.
- `Active`: preferred for new binds and normal traffic.
- `Draining`: no new regular binds; existing sessions continue until graceful retirement rules apply.
### Transition intent
- `Warm -> Active`: when coverage/readiness conditions are satisfied.
- `Active -> Draining`: on generation swap, endpoint replacement, or controlled retirement.
- `Draining -> removed`: after drain TTL/force-close policy (or when naturally empty).
This separation reduces SPOF and keeps cutovers predictable.
## Generation Lifecycle
Generation isolates pool epochs during reinit/reconfiguration.
### Lifecycle phases
1. `Bootstrap`: initial writers are established.
2. `Warmup`: next generation writers are created and validated.
3. `Activation`: generation promoted to active when coverage gate passes.
4. `Drain`: previous generation becomes draining, existing sessions are allowed to finish.
5. `Retire`: old generation writers are removed after graceful rules.
### Operational guarantees
- No partial generation activation without minimum coverage.
- Existing healthy client sessions should not be dropped just because a new generation appears.
- Draining generation exists to absorb in-flight traffic during swap.
### Readiness and admission
Pool readiness is not equivalent to “all endpoints fully saturated”.
Typical gating strategy:
- Open admission when per-DC minimal alive coverage exists.
- Continue background saturation for multi-endpoint DCs.
This keeps startup latency low while preserving eventual full capacity.
## Interactions Between Concepts
- `Generation` defines pool epochs.
- `Trio-State` defines per-writer role inside/around those epochs.
- `Adaptive Floor` defines how much capacity should be maintained right now.
- `Refill` is the actuator that closes the gap between desired and current capacity.
- `Registry` keeps per-session routing correctness while all of the above changes over time.
## Architectural Approach
### Layered Design
The runtime is intentionally split into two planes:
- `Control Plane`: decides desired topology and policy (`floor`, `generation swap`, `refill`, `fallback`).
- `Data Plane`: executes packet/session transport (`reader`, `writer`, routing, acks, close propagation).
Architectural rule:
- Control Plane may change writer inventory and policy.
- Data Plane must remain stable and low-latency while those changes happen.
### Ownership Model
Ownership is centered around explicit state domains:
- `MePool` owns writer lifecycle and policy state.
- `Registry` owns per-connection routing bindings.
- `Writer task` owns outbound ME socket send progression.
- `Reader task` owns inbound ME socket parsing and event dispatch.
This prevents accidental cross-layer mutation and keeps invariants local.
### Control Plane Responsibilities
Control Plane is event-driven and policy-driven:
- Startup initialization and readiness gates.
- Runtime reinit (periodic or config-triggered).
- Coverage checks per DC/family/endpoint group.
- Floor enforcement (static/adaptive).
- Refill scheduling and retry orchestration.
- Generation transition (`warm -> active`, previous `active -> draining`).
Control Plane must prioritize determinism over short-term aggressiveness.
### Data Plane Responsibilities
Data Plane is throughput-first and allocation-sensitive:
- Session bind to writer.
- Per-frame parsing/validation and dispatch.
- Ack and close signal propagation.
- Route drop behavior under missing connection or closed channel.
- Minimal critical logging in hot path.
Data Plane should avoid waiting on operations that are not strictly required for frame correctness.
## Concurrency and Synchronization
### Concurrency Principles
- Per-writer isolation: each writer has independent send/read task loops.
- Per-connection isolation: client channel state is scoped by `conn_id`.
- Asynchronous recovery: refill/reconnect runs outside the packet hot path.
### Synchronization Strategy
- Shared maps use fine-grained, short-lived locking.
- Read-mostly paths avoid broad write-lock windows.
- Backpressure decisions are localized at route/channel boundary.
Design target:
- A slow consumer should degrade only itself (or its route), not global writer progress.
### Cancellation and Shutdown
Writer and reader loops are cancellation-aware:
- explicit cancel token / close command support;
- safe unbind and cleanup via registry;
- deterministic order: stop admission -> drain/close -> release resources.
## Consistency Model
### Session Consistency
For one `conn_id`:
- exactly one active route target at a time;
- close and unbind must be idempotent;
- writer loss must not leave dangling bindings.
### Generation Consistency
Generational consistency guarantees:
- New generation is not promoted before minimum coverage gate.
- Previous generation remains available in `draining` state during handover.
- Forced retirement is policy-bound (`drain ttl`, optional force-close), not immediate.
### Policy Consistency
Policy changes (`adaptive/static floor`, fallback mode, retries) should apply without violating established active-session routing invariants.
## Backpressure and Flow Control
### Route-Level Backpressure
Route channels are bounded by design.
When pressure increases:
- short burst absorption is allowed;
- prolonged congestion triggers controlled drop semantics;
- drop accounting is explicit via metrics/counters.
### Reader Non-Blocking Priority
Inbound ME reader path should never be serialized behind one congested client route.
Practical implication:
- prefer non-blocking route attempt in the parser loop;
- move heavy recovery to async side paths.
## Failure Domain Strategy
### Endpoint-Level Failure
Failure of one endpoint should trigger endpoint-scoped recovery first:
- same endpoint reconnect;
- endpoint replacement within same DC group if applicable.
### DC-Level Degradation
If a DC group cannot satisfy floor:
- keep service via remaining coverage if policy allows;
- continue asynchronous refill saturation in background.
### Whole-Pool Readiness Loss
If no sufficient ME coverage exists:
- admission gate can hold new accepts (conditional policy);
- existing sessions should continue when their path remains healthy.
## Performance Architecture Notes
### Hotpath Discipline
Allowed in hotpath:
- fixed-size parsing and cheap validation;
- bounded channel operations;
- precomputed or low-allocation access patterns.
Avoid in hotpath:
- repeated expensive decoding;
- broad locks with awaits inside critical sections;
- verbose high-frequency logging.
### Throughput Stability Over Peak Spikes
Architecture prefers stable throughput and predictable latency over short peak gains that increase churn or long-tail reconnect times.
## Evolution and Extension Rules
To evolve this model safely:
- Add new policy knobs in Control Plane first.
- Keep Data Plane contracts stable (`conn_id`, route semantics, close semantics).
- Validate generation and registry invariants before enabling by default.
- Introduce new retry/recovery strategies behind explicit config.
## Failure and Recovery Notes
- Single-endpoint DC failure is a normal degraded mode case; policy should prioritize fast reconnect and optional shadow/probing strategies.
- Idle close by peer should be treated as expected when upstream enforces idle timeout.
- Reconnect backoff must protect against synchronized churn while still allowing fast first retries.
- Fallback (`ME -> direct DC`) is a policy switch, not a transport bug by itself.
## Terminology Summary
- `Coverage`: enough live writers to satisfy per-DC acceptance policy.
- `Floor`: target minimum writer count policy.
- `Churn`: frequent writer reconnect/remove cycles.
- `Hotpath`: per-packet/per-connection data path where extra waits/allocations are expensive.

285
docs/model/MODEL.ru.md Normal file
View File

@@ -0,0 +1,285 @@
# Runtime-модель Telemt
## Область описания
Документ фиксирует ключевые runtime-понятия пайплайна Middle-End (ME) и оркестрации вокруг него.
Фокус:
- `ME Pool / Reader / Writer / Refill / Registry`
- `Adaptive Floor`
- `Trio-State`
- `Generation Lifecycle`
## Базовые сущности
### ME Pool
`ME Pool` — центральный оркестратор всех Middle-End writer-ов.
Зона ответственности:
- хранит инвентарь writer-ов по DC/family/endpoint;
- управляет выбором writer-а и маршрутизацией;
- ведёт состояние поколений (`active`, `warm`, `draining` контекст);
- применяет runtime-политики (floor, refill, reconnect, reinit, fallback);
- отдаёт сигналы готовности для admission-логики (conditional accept/cast).
Что не делает:
- не декодирует клиентский протокол;
- не реализует бизнес-политику пользователя (квоты/лимиты).
### ME Writer
`ME Writer` — долгоживущий ME RPC-канал к конкретному endpoint (`ip:port`), у которого есть:
- канал команд на отправку;
- связанный reader loop для входящего потока;
- флаги состояния/деградации;
- метаданные contour/state и generation.
Writer — это фактический data-plane носитель клиентских сессий после бинда.
### ME Reader
`ME Reader` — входной parser/dispatcher одного writer-а:
- читает и расшифровывает ME RPC-фреймы;
- проверяет sequence/checksum;
- маршрутизирует payload в client-каналы через `Registry`;
- обрабатывает close/ack/data и обновляет телеметрию.
Инженерный принцип:
- Reader должен оставаться неблокирующим.
- Backpressure одной клиентской сессии не должен останавливать весь поток writer-а.
### Refill
`Refill` — механизм восстановления покрытия writer-ов при просадке:
- восстановление на том же endpoint в первую очередь;
- восстановление по DC до требуемого floor;
- опциональные outage/shadow-режимы для хрупких single-endpoint DC.
Refill работает асинхронно и не должен блокировать hotpath.
### Registry
`Registry` — маршрутизационный индекс между ME и клиентскими сессиями:
- `conn_id -> канал ответа клиенту`;
- map биндов `conn_id <-> writer_id`;
- снимки активности writer-ов и idle-трекинг.
Ключевые инварианты:
- один `conn_id` маршрутизируется максимум в один активный канал ответа;
- потеря writer-а приводит к безопасному unbind/cleanup и отправке close;
- именно `Registry` является источником истины по активным ME-биндам.
## Adaptive Floor
### Что это
`Adaptive Floor` — runtime-политика, которая динамически меняет целевое число writer-ов на DC в зависимости от активности, а не держит всегда фиксированный статический floor.
### Зачем
Цели:
- уменьшить churn на idle-трафике;
- сохранить достаточную прогретую ёмкость для быстрых всплесков;
- снизить лишние reconnect-штормы на нестабильных endpoint.
### Модель поведения
- при активности floor стремится к статическому требованию;
- при длительном idle floor может снижаться до безопасного минимума;
- grace/recovery окна не дают системе "флапать" слишком резко.
### Ограничения безопасности
- нельзя нарушать минимальный floor выживаемости DC-группы;
- refill обязан быстро нарастить покрытие по запросу;
- адаптация не должна принудительно ронять уже привязанные healthy-сессии.
## Trio-State
`Trio-State` — контурная роль writer-а:
- `Warm`
- `Active`
- `Draining`
### Семантика состояний
- `Warm`: writer подключён и валиден, но не основной для новых биндов.
- `Active`: приоритетный для новых биндов и обычного трафика.
- `Draining`: новые обычные бинды не назначаются; текущие сессии живут до правил graceful-вывода.
### Логика переходов
- `Warm -> Active`: когда достигнуты условия покрытия/готовности.
- `Active -> Draining`: при swap поколения, замене endpoint или контролируемом выводе.
- `Draining -> removed`: после drain TTL/force-close политики (или естественного опустошения).
Такое разделение снижает SPOF-риски и делает cutover предсказуемым.
## Generation Lifecycle
Generation изолирует эпохи пула при reinit/reconfiguration.
### Фазы жизненного цикла
1. `Bootstrap`: поднимается начальный набор writer-ов.
2. `Warmup`: создаётся и валидируется новое поколение.
3. `Activation`: новое поколение становится active после прохождения coverage-gate.
4. `Drain`: предыдущее поколение переводится в draining, текущим сессиям дают завершиться.
5. `Retire`: старое поколение удаляется по graceful-правилам.
### Операционные гарантии
- нельзя активировать поколение частично без минимального покрытия;
- healthy-клиенты не должны теряться только из-за появления нового поколения;
- draining-поколение служит буфером для in-flight трафика во время swap.
### Готовность и приём клиентов
Готовность пула не равна "все endpoint полностью насыщены".
Типичная стратегия:
- открыть admission при минимально достаточном alive-покрытии по DC;
- параллельно продолжать saturation для multi-endpoint DC.
Это уменьшает startup latency и сохраняет выход на полную ёмкость.
## Как понятия связаны между собой
- `Generation` задаёт эпохи пула.
- `Trio-State` задаёт роль каждого writer-а внутри/между эпохами.
- `Adaptive Floor` задаёт, сколько ёмкости нужно сейчас.
- `Refill` — исполнитель, который закрывает разницу между desired и current capacity.
- `Registry` гарантирует корректную маршрутизацию сессий, пока всё выше меняется.
## Архитектурный подход
### Слоистая модель
Runtime специально разделён на две плоскости:
- `Control Plane`: принимает решения о целевой топологии и политиках (`floor`, `generation swap`, `refill`, `fallback`).
- `Data Plane`: исполняет транспорт сессий и пакетов (`reader`, `writer`, маршрутизация, ack, close).
Ключевое правило:
- Control Plane может менять состав writer-ов и policy.
- Data Plane должен оставаться стабильным и низколатентным в момент этих изменений.
### Модель владения состоянием
Владение разделено по доменам:
- `MePool` владеет жизненным циклом writer-ов и policy-state.
- `Registry` владеет routing-биндами клиентских сессий.
- `Writer task` владеет исходящей прогрессией ME-сокета.
- `Reader task` владеет входящим парсингом и dispatch-событиями.
Это ограничивает побочные мутации и локализует инварианты.
### Обязанности Control Plane
Control Plane работает событийно и policy-ориентированно:
- стартовая инициализация и readiness-gate;
- runtime reinit (периодический и/или по изменению конфигурации);
- проверки покрытия по DC/family/endpoint group;
- применение floor-политики (static/adaptive);
- планирование refill и orchestration retry;
- переходы поколений (`warm -> active`, прежний `active -> draining`).
Для него важнее детерминизм, чем агрессивная краткосрочная реакция.
### Обязанности Data Plane
Data Plane ориентирован на пропускную способность и предсказуемую задержку:
- bind клиентской сессии к writer-у;
- per-frame parsing/validation/dispatch;
- распространение ack/close;
- корректная реакция на missing conn/closed channel;
- минимальный лог-шум в hotpath.
Data Plane не должен ждать операций, не критичных для корректности текущего фрейма.
## Конкурентность и синхронизация
### Принципы конкурентности
- Изоляция по writer-у: у каждого writer-а независимые send/read loop.
- Изоляция по сессии: состояние канала локально для `conn_id`.
- Асинхронное восстановление: refill/reconnect выполняются вне пакетного hotpath.
### Стратегия синхронизации
- Для shared map используются короткие и узкие lock-секции.
- Read-heavy пути избегают длительных write-lock окон.
- Решения по backpressure локализованы на границе route/channel.
Цель:
- медленный consumer должен деградировать локально, не останавливая глобальный прогресс writer-а.
### Cancellation и shutdown
Reader/Writer loop должны быть cancellation-aware:
- явные cancel token / close command;
- безопасный unbind/cleanup через registry;
- детерминированный порядок: stop admission -> drain/close -> release resources.
## Модель согласованности
### Согласованность сессии
Для одного `conn_id`:
- одновременно ровно один активный route-target;
- close/unbind операции идемпотентны;
- потеря writer-а не оставляет dangling-бинды.
### Согласованность поколения
Гарантии generation:
- новое поколение не активируется до прохождения минимального coverage-gate;
- предыдущее поколение остаётся в `draining` на время handover;
- принудительный вывод writer-ов ограничен policy (`drain ttl`, optional force-close), а не мгновенный.
### Согласованность политик
Изменение policy (`adaptive/static floor`, fallback mode, retries) не должно ломать инварианты маршрутизации уже активных сессий.
## Backpressure и управление потоком
### Route-level backpressure
Route-каналы намеренно bounded.
При росте нагрузки:
- кратковременный burst поглощается;
- длительная перегрузка переходит в контролируемую drop-семантику;
- все drop-сценарии должны быть прозрачно видны в метриках.
### Приоритет неблокирующего Reader
Входящий ME-reader path не должен сериализоваться из-за одной перегруженной клиентской сессии.
Практически это означает:
- использовать неблокирующую попытку route в parser loop;
- выносить тяжёлое восстановление в асинхронные side-path.
## Стратегия доменов отказа
### Отказ отдельного endpoint
Сначала применяется endpoint-local recovery:
- reconnect в тот же endpoint;
- затем замена endpoint внутри той же DC-группы (если доступно).
### Деградация уровня DC
Если DC-группа не набирает floor:
- сервис сохраняется на остаточном покрытии (если policy разрешает);
- saturation refill продолжается асинхронно в фоне.
### Потеря готовности всего пула
Если достаточного ME-покрытия нет:
- admission gate может временно закрыть приём новых подключений (conditional policy);
- уже активные сессии продолжают работать, пока их маршрут остаётся healthy.
## Архитектурные заметки по производительности
### Дисциплина hotpath
Допустимо в hotpath:
- фиксированный и дешёвый parsing/validation;
- bounded channel operations;
- precomputed/low-allocation доступ к данным.
Нежелательно в hotpath:
- повторные дорогие decode;
- широкие lock-секции с `await` внутри;
- высокочастотный подробный logging.
### Стабильность важнее пиков
Архитектура приоритетно выбирает стабильную пропускную способность и предсказуемую latency, а не краткосрочные пики ценой churn и long-tail reconnect.
## Правила эволюции модели
Чтобы расширять модель безопасно:
- новые policy knobs сначала внедрять в Control Plane;
- контракты Data Plane (`conn_id`, route/close семантика) держать стабильными;
- перед дефолтным включением проверять generation/registry инварианты;
- новые recovery/retry стратегии вводить через явный config-флаг.
## Нюансы отказов и восстановления
- падение single-endpoint DC — штатный деградированный сценарий; приоритет: быстрый reconnect и, при необходимости, shadow/probing;
- idle-close со стороны peer должен считаться нормальным событием при upstream idle-timeout;
- backoff reconnect-логики должен ограничивать синхронный churn, но сохранять быстрые первые попытки;
- fallback (`ME -> direct DC`) — это переключаемая policy-ветка, а не автоматический признак бага транспорта.
## Краткий словарь
- `Coverage`: достаточное число живых writer-ов для политики приёма по DC.
- `Floor`: целевая минимальная ёмкость writer-ов.
- `Churn`: частые циклы reconnect/remove writer-ов.
- `Hotpath`: пер-пакетный/пер-коннектный путь, где любые лишние ожидания и аллокации особенно дороги.

View File

@@ -2,7 +2,8 @@ use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
@@ -25,6 +26,7 @@ use crate::transport::UpstreamManager;
mod config_store;
mod model;
mod runtime_stats;
mod runtime_zero;
mod users;
use config_store::{current_revision, parse_if_match};
@@ -36,8 +38,19 @@ use runtime_stats::{
MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data,
build_upstreams_data, build_zero_all_data,
};
use runtime_zero::{
build_limits_effective_data, build_runtime_gates_data, build_security_posture_data,
build_system_info_data,
};
use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config};
pub(super) struct ApiRuntimeState {
pub(super) process_started_at_epoch_secs: u64,
pub(super) config_reload_count: AtomicU64,
pub(super) last_config_reload_epoch_secs: AtomicU64,
pub(super) admission_open: AtomicBool,
}
#[derive(Clone)]
pub(super) struct ApiShared {
pub(super) stats: Arc<Stats>,
@@ -50,6 +63,7 @@ pub(super) struct ApiShared {
pub(super) mutation_lock: Arc<Mutex<()>>,
pub(super) minimal_cache: Arc<Mutex<Option<MinimalCacheEntry>>>,
pub(super) request_id: Arc<AtomicU64>,
pub(super) runtime_state: Arc<ApiRuntimeState>,
}
impl ApiShared {
@@ -65,9 +79,11 @@ pub async fn serve(
me_pool: Option<Arc<MePool>>,
upstream_manager: Arc<UpstreamManager>,
config_rx: watch::Receiver<Arc<ProxyConfig>>,
admission_rx: watch::Receiver<bool>,
config_path: PathBuf,
startup_detected_ip_v4: Option<IpAddr>,
startup_detected_ip_v6: Option<IpAddr>,
process_started_at_epoch_secs: u64,
) {
let listener = match TcpListener::bind(listen).await {
Ok(listener) => listener,
@@ -83,6 +99,13 @@ pub async fn serve(
info!("API endpoint: http://{}/v1/*", listen);
let runtime_state = Arc::new(ApiRuntimeState {
process_started_at_epoch_secs,
config_reload_count: AtomicU64::new(0),
last_config_reload_epoch_secs: AtomicU64::new(0),
admission_open: AtomicBool::new(*admission_rx.borrow()),
});
let shared = Arc::new(ApiShared {
stats,
ip_tracker,
@@ -94,6 +117,38 @@ pub async fn serve(
mutation_lock: Arc::new(Mutex::new(())),
minimal_cache: Arc::new(Mutex::new(None)),
request_id: Arc::new(AtomicU64::new(1)),
runtime_state: runtime_state.clone(),
});
let mut config_rx_reload = config_rx.clone();
let runtime_state_reload = runtime_state.clone();
tokio::spawn(async move {
loop {
if config_rx_reload.changed().await.is_err() {
break;
}
runtime_state_reload
.config_reload_count
.fetch_add(1, Ordering::Relaxed);
runtime_state_reload
.last_config_reload_epoch_secs
.store(now_epoch_secs(), Ordering::Relaxed);
}
});
let mut admission_rx_watch = admission_rx.clone();
tokio::spawn(async move {
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
loop {
if admission_rx_watch.changed().await.is_err() {
break;
}
runtime_state
.admission_open
.store(*admission_rx_watch.borrow(), Ordering::Relaxed);
}
});
loop {
@@ -189,6 +244,26 @@ async fn handle(
};
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/system/info") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_system_info_data(shared.as_ref(), cfg.as_ref(), &revision);
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/runtime/gates") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/limits/effective") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_limits_effective_data(cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/security/posture") => {
let revision = current_revision(&shared.config_path).await?;
let data = build_security_posture_data(cfg.as_ref());
Ok(success_response(StatusCode::OK, data, revision))
}
("GET", "/v1/stats/summary") => {
let revision = current_revision(&shared.config_path).await?;
let data = SummaryData {
@@ -441,3 +516,10 @@ async fn read_body_with_limit(body: Incoming, limit: usize) -> Result<Vec<u8>, A
}
Ok(collected)
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

227
src/api/runtime_zero.rs Normal file
View File

@@ -0,0 +1,227 @@
use std::sync::atomic::Ordering;
use serde::Serialize;
use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode};
use super::ApiShared;
#[derive(Serialize)]
pub(super) struct SystemInfoData {
pub(super) version: String,
pub(super) target_arch: String,
pub(super) target_os: String,
pub(super) build_profile: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) git_commit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) build_time_utc: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) rustc_version: Option<String>,
pub(super) process_started_at_epoch_secs: u64,
pub(super) uptime_seconds: f64,
pub(super) config_path: String,
pub(super) config_hash: String,
pub(super) config_reload_count: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) last_config_reload_epoch_secs: Option<u64>,
}
#[derive(Serialize)]
pub(super) struct RuntimeGatesData {
pub(super) accepting_new_connections: bool,
pub(super) conditional_cast_enabled: bool,
pub(super) me_runtime_ready: bool,
pub(super) me2dc_fallback_enabled: bool,
pub(super) use_middle_proxy: bool,
}
#[derive(Serialize)]
pub(super) struct EffectiveTimeoutLimits {
pub(super) client_handshake_secs: u64,
pub(super) tg_connect_secs: u64,
pub(super) client_keepalive_secs: u64,
pub(super) client_ack_secs: u64,
pub(super) me_one_retry: u8,
pub(super) me_one_timeout_ms: u64,
}
#[derive(Serialize)]
pub(super) struct EffectiveUpstreamLimits {
pub(super) connect_retry_attempts: u32,
pub(super) connect_retry_backoff_ms: u64,
pub(super) connect_budget_ms: u64,
pub(super) unhealthy_fail_threshold: u32,
pub(super) connect_failfast_hard_errors: bool,
}
#[derive(Serialize)]
pub(super) struct EffectiveMiddleProxyLimits {
pub(super) floor_mode: &'static str,
pub(super) adaptive_floor_idle_secs: u64,
pub(super) adaptive_floor_min_writers_single_endpoint: u8,
pub(super) adaptive_floor_recover_grace_secs: u64,
pub(super) reconnect_max_concurrent_per_dc: u32,
pub(super) reconnect_backoff_base_ms: u64,
pub(super) reconnect_backoff_cap_ms: u64,
pub(super) reconnect_fast_retry_count: u32,
pub(super) me2dc_fallback: bool,
}
#[derive(Serialize)]
pub(super) struct EffectiveUserIpPolicyLimits {
pub(super) mode: &'static str,
pub(super) window_secs: u64,
}
#[derive(Serialize)]
pub(super) struct EffectiveLimitsData {
pub(super) update_every_secs: u64,
pub(super) me_reinit_every_secs: u64,
pub(super) me_pool_force_close_secs: u64,
pub(super) timeouts: EffectiveTimeoutLimits,
pub(super) upstream: EffectiveUpstreamLimits,
pub(super) middle_proxy: EffectiveMiddleProxyLimits,
pub(super) user_ip_policy: EffectiveUserIpPolicyLimits,
}
#[derive(Serialize)]
pub(super) struct SecurityPostureData {
pub(super) api_read_only: bool,
pub(super) api_whitelist_enabled: bool,
pub(super) api_whitelist_entries: usize,
pub(super) api_auth_header_enabled: bool,
pub(super) proxy_protocol_enabled: bool,
pub(super) log_level: String,
pub(super) telemetry_core_enabled: bool,
pub(super) telemetry_user_enabled: bool,
pub(super) telemetry_me_level: String,
}
pub(super) fn build_system_info_data(
shared: &ApiShared,
_cfg: &ProxyConfig,
revision: &str,
) -> SystemInfoData {
let last_reload_epoch_secs = shared
.runtime_state
.last_config_reload_epoch_secs
.load(Ordering::Relaxed);
let last_config_reload_epoch_secs = (last_reload_epoch_secs > 0).then_some(last_reload_epoch_secs);
let git_commit = option_env!("TELEMT_GIT_COMMIT")
.or(option_env!("VERGEN_GIT_SHA"))
.or(option_env!("GIT_COMMIT"))
.map(ToString::to_string);
let build_time_utc = option_env!("BUILD_TIME_UTC")
.or(option_env!("VERGEN_BUILD_TIMESTAMP"))
.map(ToString::to_string);
let rustc_version = option_env!("RUSTC_VERSION")
.or(option_env!("VERGEN_RUSTC_SEMVER"))
.map(ToString::to_string);
SystemInfoData {
version: env!("CARGO_PKG_VERSION").to_string(),
target_arch: std::env::consts::ARCH.to_string(),
target_os: std::env::consts::OS.to_string(),
build_profile: option_env!("PROFILE").unwrap_or("unknown").to_string(),
git_commit,
build_time_utc,
rustc_version,
process_started_at_epoch_secs: shared.runtime_state.process_started_at_epoch_secs,
uptime_seconds: shared.stats.uptime_secs(),
config_path: shared.config_path.display().to_string(),
config_hash: revision.to_string(),
config_reload_count: shared.runtime_state.config_reload_count.load(Ordering::Relaxed),
last_config_reload_epoch_secs,
}
}
pub(super) fn build_runtime_gates_data(shared: &ApiShared, cfg: &ProxyConfig) -> RuntimeGatesData {
let me_runtime_ready = if !cfg.general.use_middle_proxy {
true
} else {
shared
.me_pool
.as_ref()
.map(|pool| pool.is_runtime_ready())
.unwrap_or(false)
};
RuntimeGatesData {
accepting_new_connections: shared.runtime_state.admission_open.load(Ordering::Relaxed),
conditional_cast_enabled: cfg.general.use_middle_proxy,
me_runtime_ready,
me2dc_fallback_enabled: cfg.general.me2dc_fallback,
use_middle_proxy: cfg.general.use_middle_proxy,
}
}
pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsData {
EffectiveLimitsData {
update_every_secs: cfg.general.effective_update_every_secs(),
me_reinit_every_secs: cfg.general.effective_me_reinit_every_secs(),
me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(),
timeouts: EffectiveTimeoutLimits {
client_handshake_secs: cfg.timeouts.client_handshake,
tg_connect_secs: cfg.timeouts.tg_connect,
client_keepalive_secs: cfg.timeouts.client_keepalive,
client_ack_secs: cfg.timeouts.client_ack,
me_one_retry: cfg.timeouts.me_one_retry,
me_one_timeout_ms: cfg.timeouts.me_one_timeout_ms,
},
upstream: EffectiveUpstreamLimits {
connect_retry_attempts: cfg.general.upstream_connect_retry_attempts,
connect_retry_backoff_ms: cfg.general.upstream_connect_retry_backoff_ms,
connect_budget_ms: cfg.general.upstream_connect_budget_ms,
unhealthy_fail_threshold: cfg.general.upstream_unhealthy_fail_threshold,
connect_failfast_hard_errors: cfg.general.upstream_connect_failfast_hard_errors,
},
middle_proxy: EffectiveMiddleProxyLimits {
floor_mode: me_floor_mode_label(cfg.general.me_floor_mode),
adaptive_floor_idle_secs: cfg.general.me_adaptive_floor_idle_secs,
adaptive_floor_min_writers_single_endpoint: cfg
.general
.me_adaptive_floor_min_writers_single_endpoint,
adaptive_floor_recover_grace_secs: cfg.general.me_adaptive_floor_recover_grace_secs,
reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc,
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count,
me2dc_fallback: cfg.general.me2dc_fallback,
},
user_ip_policy: EffectiveUserIpPolicyLimits {
mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode),
window_secs: cfg.access.user_max_unique_ips_window_secs,
},
}
}
pub(super) fn build_security_posture_data(cfg: &ProxyConfig) -> SecurityPostureData {
SecurityPostureData {
api_read_only: cfg.server.api.read_only,
api_whitelist_enabled: !cfg.server.api.whitelist.is_empty(),
api_whitelist_entries: cfg.server.api.whitelist.len(),
api_auth_header_enabled: !cfg.server.api.auth_header.is_empty(),
proxy_protocol_enabled: cfg.server.proxy_protocol,
log_level: cfg.general.log_level.to_string(),
telemetry_core_enabled: cfg.general.telemetry.core_enabled,
telemetry_user_enabled: cfg.general.telemetry.user_enabled,
telemetry_me_level: cfg.general.telemetry.me_level.to_string(),
}
}
fn user_max_unique_ips_mode_label(mode: UserMaxUniqueIpsMode) -> &'static str {
match mode {
UserMaxUniqueIpsMode::ActiveWindow => "active_window",
UserMaxUniqueIpsMode::TimeWindow => "time_window",
UserMaxUniqueIpsMode::Combined => "combined",
}
}
fn me_floor_mode_label(mode: MeFloorMode) -> &'static str {
match mode {
MeFloorMode::Static => "static",
MeFloorMode::Adaptive => "adaptive",
}
}

View File

@@ -15,6 +15,7 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
const DEFAULT_ACCESS_USER: &str = "default";
const DEFAULT_ACCESS_SECRET: &str = "00000000000000000000000000000000";
@@ -113,6 +114,10 @@ pub(crate) fn default_api_minimal_runtime_cache_ttl_ms() -> u64 {
1000
}
pub(crate) fn default_proxy_protocol_header_timeout_ms() -> u64 {
500
}
pub(crate) fn default_prefer_4() -> u8 {
4
}
@@ -253,6 +258,10 @@ pub(crate) fn default_upstream_unhealthy_fail_threshold() -> u32 {
DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD
}
pub(crate) fn default_upstream_connect_budget_ms() -> u64 {
DEFAULT_UPSTREAM_CONNECT_BUDGET_MS
}
pub(crate) fn default_upstream_connect_failfast_hard_errors() -> bool {
false
}

View File

@@ -265,6 +265,12 @@ impl ProxyConfig {
));
}
if config.general.upstream_connect_budget_ms == 0 {
return Err(ProxyError::Config(
"general.upstream_connect_budget_ms must be > 0".to_string(),
));
}
if config.general.upstream_unhealthy_fail_threshold == 0 {
return Err(ProxyError::Config(
"general.upstream_unhealthy_fail_threshold must be > 0".to_string(),
@@ -462,6 +468,12 @@ impl ProxyConfig {
));
}
if config.server.proxy_protocol_header_timeout_ms == 0 {
return Err(ProxyError::Config(
"server.proxy_protocol_header_timeout_ms must be > 0".to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs
@@ -543,11 +555,6 @@ impl ProxyConfig {
warn!("prefer_ipv6 is deprecated, use [network].prefer = 6");
}
// Auto-enable NAT probe when Middle Proxy is requested.
if config.general.use_middle_proxy && !config.general.middle_proxy_nat_probe {
config.general.middle_proxy_nat_probe = true;
warn!("Auto-enabled middle_proxy_nat_probe for middle proxy mode");
}
if config.general.use_middle_proxy && !config.general.me_secret_atomic_snapshot {
config.general.me_secret_atomic_snapshot = true;
warn!(

View File

@@ -532,6 +532,10 @@ pub struct GeneralConfig {
#[serde(default = "default_upstream_connect_retry_backoff_ms")]
pub upstream_connect_retry_backoff_ms: u64,
/// Total wall-clock budget in milliseconds for one upstream connect request across retries.
#[serde(default = "default_upstream_connect_budget_ms")]
pub upstream_connect_budget_ms: u64,
/// Consecutive failed requests before upstream is marked unhealthy.
#[serde(default = "default_upstream_unhealthy_fail_threshold")]
pub upstream_unhealthy_fail_threshold: u32,
@@ -774,6 +778,7 @@ impl Default for GeneralConfig {
me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(),
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
upstream_connect_budget_ms: default_upstream_connect_budget_ms(),
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),
upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(),
stun_iface_mismatch_ignore: false,
@@ -962,6 +967,10 @@ pub struct ServerConfig {
#[serde(default)]
pub proxy_protocol: bool,
/// Timeout in milliseconds for reading and parsing PROXY protocol headers.
#[serde(default = "default_proxy_protocol_header_timeout_ms")]
pub proxy_protocol_header_timeout_ms: u64,
#[serde(default)]
pub metrics_port: Option<u16>,
@@ -985,6 +994,7 @@ impl Default for ServerConfig {
listen_unix_sock_perm: None,
listen_tcp: None,
proxy_protocol: false,
proxy_protocol_header_timeout_ms: default_proxy_protocol_header_timeout_ms(),
metrics_port: None,
metrics_whitelist: default_metrics_whitelist(),
api: ApiConfig::default(),

View File

@@ -21,6 +21,7 @@ struct SecureRandomInner {
rng: StdRng,
cipher: AesCtr,
buffer: Vec<u8>,
buffer_start: usize,
}
impl Drop for SecureRandomInner {
@@ -48,6 +49,7 @@ impl SecureRandom {
rng,
cipher,
buffer: Vec::with_capacity(1024),
buffer_start: 0,
}),
}
}
@@ -59,16 +61,29 @@ impl SecureRandom {
let mut written = 0usize;
while written < out.len() {
if inner.buffer_start >= inner.buffer.len() {
inner.buffer.clear();
inner.buffer_start = 0;
}
if inner.buffer.is_empty() {
let mut chunk = vec![0u8; CHUNK_SIZE];
inner.rng.fill_bytes(&mut chunk);
inner.cipher.apply(&mut chunk);
inner.buffer.extend_from_slice(&chunk);
inner.buffer_start = 0;
}
let take = (out.len() - written).min(inner.buffer.len());
out[written..written + take].copy_from_slice(&inner.buffer[..take]);
inner.buffer.drain(..take);
let available = inner.buffer.len().saturating_sub(inner.buffer_start);
let take = (out.len() - written).min(available);
let start = inner.buffer_start;
let end = start + take;
out[written..written + take].copy_from_slice(&inner.buffer[start..end]);
inner.buffer_start = end;
if inner.buffer_start >= inner.buffer.len() {
inner.buffer.clear();
inner.buffer_start = 0;
}
written += take;
}
}

View File

@@ -4,7 +4,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use rand::Rng;
use tokio::net::TcpListener;
use tokio::signal;
@@ -369,6 +369,10 @@ async fn load_startup_proxy_config_snapshot(
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let process_started_at = Instant::now();
let process_started_at_epoch_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let (config_path, cli_silent, cli_log_level) = parse_cli();
let mut config = match ProxyConfig::load(&config_path) {
@@ -464,6 +468,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.upstreams.clone(),
config.general.upstream_connect_retry_attempts,
config.general.upstream_connect_retry_backoff_ms,
config.general.upstream_connect_budget_ms,
config.general.upstream_unhealthy_fail_threshold,
config.general.upstream_connect_failfast_hard_errors,
stats.clone(),
@@ -1555,6 +1560,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let me_pool_api = me_pool.clone();
let upstream_manager_api = upstream_manager.clone();
let config_rx_api = config_rx.clone();
let admission_rx_api = admission_rx.clone();
let config_path_api = std::path::PathBuf::from(&config_path);
let startup_detected_ip_v4 = detected_ip_v4;
let startup_detected_ip_v6 = detected_ip_v6;
@@ -1566,9 +1572,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
me_pool_api,
upstream_manager_api,
config_rx_api,
admission_rx_api,
config_path_api,
startup_detected_ip_v4,
startup_detected_ip_v6,
process_started_at_epoch_secs,
)
.await;
});

View File

@@ -97,8 +97,11 @@ where
.unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap());
if proxy_protocol_enabled {
match parse_proxy_protocol(&mut stream, peer).await {
Ok(info) => {
let proxy_header_timeout = Duration::from_millis(
config.server.proxy_protocol_header_timeout_ms.max(1),
);
match timeout(proxy_header_timeout, parse_proxy_protocol(&mut stream, peer)).await {
Ok(Ok(info)) => {
debug!(
peer = %peer,
client = %info.src_addr,
@@ -110,12 +113,18 @@ where
local_addr = dst;
}
}
Err(e) => {
Ok(Err(e)) => {
stats.increment_connects_bad();
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(e);
}
Err(_) => {
stats.increment_connects_bad();
warn!(peer = %peer, timeout_ms = proxy_header_timeout.as_millis(), "PROXY protocol header timeout");
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
return Err(ProxyError::InvalidProxyProtocol);
}
}
}
@@ -161,7 +170,7 @@ where
let (read_half, write_half) = tokio::io::split(stream);
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake(
&handshake, read_half, write_half, real_peer,
&config, &replay_checker, &rng, tls_cache.clone(),
).await {
@@ -190,7 +199,7 @@ where
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
&mtproto_handshake, tls_reader, tls_writer, real_peer,
&config, &replay_checker, true,
&config, &replay_checker, true, Some(tls_user.as_str()),
).await {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader: _, writer: _ } => {
@@ -234,7 +243,7 @@ where
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
&handshake, read_half, write_half, real_peer,
&config, &replay_checker, false,
&config, &replay_checker, false, None,
).await {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
@@ -415,8 +424,16 @@ impl RunningClientHandler {
let mut local_addr = self.stream.local_addr().map_err(ProxyError::Io)?;
if self.proxy_protocol_enabled {
match parse_proxy_protocol(&mut self.stream, self.peer).await {
Ok(info) => {
let proxy_header_timeout = Duration::from_millis(
self.config.server.proxy_protocol_header_timeout_ms.max(1),
);
match timeout(
proxy_header_timeout,
parse_proxy_protocol(&mut self.stream, self.peer),
)
.await
{
Ok(Ok(info)) => {
debug!(
peer = %self.peer,
client = %info.src_addr,
@@ -428,7 +445,7 @@ impl RunningClientHandler {
local_addr = dst;
}
}
Err(e) => {
Ok(Err(e)) => {
self.stats.increment_connects_bad();
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
record_beobachten_class(
@@ -439,6 +456,21 @@ impl RunningClientHandler {
);
return Err(e);
}
Err(_) => {
self.stats.increment_connects_bad();
warn!(
peer = %self.peer,
timeout_ms = proxy_header_timeout.as_millis(),
"PROXY protocol header timeout"
);
record_beobachten_class(
&self.beobachten,
&self.config,
self.peer.ip(),
"other",
);
return Err(ProxyError::InvalidProxyProtocol);
}
}
}
@@ -494,7 +526,7 @@ impl RunningClientHandler {
let (read_half, write_half) = self.stream.into_split();
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake(
&handshake,
read_half,
write_half,
@@ -538,6 +570,7 @@ impl RunningClientHandler {
&config,
&replay_checker,
true,
Some(tls_user.as_str()),
)
.await
{
@@ -611,6 +644,7 @@ impl RunningClientHandler {
&config,
&replay_checker,
false,
None,
)
.await
{

View File

@@ -34,7 +34,7 @@ where
let user = &success.user;
let dc_addr = get_dc_addr_static(success.dc_idx, &config)?;
info!(
debug!(
user = %user,
peer = %success.peer,
dc = success.dc_idx,

View File

@@ -6,7 +6,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::{debug, warn, trace, info};
use tracing::{debug, warn, trace};
use zeroize::Zeroize;
use crate::crypto::{sha256, AesCtr, SecureRandom};
@@ -19,6 +19,31 @@ use crate::stats::ReplayChecker;
use crate::config::ProxyConfig;
use crate::tls_front::{TlsFrontCache, emulator};
fn decode_user_secrets(
config: &ProxyConfig,
preferred_user: Option<&str>,
) -> Vec<(String, Vec<u8>)> {
let mut secrets = Vec::with_capacity(config.access.users.len());
if let Some(preferred) = preferred_user
&& let Some(secret_hex) = config.access.users.get(preferred)
&& let Ok(bytes) = hex::decode(secret_hex)
{
secrets.push((preferred.to_string(), bytes));
}
for (name, secret_hex) in &config.access.users {
if preferred_user.is_some_and(|preferred| preferred == name.as_str()) {
continue;
}
if let Ok(bytes) = hex::decode(secret_hex) {
secrets.push((name.clone(), bytes));
}
}
secrets
}
/// Result of successful handshake
///
/// Key material (`dec_key`, `dec_iv`, `enc_key`, `enc_iv`) is
@@ -82,11 +107,7 @@ where
return HandshakeResult::BadClient { reader, writer };
}
let secrets: Vec<(String, Vec<u8>)> = config.access.users.iter()
.filter_map(|(name, hex)| {
hex::decode(hex).ok().map(|bytes| (name.clone(), bytes))
})
.collect();
let secrets = decode_user_secrets(config, None);
let validation = match tls::validate_tls_handshake(
handshake,
@@ -201,7 +222,7 @@ where
return HandshakeResult::Error(ProxyError::Io(e));
}
info!(
debug!(
peer = %peer,
user = %validation.user,
"TLS handshake successful"
@@ -223,6 +244,7 @@ pub async fn handle_mtproto_handshake<R, W>(
config: &ProxyConfig,
replay_checker: &ReplayChecker,
is_tls: bool,
preferred_user: Option<&str>,
) -> HandshakeResult<(CryptoReader<R>, CryptoWriter<W>, HandshakeSuccess), R, W>
where
R: AsyncRead + Unpin + Send,
@@ -239,11 +261,9 @@ where
let enc_prekey_iv: Vec<u8> = dec_prekey_iv.iter().rev().copied().collect();
for (user, secret_hex) in &config.access.users {
let secret = match hex::decode(secret_hex) {
Ok(s) => s,
Err(_) => continue,
};
let decoded_users = decode_user_secrets(config, preferred_user);
for (user, secret) in decoded_users {
let dec_prekey = &dec_prekey_iv[..PREKEY_LEN];
let dec_iv_bytes = &dec_prekey_iv[PREKEY_LEN..];
@@ -311,7 +331,7 @@ where
is_tls,
};
info!(
debug!(
peer = %peer,
user = %user,
dc = dc_idx,

View File

@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
@@ -210,7 +210,7 @@ where
let proto_tag = success.proto_tag;
let pool_generation = me_pool.current_generation();
info!(
debug!(
user = %user,
peer = %peer,
dc = success.dc_idx,

View File

@@ -846,16 +846,30 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.connects.fetch_add(1, Ordering::Relaxed);
if let Some(stats) = self.user_stats.get(user) {
stats.connects.fetch_add(1, Ordering::Relaxed);
return;
}
self.user_stats
.entry(user.to_string())
.or_default()
.connects
.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_user_curr_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.curr_connects.fetch_add(1, Ordering::Relaxed);
if let Some(stats) = self.user_stats.get(user) {
stats.curr_connects.fetch_add(1, Ordering::Relaxed);
return;
}
self.user_stats
.entry(user.to_string())
.or_default()
.curr_connects
.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_user_curr_connects(&self, user: &str) {
@@ -889,32 +903,60 @@ impl Stats {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
if let Some(stats) = self.user_stats.get(user) {
stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
return;
}
self.user_stats
.entry(user.to_string())
.or_default()
.octets_from_client
.fetch_add(bytes, Ordering::Relaxed);
}
pub fn add_user_octets_to(&self, user: &str, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
if let Some(stats) = self.user_stats.get(user) {
stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
return;
}
self.user_stats
.entry(user.to_string())
.or_default()
.octets_to_client
.fetch_add(bytes, Ordering::Relaxed);
}
pub fn increment_user_msgs_from(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.msgs_from_client.fetch_add(1, Ordering::Relaxed);
if let Some(stats) = self.user_stats.get(user) {
stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
return;
}
self.user_stats
.entry(user.to_string())
.or_default()
.msgs_from_client
.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_user_msgs_to(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.msgs_to_client.fetch_add(1, Ordering::Relaxed);
if let Some(stats) = self.user_stats.get(user) {
stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
return;
}
self.user_stats
.entry(user.to_string())
.or_default()
.msgs_to_client
.fetch_add(1, Ordering::Relaxed);
}
pub fn get_user_total_octets(&self, user: &str) -> u64 {

View File

@@ -119,6 +119,8 @@ pub struct MePool {
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) nat_reflection_singleflight_v4: Arc<Mutex<()>>,
pub(super) nat_reflection_singleflight_v6: Arc<Mutex<()>>,
pub(super) writer_available: Arc<Notify>,
pub(super) refill_inflight: Arc<Mutex<HashSet<SocketAddr>>>,
pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>,
@@ -323,6 +325,8 @@ impl MePool {
ping_tracker: Arc::new(Mutex::new(HashMap::new())),
rtt_stats: Arc::new(Mutex::new(HashMap::new())),
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
nat_reflection_singleflight_v4: Arc::new(Mutex::new(())),
nat_reflection_singleflight_v6: Arc::new(Mutex::new(())),
writer_available: Arc::new(Notify::new()),
refill_inflight: Arc::new(Mutex::new(HashSet::new())),
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),

View File

@@ -248,6 +248,43 @@ impl MePool {
}
}
let _singleflight_guard = if use_shared_cache {
Some(match family {
IpFamily::V4 => self.nat_reflection_singleflight_v4.lock().await,
IpFamily::V6 => self.nat_reflection_singleflight_v6.lock().await,
})
} else {
None
};
if use_shared_cache
&& let Some(until) = *self.stun_backoff_until.read().await
&& Instant::now() < until
{
if let Ok(cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => cache.v4,
IpFamily::V6 => cache.v6,
};
return slot.map(|(_, addr)| addr);
}
return None;
}
if use_shared_cache
&& let Ok(mut cache) = self.nat_reflection_cache.try_lock()
{
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
if let Some((ts, addr)) = slot
&& ts.elapsed() < STUN_CACHE_TTL
{
return Some(*addr);
}
}
let attempt = if use_shared_cache {
self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
} else {

View File

@@ -124,7 +124,7 @@ pub(crate) async fn reader_loop(
let data = Bytes::copy_from_slice(&body[12..]);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
let routed = reg.route_nowait(cid, MeResponse::Data { flags, data }).await;
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
@@ -147,7 +147,7 @@ pub(crate) async fn reader_loop(
let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap());
trace!(cid, cfm, "RPC_SIMPLE_ACK");
let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
let routed = reg.route_nowait(cid, MeResponse::Ack(cfm)).await;
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),

View File

@@ -208,6 +208,23 @@ impl ConnRegistry {
}
}
pub async fn route_nowait(&self, id: u64, resp: MeResponse) -> RouteResult {
let tx = {
let inner = self.inner.read().await;
inner.map.get(&id).cloned()
};
let Some(tx) = tx else {
return RouteResult::NoConn;
};
match tx.try_send(resp) {
Ok(()) => RouteResult::Routed,
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
Err(TrySendError::Full(_)) => RouteResult::QueueFullBase,
}
}
pub async fn bind_writer(
&self,
conn_id: u64,

View File

@@ -225,6 +225,7 @@ pub struct UpstreamManager {
upstreams: Arc<RwLock<Vec<UpstreamState>>>,
connect_retry_attempts: u32,
connect_retry_backoff: Duration,
connect_budget: Duration,
unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool,
stats: Arc<Stats>,
@@ -235,6 +236,7 @@ impl UpstreamManager {
configs: Vec<UpstreamConfig>,
connect_retry_attempts: u32,
connect_retry_backoff_ms: u64,
connect_budget_ms: u64,
unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool,
stats: Arc<Stats>,
@@ -248,6 +250,7 @@ impl UpstreamManager {
upstreams: Arc::new(RwLock::new(states)),
connect_retry_attempts: connect_retry_attempts.max(1),
connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms),
connect_budget: Duration::from_millis(connect_budget_ms.max(1)),
unhealthy_fail_threshold: unhealthy_fail_threshold.max(1),
connect_failfast_hard_errors,
stats,
@@ -593,11 +596,27 @@ impl UpstreamManager {
let mut last_error: Option<ProxyError> = None;
let mut attempts_used = 0u32;
for attempt in 1..=self.connect_retry_attempts {
let elapsed = connect_started_at.elapsed();
if elapsed >= self.connect_budget {
last_error = Some(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
break;
}
let remaining_budget = self.connect_budget.saturating_sub(elapsed);
let attempt_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS)
.min(remaining_budget);
if attempt_timeout.is_zero() {
last_error = Some(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
break;
}
attempts_used = attempt;
self.stats.increment_upstream_connect_attempt_total();
let start = Instant::now();
match self
.connect_via_upstream(&upstream, target, bind_rr.clone())
.connect_via_upstream(&upstream, target, bind_rr.clone(), attempt_timeout)
.await
{
Ok((stream, egress)) => {
@@ -707,6 +726,7 @@ impl UpstreamManager {
config: &UpstreamConfig,
target: SocketAddr,
bind_rr: Option<Arc<AtomicUsize>>,
connect_timeout: Duration,
) -> Result<(TcpStream, UpstreamEgressInfo)> {
match &config.upstream_type {
UpstreamType::Direct { interface, bind_addresses } => {
@@ -735,7 +755,6 @@ impl UpstreamManager {
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
match tokio::time::timeout(connect_timeout, stream.writable()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(ProxyError::Io(e)),
@@ -762,7 +781,6 @@ impl UpstreamManager {
))
},
UpstreamType::Socks4 { address, interface, user_id } => {
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
@@ -841,7 +859,6 @@ impl UpstreamManager {
))
},
UpstreamType::Socks5 { address, interface, username, password } => {
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
@@ -1165,7 +1182,14 @@ impl UpstreamManager {
target: SocketAddr,
) -> Result<f64> {
let start = Instant::now();
let _ = self.connect_via_upstream(config, target, bind_rr).await?;
let _ = self
.connect_via_upstream(
config,
target,
bind_rr,
Duration::from_secs(DC_PING_TIMEOUT_SECS),
)
.await?;
Ok(start.elapsed().as_secs_f64() * 1000.0)
}
@@ -1337,7 +1361,12 @@ impl UpstreamManager {
let start = Instant::now();
let result = tokio::time::timeout(
Duration::from_secs(HEALTH_CHECK_CONNECT_TIMEOUT_SECS),
self.connect_via_upstream(&config, endpoint, Some(bind_rr.clone())),
self.connect_via_upstream(
&config,
endpoint,
Some(bind_rr.clone()),
Duration::from_secs(HEALTH_CHECK_CONNECT_TIMEOUT_SECS),
),
)
.await;