diff --git a/docs/API.md b/docs/API.md index bd8f892..50cfb4e 100644 --- a/docs/API.md +++ b/docs/API.md @@ -13,13 +13,18 @@ API runtime is configured in `[server.api]`. | `listen` | `string` (`IP:PORT`) | `127.0.0.1:9091` | API bind address. | | `whitelist` | `CIDR[]` | `127.0.0.1/32, ::1/128` | Source IP allowlist. Empty list means allow all. | | `auth_header` | `string` | `""` | Exact value for `Authorization` header. Empty disables header auth. | -| `request_body_limit_bytes` | `usize` | `65536` | Maximum request body size. | +| `request_body_limit_bytes` | `usize` | `65536` | Maximum request body size. Must be `> 0`. | | `minimal_runtime_enabled` | `bool` | `false` | Enables runtime snapshot endpoints requiring ME pool read-lock aggregation. | -| `minimal_runtime_cache_ttl_ms` | `u64` | `1000` | Cache TTL for minimal snapshots. `0` disables cache. | +| `minimal_runtime_cache_ttl_ms` | `u64` | `1000` | Cache TTL for minimal snapshots. `0` disables cache; valid range is `[0, 60000]`. | | `read_only` | `bool` | `false` | Disables mutating endpoints. | `server.admin_api` is accepted as an alias for backward compatibility. +Runtime validation for API config: +- `server.api.listen` must be a valid `IP:PORT`. +- `server.api.request_body_limit_bytes` must be `> 0`. +- `server.api.minimal_runtime_cache_ttl_ms` must be within `[0, 60000]`. + ## Protocol Contract | Item | Value | @@ -51,6 +56,21 @@ API runtime is configured in `[server.api]`. } ``` +## Request Processing Order + +Requests are processed in this order: +1. `api_enabled` gate (`503 api_disabled` if disabled). +2. Source IP whitelist gate (`403 forbidden`). +3. `Authorization` header gate when configured (`401 unauthorized`). +4. Route and method matching (`404 not_found` or `405 method_not_allowed`). +5. `read_only` gate for mutating routes (`403 read_only`). +6. Request body read/limit/JSON decode (`413 payload_too_large`, `400 bad_request`). +7. Business validation and config write path. 
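The gate order above can be sketched as a pure function. This is an illustrative sketch only — the `Gate` enum, `Req` struct, and `first_failing_gate` are made-up names showing the documented ordering, not the proxy's actual handler types:

```rust
// Illustrative model of the documented gate chain; first failure wins.
#[derive(Debug, PartialEq)]
enum Gate {
    ApiDisabled,  // 503 api_disabled
    Forbidden,    // 403 forbidden (whitelist)
    Unauthorized, // 401 unauthorized (auth_header)
    NotFound,     // 404 not_found / 405 method_not_allowed
    ReadOnly,     // 403 read_only
    Pass,         // continue to body read + business validation
}

struct Req<'a> {
    peer_allowed: bool,    // source IP passed the whitelist
    auth: Option<&'a str>, // raw Authorization header value, if any
    route_known: bool,     // route + method matched
    mutating: bool,        // mutating route (POST/PATCH/DELETE)
}

fn first_failing_gate(api_enabled: bool, auth_header: &str, read_only: bool, req: &Req) -> Gate {
    if !api_enabled {
        return Gate::ApiDisabled;
    }
    if !req.peer_allowed {
        return Gate::Forbidden;
    }
    // Exact string equality against the configured value (see Notes below).
    if !auth_header.is_empty() && req.auth != Some(auth_header) {
        return Gate::Unauthorized;
    }
    if !req.route_known {
        return Gate::NotFound;
    }
    if read_only && req.mutating {
        return Gate::ReadOnly;
    }
    Gate::Pass
}
```

Because the `api_enabled` gate runs first, a disabled API answers `503 api_disabled` even for peers that would also fail the whitelist or auth gates.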
+ +Notes: +- The whitelist is evaluated against the direct TCP peer IP (`SocketAddr::ip`), without `X-Forwarded-For` support. +- The `Authorization` check is exact string equality against the configured `auth_header`. + ## Endpoint Matrix | Method | Path | Body | Success | `data` contract | @@ -58,6 +78,7 @@ API runtime is configured in `[server.api]`. | `GET` | `/v1/health` | none | `200` | `HealthData` | | `GET` | `/v1/stats/summary` | none | `200` | `SummaryData` | | `GET` | `/v1/stats/zero/all` | none | `200` | `ZeroAllData` | +| `GET` | `/v1/stats/upstreams` | none | `200` | `UpstreamsData` | | `GET` | `/v1/stats/minimal/all` | none | `200` | `MinimalAllData` | | `GET` | `/v1/stats/me-writers` | none | `200` | `MeWritersData` | | `GET` | `/v1/stats/dcs` | none | `200` | `DcStatusData` | @@ -67,7 +88,7 @@ API runtime is configured in `[server.api]`. | `GET` | `/v1/users/{username}` | none | `200` | `UserInfo` | | `PATCH` | `/v1/users/{username}` | `PatchUserRequest` | `200` | `UserInfo` | | `DELETE` | `/v1/users/{username}` | none | `200` | `string` (deleted username) | -| `POST` | `/v1/users/{username}/rotate-secret` | `RotateSecretRequest` or empty body | `200` | `CreateUserResponse` | +| `POST` | `/v1/users/{username}/rotate-secret` | `RotateSecretRequest` or empty body | `404` | `ErrorResponse` (`not_found`, current runtime behavior) | ## Common Error Codes @@ -77,8 +98,8 @@ API runtime is configured in `[server.api]`. | `401` | `unauthorized` | Missing/invalid `Authorization` when `auth_header` is configured. | | `403` | `forbidden` | Source IP is not allowed by whitelist. | | `403` | `read_only` | Mutating endpoint called while `read_only=true`. | -| `404` | `not_found` | Unknown route or unknown user. | -| `405` | `method_not_allowed` | Unsupported method for an existing user route. | +| `404` | `not_found` | Unknown route, unknown user, or unsupported sub-route (including the current `rotate-secret` route). 
| +| `405` | `method_not_allowed` | Unsupported method for the `/v1/users/{username}` route shape. | | `409` | `revision_conflict` | `If-Match` revision mismatch. | | `409` | `user_exists` | User already exists on create. | | `409` | `last_user_forbidden` | Attempt to delete last configured user. | @@ -86,6 +107,28 @@ | `500` | `internal_error` | Internal error (I/O, serialization, config load/save). | | `503` | `api_disabled` | API disabled in config. | +## Routing and Method Edge Cases + +| Case | Behavior | +| --- | --- | +| Path matching | Exact match on `req.uri().path()`. Query string does not affect route matching. | +| Trailing slash | Not normalized. Example: `/v1/users/` is `404`. | +| Username route with extra slash | `/v1/users/{username}/...` is not treated as a user route and returns `404`. | +| `PUT /v1/users/{username}` | `405 method_not_allowed`. | +| `POST /v1/users/{username}` | `404 not_found`. | +| `POST /v1/users/{username}/rotate-secret` | `404 not_found` in the current release due to a route matcher limitation. | + +## Body and JSON Semantics + +- The request body is read only for mutating routes that define a body contract. +- The body size limit is enforced during the streaming read (`413 payload_too_large`). +- An invalid transport body frame returns `400 bad_request` (`Invalid request body`). +- Invalid JSON returns `400 bad_request` (`Invalid JSON body`). +- `Content-Type` is not required for JSON parsing. +- Unknown JSON fields are ignored by deserialization. +- `PATCH` updates only provided fields and does not support explicit clearing of optional fields. +- `If-Match` supports both quoted and unquoted values; surrounding whitespace is trimmed. + ## Request Contracts ### `CreateUserRequest` @@ -114,6 +157,8 @@ API runtime is configured in `[server.api]`. | --- | --- | --- | --- | | `secret` | `string` | no | Exactly 32 hex chars. If missing, generated automatically. 
| +Note: the request contract is defined, but the corresponding route currently returns `404` (see routing edge cases). + ## Response Data Contracts ### `HealthData` @@ -173,6 +218,47 @@ API runtime is configured in `[server.api]`. | `connect_duration_fail_bucket_501_1000ms` | `u64` | Failed connects 501-1000 ms. | | `connect_duration_fail_bucket_gt_1000ms` | `u64` | Failed connects >1000 ms. | +### `UpstreamsData` +| Field | Type | Description | +| --- | --- | --- | +| `enabled` | `bool` | Runtime upstream snapshot availability according to API config. | +| `reason` | `string?` | `feature_disabled` or `source_unavailable` when runtime snapshot is unavailable. | +| `generated_at_epoch_secs` | `u64` | Snapshot generation time. | +| `zero` | `ZeroUpstreamData` | Always available zero-cost upstream counters block. | +| `summary` | `UpstreamSummaryData?` | Runtime upstream aggregate view, null when unavailable. | +| `upstreams` | `UpstreamStatus[]?` | Per-upstream runtime status rows, null when unavailable. | + +#### `UpstreamSummaryData` +| Field | Type | Description | +| --- | --- | --- | +| `configured_total` | `usize` | Total configured upstream entries. | +| `healthy_total` | `usize` | Upstreams currently marked healthy. | +| `unhealthy_total` | `usize` | Upstreams currently marked unhealthy. | +| `direct_total` | `usize` | Number of direct upstream entries. | +| `socks4_total` | `usize` | Number of SOCKS4 upstream entries. | +| `socks5_total` | `usize` | Number of SOCKS5 upstream entries. | + +#### `UpstreamStatus` +| Field | Type | Description | +| --- | --- | --- | +| `upstream_id` | `usize` | Runtime upstream index. | +| `route_kind` | `string` | Upstream route kind: `direct`, `socks4`, `socks5`. | +| `address` | `string` | Upstream address (`direct` for direct route kind). Authentication fields are intentionally omitted. | +| `weight` | `u16` | Selection weight. | +| `scopes` | `string` | Configured scope selector string. 
| +| `healthy` | `bool` | Current health flag. | +| `fails` | `u32` | Consecutive fail counter. | +| `last_check_age_secs` | `u64` | Seconds since the last health-check update. | +| `effective_latency_ms` | `f64?` | Effective upstream latency used by selector. | +| `dc` | `UpstreamDcStatus[]` | Per-DC latency/IP preference snapshot. | + +#### `UpstreamDcStatus` +| Field | Type | Description | +| --- | --- | --- | +| `dc` | `i16` | Telegram DC id. | +| `latency_ema_ms` | `f64?` | Per-DC latency EMA value. | +| `ip_preference` | `string` | Per-DC IP family preference: `unknown`, `prefer_v4`, `prefer_v6`, `both_work`, `unavailable`. | + #### `ZeroMiddleProxyData` | Field | Type | Description | | --- | --- | --- | @@ -392,8 +478,11 @@ API runtime is configured in `[server.api]`. Link generation uses active config and enabled modes: - `[general.links].public_host/public_port` have priority. +- If `public_host` is not set, startup-detected public IPs are used (`IPv4`, `IPv6`, or both when available). - Fallback host sources: listener `announce`, `announce_ip`, explicit listener `ip`. - Legacy fallback: `listen_addr_ipv4` and `listen_addr_ipv6` when routable. +- Startup-detected IPs are fixed for process lifetime and refreshed on restart. +- User rows are sorted by `username` in ascending lexical order. ### `CreateUserResponse` | Field | Type | Description | @@ -407,21 +496,53 @@ Link generation uses active config and enabled modes: | --- | --- | | `POST /v1/users` | Creates user and validates resulting config before atomic save. | | `PATCH /v1/users/{username}` | Partial update of provided fields only. Missing fields remain unchanged. | -| `POST /v1/users/{username}/rotate-secret` | Replaces secret. Empty body is allowed and auto-generates secret. | +| `POST /v1/users/{username}/rotate-secret` | Currently returns `404` in runtime route matcher; request schema is reserved for intended behavior. 
| | `DELETE /v1/users/{username}` | Deletes user and related optional settings. Last user deletion is blocked. | All mutating endpoints: - Respect `read_only` mode. - Accept optional `If-Match` for optimistic concurrency. - Return new `revision` after successful write. +- Use process-local mutation lock + atomic write (`tmp + rename`) for config persistence. + +## Runtime State Matrix + +| Endpoint | `minimal_runtime_enabled=false` | `minimal_runtime_enabled=true` + source unavailable | `minimal_runtime_enabled=true` + source available | +| --- | --- | --- | --- | +| `/v1/stats/minimal/all` | `enabled=false`, `reason=feature_disabled`, `data=null` | `enabled=true`, `reason=source_unavailable`, fallback `data` with disabled ME blocks | `enabled=true`, `reason` omitted, full payload | +| `/v1/stats/me-writers` | `middle_proxy_enabled=false`, `reason=feature_disabled` | `middle_proxy_enabled=false`, `reason=source_unavailable` | `middle_proxy_enabled=true`, runtime snapshot | +| `/v1/stats/dcs` | `middle_proxy_enabled=false`, `reason=feature_disabled` | `middle_proxy_enabled=false`, `reason=source_unavailable` | `middle_proxy_enabled=true`, runtime snapshot | +| `/v1/stats/upstreams` | `enabled=false`, `reason=feature_disabled`, `summary/upstreams` omitted, `zero` still present | `enabled=true`, `reason=source_unavailable`, `summary/upstreams` omitted, `zero` present | `enabled=true`, `reason` omitted, `summary/upstreams` present, `zero` present | + +`source_unavailable` conditions: +- ME endpoints: ME pool is absent (for example direct-only mode or failed ME initialization). +- Upstreams endpoint: non-blocking upstream snapshot lock is unavailable at request time. + +## Serialization Rules + +- Success responses always include `revision`. +- Error responses never include `revision`; they include `request_id`. +- Optional fields with `skip_serializing_if` are omitted when absent. 
+- Nullable payload fields may still be `null` where the contract uses `?` (for example `UserInfo` option fields). +- For `/v1/stats/upstreams`, authentication details of SOCKS upstreams are intentionally omitted. ## Operational Notes | Topic | Details | | --- | --- | -| API startup | API binds only when `[server.api].enabled=true`. | -| Restart requirements | Changes in `server.api` settings require process restart. | +| API startup | API listener is spawned only when `[server.api].enabled=true`. | +| `listen` port `0` | API spawn is skipped when the parsed listen port is `0` (treated as a disabled bind target). | +| Bind failure | A failed API bind logs a warning and the API task exits (no auto-retry loop). | +| ME runtime status endpoints | `/v1/stats/me-writers`, `/v1/stats/dcs`, `/v1/stats/minimal/all` require `[server.api].minimal_runtime_enabled=true`; otherwise they return a disabled payload with `reason=feature_disabled`. | +| Upstream runtime endpoint | `/v1/stats/upstreams` always returns `zero`, but runtime fields (`summary`, `upstreams`) require `[server.api].minimal_runtime_enabled=true`. | +| Restart requirements | `server.api` changes are restart-required for predictable behavior. | +| Hot-reload nuance | A pure `server.api`-only config change may not propagate through the watcher broadcast; a mixed change (with hot fields) may propagate API flags while still warning that a restart is required. | | Runtime apply path | Successful writes are picked up by the existing config watcher/hot-reload path. | | Exposure | Built-in TLS/mTLS is not provided. Use loopback bind + reverse proxy if needed. | | Pagination | User list currently has no pagination/filtering. | | Serialization side effect | Config comments/manual formatting are not preserved on write. | + +## Known Limitations (Current Release) + +- `POST /v1/users/{username}/rotate-secret` is currently unreachable in the route matcher and returns `404`. 
+- API runtime controls under `server.api` are documented as restart-required; hot-reload behavior for these fields is not strictly uniform in all change combinations. diff --git a/src/api/users.rs b/src/api/users.rs index c907070..8e90c7f 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -287,6 +287,7 @@ pub(super) async fn delete_user( .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; let revision = save_config_to_disk(&shared.config_path, &cfg).await?; drop(_guard); + shared.ip_tracker.remove_user_limit(user).await; shared.ip_tracker.clear_user_ips(user).await; Ok((user.to_string(), revision)) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 86f569b..b73013a 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -12,6 +12,7 @@ const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90; const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1; const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180; +const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; @@ -152,6 +153,14 @@ pub(crate) fn default_middle_proxy_warm_standby() -> usize { DEFAULT_MIDDLE_PROXY_WARM_STANDBY } +pub(crate) fn default_me_init_retry_attempts() -> u32 { + 0 +} + +pub(crate) fn default_me2dc_fallback() -> bool { + true +} + pub(crate) fn default_keepalive_interval() -> u64 { 8 } @@ -464,6 +473,10 @@ pub(crate) fn default_access_users() -> HashMap { )]) } +pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 { + DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index d752d45..b03f83e 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -9,20 +9,17 @@ //! 
| `general` | `log_level` | Filter updated via `log_level_tx` | //! | `access` | `user_ad_tags` | Passed on next connection | //! | `general` | `ad_tag` | Passed on next connection (fallback per-user) | -//! | `general` | `middle_proxy_pool_size` | Passed on next connection | -//! | `general` | `me_keepalive_*` | Passed on next connection | //! | `general` | `desync_all_full` | Applied immediately | //! | `general` | `update_every` | Applied to ME updater immediately | -//! | `general` | `hardswap` | Applied on next ME map update | -//! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update | -//! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update | -//! | `general` | `me_reinit_drain_timeout_secs` | Applied on next ME map update | +//! | `general` | `me_reinit_*` | Applied to ME reinit scheduler immediately | +//! | `general` | `hardswap` / `me_*_reinit` | Applied on next ME map update | //! | `general` | `telemetry` / `me_*_policy` | Applied immediately | //! | `network` | `dns_overrides` | Applied immediately | //! | `access` | All user/quota fields | Effective immediately | //! //! Fields that require re-binding sockets (`server.port`, `censorship.*`, //! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted. +//! Non-hot changes are never mixed into the runtime config snapshot. 
use std::net::IpAddr; use std::path::PathBuf; @@ -32,7 +29,7 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; -use crate::config::{LogLevel, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel}; +use crate::config::{LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel}; use super::load::ProxyConfig; // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -43,17 +40,37 @@ pub struct HotFields { pub log_level: LogLevel, pub ad_tag: Option, pub dns_overrides: Vec, - pub middle_proxy_pool_size: usize, pub desync_all_full: bool, pub update_every_secs: u64, + pub me_reinit_every_secs: u64, + pub me_reinit_singleflight: bool, + pub me_reinit_coalesce_window_ms: u64, pub hardswap: bool, pub me_pool_drain_ttl_secs: u64, pub me_pool_min_fresh_ratio: f32, pub me_reinit_drain_timeout_secs: u64, - pub me_keepalive_enabled: bool, - pub me_keepalive_interval_secs: u64, - pub me_keepalive_jitter_secs: u64, - pub me_keepalive_payload_random: bool, + pub me_hardswap_warmup_delay_min_ms: u64, + pub me_hardswap_warmup_delay_max_ms: u64, + pub me_hardswap_warmup_extra_passes: u8, + pub me_hardswap_warmup_pass_backoff_base_ms: u64, + pub me_bind_stale_mode: MeBindStaleMode, + pub me_bind_stale_ttl_secs: u64, + pub me_secret_atomic_snapshot: bool, + pub me_deterministic_writer_sort: bool, + pub me_single_endpoint_shadow_writers: u8, + pub me_single_endpoint_outage_mode_enabled: bool, + pub me_single_endpoint_outage_disable_quarantine: bool, + pub me_single_endpoint_outage_backoff_min_ms: u64, + pub me_single_endpoint_outage_backoff_max_ms: u64, + pub me_single_endpoint_shadow_rotate_every_secs: u64, + pub me_config_stable_snapshots: u8, + pub me_config_apply_cooldown_secs: u64, + pub me_snapshot_require_http_2xx: bool, + pub me_snapshot_reject_empty_map: bool, + pub me_snapshot_min_proxy_for_lines: u32, + pub proxy_secret_stable_snapshots: u8, + pub 
proxy_secret_rotate_runtime: bool, + pub proxy_secret_len_max: usize, pub telemetry_core_enabled: bool, pub telemetry_user_enabled: bool, pub telemetry_me_level: MeTelemetryLevel, @@ -65,7 +82,14 @@ pub struct HotFields { pub me_route_backpressure_base_timeout_ms: u64, pub me_route_backpressure_high_timeout_ms: u64, pub me_route_backpressure_high_watermark_pct: u8, - pub access: crate::config::AccessConfig, + pub users: std::collections::HashMap, + pub user_ad_tags: std::collections::HashMap, + pub user_max_tcp_conns: std::collections::HashMap, + pub user_expirations: std::collections::HashMap>, + pub user_data_quota: std::collections::HashMap, + pub user_max_unique_ips: std::collections::HashMap, + pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode, + pub user_max_unique_ips_window_secs: u64, } impl HotFields { @@ -74,17 +98,49 @@ impl HotFields { log_level: cfg.general.log_level.clone(), ad_tag: cfg.general.ad_tag.clone(), dns_overrides: cfg.network.dns_overrides.clone(), - middle_proxy_pool_size: cfg.general.middle_proxy_pool_size, desync_all_full: cfg.general.desync_all_full, update_every_secs: cfg.general.effective_update_every_secs(), + me_reinit_every_secs: cfg.general.me_reinit_every_secs, + me_reinit_singleflight: cfg.general.me_reinit_singleflight, + me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, hardswap: cfg.general.hardswap, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, - me_keepalive_enabled: cfg.general.me_keepalive_enabled, - me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs, - me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs, - me_keepalive_payload_random: cfg.general.me_keepalive_payload_random, + me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms, + me_hardswap_warmup_delay_max_ms: 
cfg.general.me_hardswap_warmup_delay_max_ms, + me_hardswap_warmup_extra_passes: cfg.general.me_hardswap_warmup_extra_passes, + me_hardswap_warmup_pass_backoff_base_ms: cfg + .general + .me_hardswap_warmup_pass_backoff_base_ms, + me_bind_stale_mode: cfg.general.me_bind_stale_mode, + me_bind_stale_ttl_secs: cfg.general.me_bind_stale_ttl_secs, + me_secret_atomic_snapshot: cfg.general.me_secret_atomic_snapshot, + me_deterministic_writer_sort: cfg.general.me_deterministic_writer_sort, + me_single_endpoint_shadow_writers: cfg.general.me_single_endpoint_shadow_writers, + me_single_endpoint_outage_mode_enabled: cfg + .general + .me_single_endpoint_outage_mode_enabled, + me_single_endpoint_outage_disable_quarantine: cfg + .general + .me_single_endpoint_outage_disable_quarantine, + me_single_endpoint_outage_backoff_min_ms: cfg + .general + .me_single_endpoint_outage_backoff_min_ms, + me_single_endpoint_outage_backoff_max_ms: cfg + .general + .me_single_endpoint_outage_backoff_max_ms, + me_single_endpoint_shadow_rotate_every_secs: cfg + .general + .me_single_endpoint_shadow_rotate_every_secs, + me_config_stable_snapshots: cfg.general.me_config_stable_snapshots, + me_config_apply_cooldown_secs: cfg.general.me_config_apply_cooldown_secs, + me_snapshot_require_http_2xx: cfg.general.me_snapshot_require_http_2xx, + me_snapshot_reject_empty_map: cfg.general.me_snapshot_reject_empty_map, + me_snapshot_min_proxy_for_lines: cfg.general.me_snapshot_min_proxy_for_lines, + proxy_secret_stable_snapshots: cfg.general.proxy_secret_stable_snapshots, + proxy_secret_rotate_runtime: cfg.general.proxy_secret_rotate_runtime, + proxy_secret_len_max: cfg.general.proxy_secret_len_max, telemetry_core_enabled: cfg.general.telemetry.core_enabled, telemetry_user_enabled: cfg.general.telemetry.user_enabled, telemetry_me_level: cfg.general.telemetry.me_level, @@ -100,16 +156,149 @@ impl HotFields { me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms, 
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct, - access: cfg.access.clone(), + users: cfg.access.users.clone(), + user_ad_tags: cfg.access.user_ad_tags.clone(), + user_max_tcp_conns: cfg.access.user_max_tcp_conns.clone(), + user_expirations: cfg.access.user_expirations.clone(), + user_data_quota: cfg.access.user_data_quota.clone(), + user_max_unique_ips: cfg.access.user_max_unique_ips.clone(), + user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode, + user_max_unique_ips_window_secs: cfg.access.user_max_unique_ips_window_secs, } } } // ── Helpers ─────────────────────────────────────────────────────────────────── +fn canonicalize_json(value: &mut serde_json::Value) { + match value { + serde_json::Value::Object(map) => { + let mut pairs: Vec<(String, serde_json::Value)> = + std::mem::take(map).into_iter().collect(); + pairs.sort_by(|a, b| a.0.cmp(&b.0)); + for (_, item) in pairs.iter_mut() { + canonicalize_json(item); + } + for (key, item) in pairs { + map.insert(key, item); + } + } + serde_json::Value::Array(items) => { + for item in items { + canonicalize_json(item); + } + } + _ => {} + } +} + +fn config_equal(lhs: &ProxyConfig, rhs: &ProxyConfig) -> bool { + let mut left = match serde_json::to_value(lhs) { + Ok(value) => value, + Err(_) => return false, + }; + let mut right = match serde_json::to_value(rhs) { + Ok(value) => value, + Err(_) => return false, + }; + canonicalize_json(&mut left); + canonicalize_json(&mut right); + left == right +} + +fn listeners_equal( + lhs: &[crate::config::ListenerConfig], + rhs: &[crate::config::ListenerConfig], +) -> bool { + if lhs.len() != rhs.len() { + return false; + } + lhs.iter().zip(rhs.iter()).all(|(a, b)| { + a.ip == b.ip + && a.announce == b.announce + && a.announce_ip == b.announce_ip + && a.proxy_protocol == b.proxy_protocol + && a.reuse_allow == b.reuse_allow + }) +} + 
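For context on `config_equal` above: it compares canonicalized `serde_json::Value` trees, so map key order can never produce a spurious "config changed" signal. A miniature, dependency-free stand-in for the same key-sorting idea (a hypothetical `Json` enum instead of `serde_json::Value`):

```rust
// Hypothetical miniature of canonicalize_json: sort object keys
// recursively so structurally equal values compare equal regardless
// of insertion order. `Json` is a stand-in, not serde_json::Value.
#[derive(Debug, PartialEq)]
enum Json {
    Num(i64),
    Arr(Vec<Json>),
    Obj(Vec<(String, Json)>),
}

fn canonicalize(value: &mut Json) {
    match value {
        Json::Obj(pairs) => {
            // Sort entries by key, then canonicalize nested values.
            pairs.sort_by(|a, b| a.0.cmp(&b.0));
            for (_, item) in pairs.iter_mut() {
                canonicalize(item);
            }
        }
        Json::Arr(items) => {
            for item in items {
                canonicalize(item);
            }
        }
        Json::Num(_) => {}
    }
}
```

With both sides canonicalized, equality holds even when the objects were built in different key orders — the property `config_equal` relies on when deciding whether any non-hot field actually changed.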
+fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { + let mut cfg = old.clone(); + + cfg.general.log_level = new.general.log_level.clone(); + cfg.general.ad_tag = new.general.ad_tag.clone(); + cfg.network.dns_overrides = new.network.dns_overrides.clone(); + cfg.general.desync_all_full = new.general.desync_all_full; + cfg.general.update_every = new.general.update_every; + cfg.general.proxy_secret_auto_reload_secs = new.general.proxy_secret_auto_reload_secs; + cfg.general.proxy_config_auto_reload_secs = new.general.proxy_config_auto_reload_secs; + cfg.general.me_reinit_every_secs = new.general.me_reinit_every_secs; + cfg.general.me_reinit_singleflight = new.general.me_reinit_singleflight; + cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; + cfg.general.hardswap = new.general.hardswap; + cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; + cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio; + cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs; + cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms; + cfg.general.me_hardswap_warmup_delay_max_ms = new.general.me_hardswap_warmup_delay_max_ms; + cfg.general.me_hardswap_warmup_extra_passes = new.general.me_hardswap_warmup_extra_passes; + cfg.general.me_hardswap_warmup_pass_backoff_base_ms = + new.general.me_hardswap_warmup_pass_backoff_base_ms; + cfg.general.me_bind_stale_mode = new.general.me_bind_stale_mode; + cfg.general.me_bind_stale_ttl_secs = new.general.me_bind_stale_ttl_secs; + cfg.general.me_secret_atomic_snapshot = new.general.me_secret_atomic_snapshot; + cfg.general.me_deterministic_writer_sort = new.general.me_deterministic_writer_sort; + cfg.general.me_single_endpoint_shadow_writers = new.general.me_single_endpoint_shadow_writers; + cfg.general.me_single_endpoint_outage_mode_enabled = + 
new.general.me_single_endpoint_outage_mode_enabled; + cfg.general.me_single_endpoint_outage_disable_quarantine = + new.general.me_single_endpoint_outage_disable_quarantine; + cfg.general.me_single_endpoint_outage_backoff_min_ms = + new.general.me_single_endpoint_outage_backoff_min_ms; + cfg.general.me_single_endpoint_outage_backoff_max_ms = + new.general.me_single_endpoint_outage_backoff_max_ms; + cfg.general.me_single_endpoint_shadow_rotate_every_secs = + new.general.me_single_endpoint_shadow_rotate_every_secs; + cfg.general.me_config_stable_snapshots = new.general.me_config_stable_snapshots; + cfg.general.me_config_apply_cooldown_secs = new.general.me_config_apply_cooldown_secs; + cfg.general.me_snapshot_require_http_2xx = new.general.me_snapshot_require_http_2xx; + cfg.general.me_snapshot_reject_empty_map = new.general.me_snapshot_reject_empty_map; + cfg.general.me_snapshot_min_proxy_for_lines = new.general.me_snapshot_min_proxy_for_lines; + cfg.general.proxy_secret_stable_snapshots = new.general.proxy_secret_stable_snapshots; + cfg.general.proxy_secret_rotate_runtime = new.general.proxy_secret_rotate_runtime; + cfg.general.proxy_secret_len_max = new.general.proxy_secret_len_max; + cfg.general.telemetry = new.general.telemetry.clone(); + cfg.general.me_socks_kdf_policy = new.general.me_socks_kdf_policy; + cfg.general.me_floor_mode = new.general.me_floor_mode; + cfg.general.me_adaptive_floor_idle_secs = new.general.me_adaptive_floor_idle_secs; + cfg.general.me_adaptive_floor_min_writers_single_endpoint = + new.general.me_adaptive_floor_min_writers_single_endpoint; + cfg.general.me_adaptive_floor_recover_grace_secs = + new.general.me_adaptive_floor_recover_grace_secs; + cfg.general.me_route_backpressure_base_timeout_ms = + new.general.me_route_backpressure_base_timeout_ms; + cfg.general.me_route_backpressure_high_timeout_ms = + new.general.me_route_backpressure_high_timeout_ms; + cfg.general.me_route_backpressure_high_watermark_pct = + 
        new.general.me_route_backpressure_high_watermark_pct;
+
+    cfg.access.users = new.access.users.clone();
+    cfg.access.user_ad_tags = new.access.user_ad_tags.clone();
+    cfg.access.user_max_tcp_conns = new.access.user_max_tcp_conns.clone();
+    cfg.access.user_expirations = new.access.user_expirations.clone();
+    cfg.access.user_data_quota = new.access.user_data_quota.clone();
+    cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone();
+    cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode;
+    cfg.access.user_max_unique_ips_window_secs = new.access.user_max_unique_ips_window_secs;
+
+    cfg
+}
+
 /// Warn if any non-hot fields changed (require restart).
-fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
+fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: bool) {
+    let mut warned = false;
     if old.server.port != new.server.port {
+        warned = true;
         warn!(
             "config reload: server.port changed ({} → {}); restart required",
             old.server.port, new.server.port
@@ -125,23 +314,89 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
             != new.server.api.minimal_runtime_cache_ttl_ms
         || old.server.api.read_only != new.server.api.read_only
     {
+        warned = true;
         warn!("config reload: server.api changed; restart required");
     }
+    if old.server.proxy_protocol != new.server.proxy_protocol
+        || !listeners_equal(&old.server.listeners, &new.server.listeners)
+        || old.server.listen_addr_ipv4 != new.server.listen_addr_ipv4
+        || old.server.listen_addr_ipv6 != new.server.listen_addr_ipv6
+        || old.server.listen_tcp != new.server.listen_tcp
+        || old.server.listen_unix_sock != new.server.listen_unix_sock
+        || old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm
+    {
+        warned = true;
+        warn!("config reload: server listener settings changed; restart required");
+    }
+    if old.censorship.tls_domain != new.censorship.tls_domain
+        || old.censorship.tls_domains != new.censorship.tls_domains
+        || old.censorship.mask != new.censorship.mask
+        || old.censorship.mask_host != new.censorship.mask_host
+        || old.censorship.mask_port != new.censorship.mask_port
+        || old.censorship.mask_unix_sock != new.censorship.mask_unix_sock
+        || old.censorship.fake_cert_len != new.censorship.fake_cert_len
+        || old.censorship.tls_emulation != new.censorship.tls_emulation
+        || old.censorship.tls_front_dir != new.censorship.tls_front_dir
+        || old.censorship.server_hello_delay_min_ms != new.censorship.server_hello_delay_min_ms
+        || old.censorship.server_hello_delay_max_ms != new.censorship.server_hello_delay_max_ms
+        || old.censorship.tls_new_session_tickets != new.censorship.tls_new_session_tickets
+        || old.censorship.tls_full_cert_ttl_secs != new.censorship.tls_full_cert_ttl_secs
+        || old.censorship.alpn_enforce != new.censorship.alpn_enforce
+        || old.censorship.mask_proxy_protocol != new.censorship.mask_proxy_protocol
+    {
+        warned = true;
+        warn!("config reload: censorship settings changed; restart required");
+    }
     if old.censorship.tls_domain != new.censorship.tls_domain {
+        warned = true;
         warn!(
             "config reload: censorship.tls_domain changed ('{}' → '{}'); restart required",
             old.censorship.tls_domain, new.censorship.tls_domain
         );
     }
     if old.network.ipv4 != new.network.ipv4 || old.network.ipv6 != new.network.ipv6 {
+        warned = true;
         warn!("config reload: network.ipv4/ipv6 changed; restart required");
     }
+    if old.network.prefer != new.network.prefer
+        || old.network.multipath != new.network.multipath
+        || old.network.stun_use != new.network.stun_use
+        || old.network.stun_servers != new.network.stun_servers
+        || old.network.stun_tcp_fallback != new.network.stun_tcp_fallback
+        || old.network.http_ip_detect_urls != new.network.http_ip_detect_urls
+        || old.network.cache_public_ip_path != new.network.cache_public_ip_path
+    {
+        warned = true;
+        warn!("config reload: non-hot network settings changed; restart required");
+    }
     if old.general.use_middle_proxy != new.general.use_middle_proxy {
+        warned = true;
         warn!("config reload: use_middle_proxy changed; restart required");
     }
     if old.general.stun_nat_probe_concurrency != new.general.stun_nat_probe_concurrency {
+        warned = true;
         warn!("config reload: general.stun_nat_probe_concurrency changed; restart required");
     }
+    if old.general.middle_proxy_pool_size != new.general.middle_proxy_pool_size {
+        warned = true;
+        warn!("config reload: general.middle_proxy_pool_size changed; restart required");
+    }
+    if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts {
+        warned = true;
+        warn!("config reload: general.me_init_retry_attempts changed; restart required");
+    }
+    if old.general.me2dc_fallback != new.general.me2dc_fallback {
+        warned = true;
+        warn!("config reload: general.me2dc_fallback changed; restart required");
+    }
+    if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled
+        || old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs
+        || old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs
+        || old.general.me_keepalive_payload_random != new.general.me_keepalive_payload_random
+    {
+        warned = true;
+        warn!("config reload: general.me_keepalive_* changed; restart required");
+    }
     if old.general.upstream_connect_retry_attempts != new.general.upstream_connect_retry_attempts
         || old.general.upstream_connect_retry_backoff_ms
             != new.general.upstream_connect_retry_backoff_ms
@@ -151,8 +406,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) {
             != new.general.upstream_connect_failfast_hard_errors
        || old.general.rpc_proxy_req_every != new.general.rpc_proxy_req_every
     {
+        warned = true;
         warn!("config reload: general.upstream_* changed; restart required");
     }
+    if non_hot_changed && !warned {
+        warn!("config reload: one or more non-hot fields changed; restart required");
+    }
 }
 
 /// Resolve the public host for link generation — mirrors the logic in main.rs.
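The reload strategy in the hunks above — overlay only the hot fields onto the currently applied config, then warn if anything non-hot still differs — can be reduced to a tiny standalone sketch. The `Config` struct and its two fields here are illustrative stand-ins, not the crate's real `ProxyConfig`: `hardswap` plays the role of a hot-reloadable field, `port` the role of a non-hot field that requires a restart.

```rust
// Minimal sketch of the hot-overlay reload pattern (field names are hypothetical).
#[derive(Clone, Debug, PartialEq)]
struct Config {
    hardswap: bool, // hot: may be applied to a running process
    port: u16,      // non-hot: only honored at startup
}

/// Start from the currently applied config and copy over only the hot fields
/// from the newly loaded file; non-hot fields keep their old values.
fn overlay_hot_fields(old: &Config, new: &Config) -> Config {
    let mut applied = old.clone();
    applied.hardswap = new.hardswap;
    applied
}

fn main() {
    let old = Config { hardswap: false, port: 443 };
    let new = Config { hardswap: true, port: 8443 };

    let applied = overlay_hot_fields(&old, &new);
    // Any difference left between `applied` and `new` is a non-hot change —
    // exactly the condition that triggers the "restart required" warning.
    let non_hot_changed = applied != new;

    assert!(applied.hardswap);     // hot change took effect
    assert_eq!(applied.port, 443); // non-hot change deferred until restart
    assert!(non_hot_changed);
    println!("applied={applied:?} non_hot_changed={non_hot_changed}");
}
```

This is also why `reload_config` below sends `applied_cfg` (not `new_cfg`) into the watch channel: the running process never observes a non-hot value it cannot honor.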
@@ -235,10 +494,10 @@ fn log_changes(
         log_tx.send(new_hot.log_level.clone()).ok();
     }
 
-    if old_hot.access.user_ad_tags != new_hot.access.user_ad_tags {
+    if old_hot.user_ad_tags != new_hot.user_ad_tags {
         info!(
             "config reload: user_ad_tags updated ({} entries)",
-            new_hot.access.user_ad_tags.len(),
+            new_hot.user_ad_tags.len(),
         );
     }
 
@@ -253,13 +512,6 @@ fn log_changes(
         );
     }
 
-    if old_hot.middle_proxy_pool_size != new_hot.middle_proxy_pool_size {
-        info!(
-            "config reload: middle_proxy_pool_size: {} → {}",
-            old_hot.middle_proxy_pool_size, new_hot.middle_proxy_pool_size,
-        );
-    }
-
     if old_hot.desync_all_full != new_hot.desync_all_full {
         info!(
             "config reload: desync_all_full: {} → {}",
@@ -273,6 +525,17 @@ fn log_changes(
             old_hot.update_every_secs, new_hot.update_every_secs,
         );
     }
+    if old_hot.me_reinit_every_secs != new_hot.me_reinit_every_secs
+        || old_hot.me_reinit_singleflight != new_hot.me_reinit_singleflight
+        || old_hot.me_reinit_coalesce_window_ms != new_hot.me_reinit_coalesce_window_ms
+    {
+        info!(
+            "config reload: me_reinit: interval={}s singleflight={} coalesce={}ms",
+            new_hot.me_reinit_every_secs,
+            new_hot.me_reinit_singleflight,
+            new_hot.me_reinit_coalesce_window_ms
+        );
+    }
 
     if old_hot.hardswap != new_hot.hardswap {
         info!(
@@ -301,18 +564,84 @@ fn log_changes(
             old_hot.me_reinit_drain_timeout_secs, new_hot.me_reinit_drain_timeout_secs,
         );
     }
-
-    if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled
-        || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs
-        || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs
-        || old_hot.me_keepalive_payload_random != new_hot.me_keepalive_payload_random
+    if old_hot.me_hardswap_warmup_delay_min_ms != new_hot.me_hardswap_warmup_delay_min_ms
+        || old_hot.me_hardswap_warmup_delay_max_ms != new_hot.me_hardswap_warmup_delay_max_ms
+        || old_hot.me_hardswap_warmup_extra_passes != new_hot.me_hardswap_warmup_extra_passes
+        || old_hot.me_hardswap_warmup_pass_backoff_base_ms
+            != new_hot.me_hardswap_warmup_pass_backoff_base_ms
     {
         info!(
-            "config reload: me_keepalive: enabled={} interval={}s jitter={}s random_payload={}",
-            new_hot.me_keepalive_enabled,
-            new_hot.me_keepalive_interval_secs,
-            new_hot.me_keepalive_jitter_secs,
-            new_hot.me_keepalive_payload_random,
+            "config reload: me_hardswap_warmup: min={}ms max={}ms extra_passes={} pass_backoff={}ms",
+            new_hot.me_hardswap_warmup_delay_min_ms,
+            new_hot.me_hardswap_warmup_delay_max_ms,
+            new_hot.me_hardswap_warmup_extra_passes,
+            new_hot.me_hardswap_warmup_pass_backoff_base_ms
+        );
+    }
+    if old_hot.me_bind_stale_mode != new_hot.me_bind_stale_mode
+        || old_hot.me_bind_stale_ttl_secs != new_hot.me_bind_stale_ttl_secs
+    {
+        info!(
+            "config reload: me_bind_stale: mode={:?} ttl={}s",
+            new_hot.me_bind_stale_mode,
+            new_hot.me_bind_stale_ttl_secs
+        );
+    }
+    if old_hot.me_secret_atomic_snapshot != new_hot.me_secret_atomic_snapshot
+        || old_hot.me_deterministic_writer_sort != new_hot.me_deterministic_writer_sort
+    {
+        info!(
+            "config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={}",
+            new_hot.me_secret_atomic_snapshot,
+            new_hot.me_deterministic_writer_sort
+        );
+    }
+    if old_hot.me_single_endpoint_shadow_writers != new_hot.me_single_endpoint_shadow_writers
+        || old_hot.me_single_endpoint_outage_mode_enabled
+            != new_hot.me_single_endpoint_outage_mode_enabled
+        || old_hot.me_single_endpoint_outage_disable_quarantine
+            != new_hot.me_single_endpoint_outage_disable_quarantine
+        || old_hot.me_single_endpoint_outage_backoff_min_ms
+            != new_hot.me_single_endpoint_outage_backoff_min_ms
+        || old_hot.me_single_endpoint_outage_backoff_max_ms
+            != new_hot.me_single_endpoint_outage_backoff_max_ms
+        || old_hot.me_single_endpoint_shadow_rotate_every_secs
+            != new_hot.me_single_endpoint_shadow_rotate_every_secs
+    {
+        info!(
+            "config reload: me_single_endpoint: shadow={} outage_enabled={} disable_quarantine={} backoff=[{}..{}]ms rotate={}s",
+            new_hot.me_single_endpoint_shadow_writers,
+            new_hot.me_single_endpoint_outage_mode_enabled,
+            new_hot.me_single_endpoint_outage_disable_quarantine,
+            new_hot.me_single_endpoint_outage_backoff_min_ms,
+            new_hot.me_single_endpoint_outage_backoff_max_ms,
+            new_hot.me_single_endpoint_shadow_rotate_every_secs
+        );
+    }
+    if old_hot.me_config_stable_snapshots != new_hot.me_config_stable_snapshots
+        || old_hot.me_config_apply_cooldown_secs != new_hot.me_config_apply_cooldown_secs
+        || old_hot.me_snapshot_require_http_2xx != new_hot.me_snapshot_require_http_2xx
+        || old_hot.me_snapshot_reject_empty_map != new_hot.me_snapshot_reject_empty_map
+        || old_hot.me_snapshot_min_proxy_for_lines != new_hot.me_snapshot_min_proxy_for_lines
+    {
+        info!(
+            "config reload: me_snapshot_guard: stable={} cooldown={}s require_2xx={} reject_empty={} min_proxy_for={}",
+            new_hot.me_config_stable_snapshots,
+            new_hot.me_config_apply_cooldown_secs,
+            new_hot.me_snapshot_require_http_2xx,
+            new_hot.me_snapshot_reject_empty_map,
+            new_hot.me_snapshot_min_proxy_for_lines
+        );
+    }
+    if old_hot.proxy_secret_stable_snapshots != new_hot.proxy_secret_stable_snapshots
+        || old_hot.proxy_secret_rotate_runtime != new_hot.proxy_secret_rotate_runtime
+        || old_hot.proxy_secret_len_max != new_hot.proxy_secret_len_max
+    {
+        info!(
+            "config reload: proxy_secret_runtime: stable={} rotate={} len_max={}",
+            new_hot.proxy_secret_stable_snapshots,
+            new_hot.proxy_secret_rotate_runtime,
+            new_hot.proxy_secret_len_max
         );
     }
 
@@ -367,21 +696,21 @@ fn log_changes(
         );
     }
 
-    if old_hot.access.users != new_hot.access.users {
-        let mut added: Vec<&String> = new_hot.access.users.keys()
-            .filter(|u| !old_hot.access.users.contains_key(*u))
+    if old_hot.users != new_hot.users {
+        let mut added: Vec<&String> = new_hot.users.keys()
+            .filter(|u| !old_hot.users.contains_key(*u))
             .collect();
         added.sort();
 
-        let mut removed: Vec<&String> = old_hot.access.users.keys()
-            .filter(|u| !new_hot.access.users.contains_key(*u))
+        let mut removed: Vec<&String> = old_hot.users.keys()
+            .filter(|u| !new_hot.users.contains_key(*u))
             .collect();
         removed.sort();
 
-        let mut changed: Vec<&String> = new_hot.access.users.keys()
+        let mut changed: Vec<&String> = new_hot.users.keys()
             .filter(|u| {
-                old_hot.access.users.get(*u)
-                    .map(|s| s != &new_hot.access.users[*u])
+                old_hot.users.get(*u)
+                    .map(|s| s != &new_hot.users[*u])
                     .unwrap_or(false)
             })
             .collect();
@@ -395,7 +724,7 @@ fn log_changes(
         let host = resolve_link_host(new_cfg, detected_ip_v4, detected_ip_v6);
         let port = new_cfg.general.links.public_port.unwrap_or(new_cfg.server.port);
         for user in &added {
-            if let Some(secret) = new_hot.access.users.get(*user) {
+            if let Some(secret) = new_hot.users.get(*user) {
                 print_user_links(user, secret, &host, port, new_cfg);
             }
         }
@@ -414,28 +743,38 @@ fn log_changes(
         }
     }
 
-    if old_hot.access.user_max_tcp_conns != new_hot.access.user_max_tcp_conns {
+    if old_hot.user_max_tcp_conns != new_hot.user_max_tcp_conns {
         info!(
             "config reload: user_max_tcp_conns updated ({} entries)",
-            new_hot.access.user_max_tcp_conns.len()
+            new_hot.user_max_tcp_conns.len()
         );
     }
 
-    if old_hot.access.user_expirations != new_hot.access.user_expirations {
+    if old_hot.user_expirations != new_hot.user_expirations {
         info!(
             "config reload: user_expirations updated ({} entries)",
-            new_hot.access.user_expirations.len()
+            new_hot.user_expirations.len()
         );
     }
 
-    if old_hot.access.user_data_quota != new_hot.access.user_data_quota {
+    if old_hot.user_data_quota != new_hot.user_data_quota {
         info!(
             "config reload: user_data_quota updated ({} entries)",
-            new_hot.access.user_data_quota.len()
+            new_hot.user_data_quota.len()
         );
     }
 
-    if old_hot.access.user_max_unique_ips != new_hot.access.user_max_unique_ips {
+    if old_hot.user_max_unique_ips != new_hot.user_max_unique_ips {
         info!(
             "config reload: user_max_unique_ips updated ({} entries)",
-            new_hot.access.user_max_unique_ips.len()
+            new_hot.user_max_unique_ips.len()
+        );
+    }
+    if old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode
+        || old_hot.user_max_unique_ips_window_secs
+            != new_hot.user_max_unique_ips_window_secs
+    {
+        info!(
+            "config reload: user_max_unique_ips policy mode={:?} window={}s",
+            new_hot.user_max_unique_ips_mode,
+            new_hot.user_max_unique_ips_window_secs
         );
     }
 }
@@ -462,15 +801,22 @@ fn reload_config(
     }
 
     let old_cfg = config_tx.borrow().clone();
+    let applied_cfg = overlay_hot_fields(&old_cfg, &new_cfg);
     let old_hot = HotFields::from_config(&old_cfg);
-    let new_hot = HotFields::from_config(&new_cfg);
+    let applied_hot = HotFields::from_config(&applied_cfg);
+    let non_hot_changed = !config_equal(&applied_cfg, &new_cfg);
+    let hot_changed = old_hot != applied_hot;
 
-    if old_hot == new_hot {
+    if non_hot_changed {
+        warn_non_hot_changes(&old_cfg, &new_cfg, non_hot_changed);
+    }
+
+    if !hot_changed {
         return;
     }
 
-    if old_hot.dns_overrides != new_hot.dns_overrides
-        && let Err(e) = crate::network::dns_overrides::install_entries(&new_hot.dns_overrides)
+    if old_hot.dns_overrides != applied_hot.dns_overrides
+        && let Err(e) = crate::network::dns_overrides::install_entries(&applied_hot.dns_overrides)
     {
         error!(
             "config reload: invalid network.dns_overrides: {}; keeping old config",
@@ -479,9 +825,15 @@ fn reload_config(
         return;
     }
 
-    warn_non_hot_changes(&old_cfg, &new_cfg);
-    log_changes(&old_hot, &new_hot, &new_cfg, log_tx, detected_ip_v4, detected_ip_v6);
-    config_tx.send(Arc::new(new_cfg)).ok();
+    log_changes(
+        &old_hot,
+        &applied_hot,
+        &applied_cfg,
+        log_tx,
+        detected_ip_v4,
+        detected_ip_v6,
+    );
+    config_tx.send(Arc::new(applied_cfg)).ok();
 }
 
 // ── Public API ────────────────────────────────────────────────────────────────
@@ -607,3 +959,80 @@ pub fn spawn_config_watcher(
     (config_rx, log_rx)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn sample_config() -> ProxyConfig {
+        ProxyConfig::default()
+    }
+
+    #[test]
+    fn overlay_applies_hot_and_preserves_non_hot() {
+        let old = sample_config();
+        let mut new = old.clone();
+        new.general.hardswap = !old.general.hardswap;
+        new.server.port = old.server.port.saturating_add(1);
+
+        let applied = overlay_hot_fields(&old, &new);
+        assert_eq!(applied.general.hardswap, new.general.hardswap);
+        assert_eq!(applied.server.port, old.server.port);
+    }
+
+    #[test]
+    fn non_hot_only_change_does_not_change_hot_snapshot() {
+        let old = sample_config();
+        let mut new = old.clone();
+        new.server.port = old.server.port.saturating_add(1);
+
+        let applied = overlay_hot_fields(&old, &new);
+        assert_eq!(HotFields::from_config(&old), HotFields::from_config(&applied));
+        assert_eq!(applied.server.port, old.server.port);
+    }
+
+    #[test]
+    fn bind_stale_mode_is_hot() {
+        let old = sample_config();
+        let mut new = old.clone();
+        new.general.me_bind_stale_mode = match old.general.me_bind_stale_mode {
+            MeBindStaleMode::Never => MeBindStaleMode::Ttl,
+            MeBindStaleMode::Ttl => MeBindStaleMode::Always,
+            MeBindStaleMode::Always => MeBindStaleMode::Never,
+        };
+
+        let applied = overlay_hot_fields(&old, &new);
+        assert_eq!(
+            applied.general.me_bind_stale_mode,
+            new.general.me_bind_stale_mode
+        );
+        assert_ne!(HotFields::from_config(&old), HotFields::from_config(&applied));
+    }
+
+    #[test]
+    fn keepalive_is_not_hot() {
+        let old = sample_config();
+        let mut new = old.clone();
+        new.general.me_keepalive_interval_secs = old.general.me_keepalive_interval_secs + 5;
+
+        let applied = overlay_hot_fields(&old, &new);
+        assert_eq!(
+            applied.general.me_keepalive_interval_secs,
+            old.general.me_keepalive_interval_secs
+        );
+        assert_eq!(HotFields::from_config(&old), HotFields::from_config(&applied));
+    }
+
+    #[test]
+    fn mixed_hot_and_non_hot_change_applies_only_hot_subset() {
+        let old = sample_config();
+        let mut new = old.clone();
+        new.general.hardswap = !old.general.hardswap;
+        new.general.use_middle_proxy = !old.general.use_middle_proxy;
+
+        let applied = overlay_hot_fields(&old, &new);
+        assert_eq!(applied.general.hardswap, new.general.hardswap);
+        assert_eq!(applied.general.use_middle_proxy, old.general.use_middle_proxy);
+        assert!(!config_equal(&applied, &new));
+    }
+}
diff --git a/src/config/load.rs b/src/config/load.rs
index b469299..a2ee5f0 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -237,6 +237,12 @@ impl ProxyConfig {
             ));
         }
 
+        if config.general.me_init_retry_attempts > 1_000_000 {
+            return Err(ProxyError::Config(
+                "general.me_init_retry_attempts must be within [0, 1000000]".to_string(),
+            ));
+        }
+
         if config.general.upstream_connect_retry_attempts == 0 {
             return Err(ProxyError::Config(
                 "general.upstream_connect_retry_attempts must be > 0".to_string(),
@@ -257,6 +263,12 @@ impl ProxyConfig {
             ));
         }
 
+        if config.access.user_max_unique_ips_window_secs == 0 {
+            return Err(ProxyError::Config(
+                "access.user_max_unique_ips_window_secs must be > 0".to_string(),
+            ));
+        }
+
         if config.general.me_reinit_every_secs == 0 {
             return Err(ProxyError::Config(
                 "general.me_reinit_every_secs must be > 0".to_string(),
@@ -653,6 +665,14 @@ mod tests {
             cfg.general.me_reconnect_fast_retry_count,
             default_me_reconnect_fast_retry_count()
         );
+        assert_eq!(
+            cfg.general.me_init_retry_attempts,
+            default_me_init_retry_attempts()
+        );
+        assert_eq!(
+            cfg.general.me2dc_fallback,
+            default_me2dc_fallback()
+        );
         assert_eq!(
             cfg.general.me_single_endpoint_shadow_writers,
             default_me_single_endpoint_shadow_writers()
@@ -728,6 +748,14 @@ mod tests {
             default_api_minimal_runtime_cache_ttl_ms()
         );
         assert_eq!(cfg.access.users, default_access_users());
+        assert_eq!(
+            cfg.access.user_max_unique_ips_mode,
+            UserMaxUniqueIpsMode::default()
+        );
+        assert_eq!(
+            cfg.access.user_max_unique_ips_window_secs,
+            default_user_max_unique_ips_window_secs()
+        );
     }
 
     #[test]
@@ -750,6 +778,11 @@ mod tests {
             general.me_reconnect_fast_retry_count,
             default_me_reconnect_fast_retry_count()
         );
+        assert_eq!(
+            general.me_init_retry_attempts,
+            default_me_init_retry_attempts()
+        );
+        assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
         assert_eq!(
             general.me_single_endpoint_shadow_writers,
             default_me_single_endpoint_shadow_writers()
diff --git a/src/config/types.rs b/src/config/types.rs
index ee17108..5dc1c87 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -183,6 +183,19 @@ impl MeFloorMode {
     }
 }
 
+/// Per-user unique source IP limit mode.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[serde(rename_all = "snake_case")]
+pub enum UserMaxUniqueIpsMode {
+    /// Count only currently active source IPs.
+    #[default]
+    ActiveWindow,
+    /// Count source IPs seen within the recent time window.
+    TimeWindow,
+    /// Enforce both active and recent-window limits at the same time.
+    Combined,
+}
+
 /// Telemetry controls for hot-path counters and ME diagnostics.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct TelemetryConfig {
@@ -340,6 +353,15 @@ pub struct GeneralConfig {
     #[serde(default = "default_middle_proxy_warm_standby")]
     pub middle_proxy_warm_standby: usize,
 
+    /// Startup retries for Middle-End pool initialization before ME→Direct fallback.
+    /// 0 means unlimited retries.
+    #[serde(default = "default_me_init_retry_attempts")]
+    pub me_init_retry_attempts: u32,
+
+    /// Allow fallback from Middle-End mode to direct DC when ME startup cannot be initialized.
+    #[serde(default = "default_me2dc_fallback")]
+    pub me2dc_fallback: bool,
+
     /// Enable ME keepalive padding frames.
     #[serde(default = "default_true")]
     pub me_keepalive_enabled: bool,
 
@@ -667,6 +689,8 @@ impl Default for GeneralConfig {
             stun_nat_probe_concurrency: default_stun_nat_probe_concurrency(),
             middle_proxy_pool_size: default_pool_size(),
             middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
+            me_init_retry_attempts: default_me_init_retry_attempts(),
+            me2dc_fallback: default_me2dc_fallback(),
             me_keepalive_enabled: default_true(),
             me_keepalive_interval_secs: default_keepalive_interval(),
             me_keepalive_jitter_secs: default_keepalive_jitter(),
@@ -1045,6 +1069,12 @@ pub struct AccessConfig {
     #[serde(default)]
     pub user_max_unique_ips: HashMap<String, usize>,
 
+    #[serde(default)]
+    pub user_max_unique_ips_mode: UserMaxUniqueIpsMode,
+
+    #[serde(default = "default_user_max_unique_ips_window_secs")]
+    pub user_max_unique_ips_window_secs: u64,
+
     #[serde(default = "default_replay_check_len")]
     pub replay_check_len: usize,
 
@@ -1064,6 +1094,8 @@ impl Default for AccessConfig {
             user_expirations: HashMap::new(),
             user_data_quota: HashMap::new(),
             user_max_unique_ips: HashMap::new(),
+            user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(),
+            user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(),
             replay_check_len: default_replay_check_len(),
             replay_window_secs: default_replay_window_secs(),
             ignore_time_skew: false,
diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs
index 32fcbe3..626d591 100644
--- a/src/ip_tracker.rs
+++ b/src/ip_tracker.rs
@@ -1,153 +1,151 @@
-// src/ip_tracker.rs
-// IP address tracking and limiting for users
+// IP address tracking and per-user unique IP limiting.
 #![allow(dead_code)]
 
 use std::collections::{HashMap, HashSet};
 use std::net::IpAddr;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
+
 use tokio::sync::RwLock;
 
-/// Трекер уникальных IP-адресов для каждого пользователя MTProxy
-///
-/// Предоставляет thread-safe механизм для:
-/// - Отслеживания активных IP-адресов каждого пользователя
-/// - Ограничения количества уникальных IP на пользователя
-/// - Автоматической очистки при отключении клиентов
+use crate::config::UserMaxUniqueIpsMode;
+
 #[derive(Debug, Clone)]
 pub struct UserIpTracker {
-    /// Маппинг: Имя пользователя -> Множество активных IP-адресов
     active_ips: Arc<RwLock<HashMap<String, HashSet<IpAddr>>>>,
-
-    /// Маппинг: Имя пользователя -> Максимально разрешенное количество уникальных IP
+    recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
     max_ips: Arc<RwLock<HashMap<String, usize>>>,
+    limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
+    limit_window: Arc<RwLock<Duration>>,
 }
 
 impl UserIpTracker {
-    /// Создать новый пустой трекер
     pub fn new() -> Self {
         Self {
             active_ips: Arc::new(RwLock::new(HashMap::new())),
+            recent_ips: Arc::new(RwLock::new(HashMap::new())),
             max_ips: Arc::new(RwLock::new(HashMap::new())),
+            limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
+            limit_window: Arc::new(RwLock::new(Duration::from_secs(30))),
         }
     }
 
-    /// Установить лимит уникальных IP для конкретного пользователя
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    /// * `max_ips` - Максимальное количество одновременно активных IP-адресов
+    pub async fn set_limit_policy(&self, mode: UserMaxUniqueIpsMode, window_secs: u64) {
+        {
+            let mut current_mode = self.limit_mode.write().await;
+            *current_mode = mode;
+        }
+        let mut current_window = self.limit_window.write().await;
+        *current_window = Duration::from_secs(window_secs.max(1));
+    }
+
     pub async fn set_user_limit(&self, username: &str, max_ips: usize) {
         let mut limits = self.max_ips.write().await;
         limits.insert(username.to_string(), max_ips);
     }
 
-    /// Загрузить лимиты из конфигурации
-    ///
-    /// # Arguments
-    /// * `limits` - HashMap с лимитами из config.toml
-    pub async fn load_limits(&self, limits: &HashMap<String, usize>) {
-        let mut max_ips = self.max_ips.write().await;
-        for (user, limit) in limits {
-            max_ips.insert(user.clone(), *limit);
-        }
+    pub async fn remove_user_limit(&self, username: &str) {
+        let mut limits = self.max_ips.write().await;
+        limits.remove(username);
     }
 
-    /// Проверить, может ли пользователь подключиться с данного IP-адреса
-    /// и добавить IP в список активных, если проверка успешна
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    /// * `ip` - IP-адрес клиента
-    ///
-    /// # Returns
-    /// * `Ok(())` - Подключение разрешено, IP добавлен в активные
-    /// * `Err(String)` - Подключение отклонено с описанием причины
-    pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
-        // Получаем лимит для пользователя
-        let max_ips = self.max_ips.read().await;
-        let limit = match max_ips.get(username) {
-            Some(limit) => *limit,
-            None => {
-                // Если лимит не задан - разрешаем безлимитный доступ
-                drop(max_ips);
-                let mut active_ips = self.active_ips.write().await;
-                let user_ips = active_ips
-                    .entry(username.to_string())
-                    .or_insert_with(HashSet::new);
-                user_ips.insert(ip);
-                return Ok(());
-            }
-        };
-        drop(max_ips);
+    pub async fn load_limits(&self, limits: &HashMap<String, usize>) {
+        let mut max_ips = self.max_ips.write().await;
+        max_ips.clone_from(limits);
+    }
+
+    fn prune_recent(user_recent: &mut HashMap<IpAddr, Instant>, now: Instant, window: Duration) {
+        if user_recent.is_empty() {
+            return;
+        }
+        user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window);
+    }
+
+    pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
+        let limit = {
+            let max_ips = self.max_ips.read().await;
+            max_ips.get(username).copied()
+        };
 
-        // Проверяем и обновляем активные IP
         let mut active_ips = self.active_ips.write().await;
-        let user_ips = active_ips
+        let user_active = active_ips
             .entry(username.to_string())
             .or_insert_with(HashSet::new);
 
-        // Если IP уже есть в списке - это повторное подключение, разрешаем
-        if user_ips.contains(&ip) {
+        if limit.is_none() {
+            user_active.insert(ip);
             return Ok(());
         }
 
-        // Проверяем, не превышен ли лимит
-        if user_ips.len() >= limit {
+        let limit = limit.unwrap_or_default();
+        let mode = *self.limit_mode.read().await;
+        let window = *self.limit_window.read().await;
+        let now = Instant::now();
+
+        let mut recent_ips = self.recent_ips.write().await;
+        let user_recent = recent_ips
+            .entry(username.to_string())
+            .or_insert_with(HashMap::new);
+        Self::prune_recent(user_recent, now, window);
+
+        if user_active.contains(&ip) {
+            user_recent.insert(ip, now);
+            return Ok(());
+        }
+
+        let active_limit_reached = user_active.len() >= limit;
+        let recent_limit_reached = user_recent.len() >= limit;
+        let deny = match mode {
+            UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached,
+            UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached,
+            UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached,
+        };
+
+        if deny {
             return Err(format!(
-                "IP limit reached for user '{}': {}/{} unique IPs already connected",
+                "IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}",
                 username,
-                user_ips.len(),
-                limit
+                user_active.len(),
+                limit,
+                user_recent.len(),
+                limit,
+                mode
             ));
         }
 
-        // Лимит не превышен - добавляем новый IP
-        user_ips.insert(ip);
+        user_active.insert(ip);
+        user_recent.insert(ip, now);
         Ok(())
     }
 
-    /// Удалить IP-адрес из списка активных при отключении клиента
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    /// * `ip` - IP-адрес отключившегося клиента
     pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
         let mut active_ips = self.active_ips.write().await;
-
         if let Some(user_ips) = active_ips.get_mut(username) {
             user_ips.remove(&ip);
-
-            // Если у пользователя не осталось активных IP - удаляем запись
-            // для экономии памяти
             if user_ips.is_empty() {
                 active_ips.remove(username);
             }
         }
+        drop(active_ips);
+
+        let mode = *self.limit_mode.read().await;
+        if matches!(mode, UserMaxUniqueIpsMode::ActiveWindow) {
+            let mut recent_ips = self.recent_ips.write().await;
+            if let Some(user_recent) = recent_ips.get_mut(username) {
+                user_recent.remove(&ip);
+                if user_recent.is_empty() {
+                    recent_ips.remove(username);
+                }
+            }
+        }
     }
 
-    /// Получить текущее количество активных IP-адресов для пользователя
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    ///
-    /// # Returns
-    /// Количество уникальных активных IP-адресов
     pub async fn get_active_ip_count(&self, username: &str) -> usize {
         let active_ips = self.active_ips.read().await;
-        active_ips
-            .get(username)
-            .map(|ips| ips.len())
-            .unwrap_or(0)
+        active_ips.get(username).map(|ips| ips.len()).unwrap_or(0)
     }
 
-    /// Получить список всех активных IP-адресов для пользователя
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    ///
-    /// # Returns
-    /// Вектор с активными IP-адресами
     pub async fn get_active_ips(&self, username: &str) -> Vec<IpAddr> {
         let active_ips = self.active_ips.read().await;
         active_ips
@@ -156,49 +154,38 @@ impl UserIpTracker {
             .unwrap_or_else(Vec::new)
     }
 
-    /// Получить статистику по всем пользователям
-    ///
-    /// # Returns
-    /// Вектор кортежей: (имя_пользователя, количество_активных_IP, лимит)
     pub async fn get_stats(&self) -> Vec<(String, usize, usize)> {
         let active_ips = self.active_ips.read().await;
         let max_ips = self.max_ips.read().await;
 
         let mut stats = Vec::new();
-
-        // Собираем статистику по пользователям с активными подключениями
         for (username, user_ips) in active_ips.iter() {
             let limit = max_ips.get(username).copied().unwrap_or(0);
             stats.push((username.clone(), user_ips.len(), limit));
         }
-
-        stats.sort_by(|a, b| a.0.cmp(&b.0)); // Сортируем по имени пользователя
+
+        stats.sort_by(|a, b| a.0.cmp(&b.0));
         stats
     }
 
-    /// Очистить все активные IP для пользователя (при необходимости)
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
     pub async fn clear_user_ips(&self, username: &str) {
         let mut active_ips = self.active_ips.write().await;
         active_ips.remove(username);
+        drop(active_ips);
+
+        let mut recent_ips = self.recent_ips.write().await;
+        recent_ips.remove(username);
     }
 
-    /// Очистить всю статистику (использовать с осторожностью!)
     pub async fn clear_all(&self) {
         let mut active_ips = self.active_ips.write().await;
         active_ips.clear();
+        drop(active_ips);
+
+        let mut recent_ips = self.recent_ips.write().await;
+        recent_ips.clear();
    }
 
-    /// Проверить, подключен ли пользователь с данного IP
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    /// * `ip` - IP-адрес для проверки
-    ///
-    /// # Returns
-    /// `true` если IP активен, `false` если нет
     pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool {
         let active_ips = self.active_ips.read().await;
         active_ips
@@ -207,46 +194,39 @@ impl UserIpTracker {
             .unwrap_or(false)
     }
 
-    /// Получить лимит для пользователя
-    ///
-    /// # Arguments
-    /// * `username` - Имя пользователя
-    ///
-    /// # Returns
-    /// Лимит IP-адресов или None, если лимит не установлен
     pub async fn get_user_limit(&self, username: &str) -> Option<usize> {
         let max_ips = self.max_ips.read().await;
         max_ips.get(username).copied()
     }
 
-    /// Форматировать статистику в читаемый текст
-    ///
-    /// # Returns
-    /// Строка со статистикой для логов или мониторинга
     pub async fn format_stats(&self) -> String {
         let stats = self.get_stats().await;
-
+
         if stats.is_empty() {
             return String::from("No active users");
         }
-
+
         let mut output = String::from("User IP Statistics:\n");
         output.push_str("==================\n");
-
+
         for (username, active_count, limit) in stats {
             output.push_str(&format!(
                 "User: {:<20} Active IPs: {}/{}\n",
                 username,
                 active_count,
-                if limit > 0 { limit.to_string() } else { "unlimited".to_string() }
+                if limit > 0 {
+                    limit.to_string()
+                } else {
+                    "unlimited".to_string()
+                }
             ));
-
+
             let ips = self.get_active_ips(&username).await;
             for ip in ips {
-                output.push_str(&format!("  └─ {}\n", ip));
+                output.push_str(&format!("  - {}\n", ip));
             }
         }
-
+
        output
    }
 }
@@ -257,10 +237,6 @@ impl Default for UserIpTracker {
     }
 }
 
-// ============================================================================
-// ТЕСТЫ
-// ============================================================================
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -283,14 +259,10 @@ mod tests {
         let ip2 = test_ipv4(192, 168, 1, 2);
         let ip3 = test_ipv4(192, 168, 1, 3);
 
-        // Первые два IP должны быть приняты
         assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
         assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
-
-        // Третий IP должен быть отклонен
         assert!(tracker.check_and_add("test_user", ip3).await.is_err());
 
-        // Проверяем счетчик
         assert_eq!(tracker.get_active_ip_count("test_user").await, 2);
     }
 
@@ -301,13 +273,8 @@ mod tests {
 
         let ip1 = test_ipv4(192, 168, 1, 1);
 
-        // Первое подключение
         assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
-
-        // Повторное подключение с того же IP должно пройти
         assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
-
-        // Счетчик не должен увеличиться
         assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
     }
 
@@ -320,36 +287,28 @@ mod tests {
         let ip2 = test_ipv4(192, 168, 1, 2);
         let ip3 = test_ipv4(192, 168, 1, 3);
 
-        // Добавляем два IP
         assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
         assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
-
-        // Третий не должен пройти
         assert!(tracker.check_and_add("test_user", ip3).await.is_err());
 
-        // Удаляем первый IP
         tracker.remove_ip("test_user", ip1).await;
-
-        // Теперь третий должен пройти
+
         assert!(tracker.check_and_add("test_user", ip3).await.is_ok());
-
         assert_eq!(tracker.get_active_ip_count("test_user").await, 2);
     }
 
     #[tokio::test]
     async fn test_no_limit() {
         let tracker = UserIpTracker::new();
-        // Не устанавливаем лимит для test_user
 
         let ip1 = test_ipv4(192, 168, 1, 1);
         let ip2 = test_ipv4(192, 168, 1, 2);
         let ip3 = test_ipv4(192, 168, 1, 3);
 
-        // Без лимита все IP должны проходить
         assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
         assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
         assert!(tracker.check_and_add("test_user", ip3).await.is_ok());
-
+
         assert_eq!(tracker.get_active_ip_count("test_user").await, 3);
     }
 
@@ -362,11 +321,9 @@ mod tests {
         let ip1 = test_ipv4(192, 168, 1, 1);
         let ip2 = test_ipv4(192, 168, 1, 2);
 
-        // user1 может использовать 2 IP
         assert!(tracker.check_and_add("user1", ip1).await.is_ok());
         assert!(tracker.check_and_add("user1", ip2).await.is_ok());
 
-        // user2 может использовать только 1 IP
         assert!(tracker.check_and_add("user2", ip1).await.is_ok());
         assert!(tracker.check_and_add("user2", ip2).await.is_err());
     }
 
@@ -379,10 +336,9 @@ mod tests {
         let ipv4 = test_ipv4(192, 168, 1, 1);
         let ipv6 = test_ipv6();
 
-        // Должны работать оба типа адресов
         assert!(tracker.check_and_add("test_user", ipv4).await.is_ok());
         assert!(tracker.check_and_add("test_user", ipv6).await.is_ok());
-
+
         assert_eq!(tracker.get_active_ip_count("test_user").await, 2);
     }
 
@@ -417,8 +373,7 @@ mod tests {
 
         let stats = tracker.get_stats().await;
         assert_eq!(stats.len(), 2);
-
-        // Проверяем наличие обоих пользователей в статистике
+
         assert!(stats.iter().any(|(name, _, _)| name == "user1"));
         assert!(stats.iter().any(|(name, _, _)| name == "user2"));
     }
 
@@ -427,10 +382,10 @@ mod tests {
     async fn test_clear_user_ips() {
         let tracker = UserIpTracker::new();
         let ip1 = test_ipv4(192, 168, 1, 1);
-
+
         tracker.check_and_add("test_user", ip1).await.unwrap();
         assert_eq!(tracker.get_active_ip_count("test_user").await, 1);
-
+
         tracker.clear_user_ips("test_user").await;
         assert_eq!(tracker.get_active_ip_count("test_user").await, 0);
     }
 
@@ -440,9 +395,9 @@ mod tests {
         let tracker = UserIpTracker::new();
         let ip1 = test_ipv4(192, 168, 1, 1);
         let ip2 = test_ipv4(192, 168, 1, 2);
-
+
         tracker.check_and_add("test_user", ip1).await.unwrap();
-
+
         assert!(tracker.is_ip_active("test_user", ip1).await);
         assert!(!tracker.is_ip_active("test_user", ip2).await);
     }
 
@@ -450,15 +405,85 @@ mod tests {
     #[tokio::test]
     async fn test_load_limits_from_config() {
         let tracker = UserIpTracker::new();
-
+
         let mut config_limits = HashMap::new();
         config_limits.insert("user1".to_string(), 5);
         config_limits.insert("user2".to_string(), 3);
-
+
         tracker.load_limits(&config_limits).await;
-
+
         assert_eq!(tracker.get_user_limit("user1").await, Some(5));
         assert_eq!(tracker.get_user_limit("user2").await, Some(3));
         assert_eq!(tracker.get_user_limit("user3").await, None);
     }
+
+    #[tokio::test]
+    async fn test_load_limits_replaces_previous_map() {
+        let tracker = UserIpTracker::new();
+
+        let mut first = HashMap::new();
+        first.insert("user1".to_string(), 2);
+        first.insert("user2".to_string(), 3);
+        tracker.load_limits(&first).await;
+
+        let mut second = HashMap::new();
+        second.insert("user2".to_string(), 5);
+        tracker.load_limits(&second).await;
+
+        assert_eq!(tracker.get_user_limit("user1").await, None);
+        assert_eq!(tracker.get_user_limit("user2").await, Some(5));
+    }
+
+    #[tokio::test]
+    async fn test_time_window_mode_blocks_recent_ip_churn() {
+        let tracker = UserIpTracker::new();
+        tracker.set_user_limit("test_user", 1).await;
+        tracker
+            .set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 30)
+            .await;
+
+        let ip1 = test_ipv4(10, 0, 0, 1);
+        let ip2 = test_ipv4(10, 0, 0, 2);
+
+        assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
+        tracker.remove_ip("test_user", ip1).await;
+        assert!(tracker.check_and_add("test_user", ip2).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_combined_mode_enforces_active_and_recent_limits() {
+        let tracker = UserIpTracker::new();
+        tracker.set_user_limit("test_user", 1).await;
+        tracker
+            .set_limit_policy(UserMaxUniqueIpsMode::Combined, 30)
+            .await;
+
+        let ip1 = test_ipv4(10, 0, 1, 1);
+        let ip2 = test_ipv4(10, 0, 1, 2);
+
+        assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
+        assert!(tracker.check_and_add("test_user", ip2).await.is_err());
+
+        tracker.remove_ip("test_user", ip1).await;
+        assert!(tracker.check_and_add("test_user", ip2).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_time_window_expires() {
+        let tracker = UserIpTracker::new();
+        tracker.set_user_limit("test_user", 1).await;
+        tracker
+            .set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 1)
+            .await;
+
+        let ip1 = test_ipv4(10, 1, 0, 1);
+        let ip2 = test_ipv4(10, 1, 0, 2);
+
+        assert!(tracker.check_and_add("test_user", ip1).await.is_ok());
+        tracker.remove_ip("test_user", ip1).await;
+        assert!(tracker.check_and_add("test_user", ip2).await.is_err());
+
+        tokio::time::sleep(Duration::from_millis(1100)).await;
+        assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
+    }
 }
diff --git a/src/main.rs b/src/main.rs
index 0aec195..c4a9c37 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -416,13 +416,19 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     log_probe_result(&probe, &decision);
     let prefer_ipv6 = decision.prefer_ipv6();
-    let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
+    let mut use_middle_proxy = config.general.use_middle_proxy;
     let beobachten = Arc::new(BeobachtenStore::new());
     let rng = Arc::new(SecureRandom::new());
 
     // IP Tracker initialization
     let ip_tracker = Arc::new(UserIpTracker::new());
     ip_tracker.load_limits(&config.access.user_max_unique_ips).await;
+    ip_tracker
+        .set_limit_policy(
+            config.access.user_max_unique_ips_mode,
+            config.access.user_max_unique_ips_window_secs,
+        )
+        .await;
 
     if !config.access.user_max_unique_ips.is_empty() {
         info!("IP limits configured for {} users", config.access.user_max_unique_ips.len());
@@ -437,9 +443,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     // Connection concurrency limit
     let max_connections = Arc::new(Semaphore::new(10_000));
 
+    let me2dc_fallback = config.general.me2dc_fallback;
+    let me_init_retry_attempts = config.general.me_init_retry_attempts;
     if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
-        warn!("No usable IP family
for Middle Proxy detected; falling back to direct DC"); - use_middle_proxy = false; + if me2dc_fallback { + warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); + use_middle_proxy = false; + } else { + warn!( + "No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active" + ); + } } // ===================================================================== @@ -469,142 +483,189 @@ async fn main() -> std::result::Result<(), Box> { // proxy-secret is from: https://core.telegram.org/getProxySecret // ============================================================= let proxy_secret_path = config.general.proxy_secret_path.as_deref(); - match crate::transport::middle_proxy::fetch_proxy_secret( - proxy_secret_path, - config.general.proxy_secret_len_max, - ) - .await - { - Ok(proxy_secret) => { - info!( - secret_len = proxy_secret.len(), - key_sig = format_args!( - "0x{:08x}", - if proxy_secret.len() >= 4 { - u32::from_le_bytes([ - proxy_secret[0], - proxy_secret[1], - proxy_secret[2], - proxy_secret[3], - ]) - } else { - 0 - } - ), - "Proxy-secret loaded" - ); + let pool_size = config.general.middle_proxy_pool_size.max(1); + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); - // Load ME config (v4/v6) + default DC - let mut cfg_v4 = fetch_proxy_config( - "https://core.telegram.org/getProxyConfig", - ) - .await - .unwrap_or_default(); - let mut cfg_v6 = fetch_proxy_config( - "https://core.telegram.org/getProxyConfigV6", - ) - .await - .unwrap_or_default(); - - if cfg_v4.map.is_empty() { - cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); - } - if cfg_v6.map.is_empty() { - cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); - } - - let pool = MePool::new( - proxy_tag, - proxy_secret, - config.general.middle_proxy_nat_ip, - me_nat_probe, - None, - config.network.stun_servers.clone(), - config.general.stun_nat_probe_concurrency, - 
probe.detected_ipv6, - config.timeouts.me_one_retry, - config.timeouts.me_one_timeout_ms, - cfg_v4.map.clone(), - cfg_v6.map.clone(), - cfg_v4.default_dc.or(cfg_v6.default_dc), - decision.clone(), - Some(upstream_manager.clone()), - rng.clone(), - stats.clone(), - config.general.me_keepalive_enabled, - config.general.me_keepalive_interval_secs, - config.general.me_keepalive_jitter_secs, - config.general.me_keepalive_payload_random, - config.general.rpc_proxy_req_every, - config.general.me_warmup_stagger_enabled, - config.general.me_warmup_step_delay_ms, - config.general.me_warmup_step_jitter_ms, - config.general.me_reconnect_max_concurrent_per_dc, - config.general.me_reconnect_backoff_base_ms, - config.general.me_reconnect_backoff_cap_ms, - config.general.me_reconnect_fast_retry_count, - config.general.me_single_endpoint_shadow_writers, - config.general.me_single_endpoint_outage_mode_enabled, - config.general.me_single_endpoint_outage_disable_quarantine, - config.general.me_single_endpoint_outage_backoff_min_ms, - config.general.me_single_endpoint_outage_backoff_max_ms, - config.general.me_single_endpoint_shadow_rotate_every_secs, - config.general.me_floor_mode, - config.general.me_adaptive_floor_idle_secs, - config.general.me_adaptive_floor_min_writers_single_endpoint, - config.general.me_adaptive_floor_recover_grace_secs, - config.general.hardswap, - config.general.me_pool_drain_ttl_secs, - config.general.effective_me_pool_force_close_secs(), - config.general.me_pool_min_fresh_ratio, - config.general.me_hardswap_warmup_delay_min_ms, - config.general.me_hardswap_warmup_delay_max_ms, - config.general.me_hardswap_warmup_extra_passes, - config.general.me_hardswap_warmup_pass_backoff_base_ms, - config.general.me_bind_stale_mode, - config.general.me_bind_stale_ttl_secs, - config.general.me_secret_atomic_snapshot, - config.general.me_deterministic_writer_sort, - config.general.me_socks_kdf_policy, - config.general.me_route_backpressure_base_timeout_ms, - 
config.general.me_route_backpressure_high_timeout_ms, - config.general.me_route_backpressure_high_watermark_pct, - ); - - let pool_size = config.general.middle_proxy_pool_size.max(1); - loop { - match pool.init(pool_size, &rng).await { - Ok(()) => { - info!("Middle-End pool initialized successfully"); - - // Phase 4: Start health monitor - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - - break Some(pool); - } - Err(e) => { - warn!( - error = %e, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); - pool.reset_stun_state(); - tokio::time::sleep(Duration::from_secs(2)).await; - } + let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret( + proxy_secret_path, + config.general.proxy_secret_len_max, + ) + .await + { + Ok(proxy_secret) => proxy_secret, + Err(e) => { + let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME startup retries exhausted while loading proxy-secret; falling back to direct mode" + ); + break None; } + + warn!( + error = %e, + attempt = init_attempt, + retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "Failed to fetch proxy-secret; retrying ME startup" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + continue; } + }; + + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); + + // Load 
ME config (v4/v6) + default DC + let mut cfg_v4 = fetch_proxy_config( + "https://core.telegram.org/getProxyConfig", + ) + .await + .unwrap_or_default(); + let mut cfg_v6 = fetch_proxy_config( + "https://core.telegram.org/getProxyConfigV6", + ) + .await + .unwrap_or_default(); + + if cfg_v4.map.is_empty() { + cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); } - Err(e) => { - error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode."); - None + if cfg_v6.map.is_empty() { + cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); + } + + let pool = MePool::new( + proxy_tag.clone(), + proxy_secret, + config.general.middle_proxy_nat_ip, + me_nat_probe, + None, + config.network.stun_servers.clone(), + config.general.stun_nat_probe_concurrency, + probe.detected_ipv6, + config.timeouts.me_one_retry, + config.timeouts.me_one_timeout_ms, + cfg_v4.map.clone(), + cfg_v6.map.clone(), + cfg_v4.default_dc.or(cfg_v6.default_dc), + decision.clone(), + Some(upstream_manager.clone()), + rng.clone(), + stats.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.rpc_proxy_req_every, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, + config.general.me_single_endpoint_shadow_writers, + config.general.me_single_endpoint_outage_mode_enabled, + config.general.me_single_endpoint_outage_disable_quarantine, + config.general.me_single_endpoint_outage_backoff_min_ms, + config.general.me_single_endpoint_outage_backoff_max_ms, + config.general.me_single_endpoint_shadow_rotate_every_secs, + config.general.me_floor_mode, + 
config.general.me_adaptive_floor_idle_secs, + config.general.me_adaptive_floor_min_writers_single_endpoint, + config.general.me_adaptive_floor_recover_grace_secs, + config.general.hardswap, + config.general.me_pool_drain_ttl_secs, + config.general.effective_me_pool_force_close_secs(), + config.general.me_pool_min_fresh_ratio, + config.general.me_hardswap_warmup_delay_min_ms, + config.general.me_hardswap_warmup_delay_max_ms, + config.general.me_hardswap_warmup_extra_passes, + config.general.me_hardswap_warmup_pass_backoff_base_ms, + config.general.me_bind_stale_mode, + config.general.me_bind_stale_ttl_secs, + config.general.me_secret_atomic_snapshot, + config.general.me_deterministic_writer_sort, + config.general.me_socks_kdf_policy, + config.general.me_route_backpressure_base_timeout_ms, + config.general.me_route_backpressure_high_timeout_ms, + config.general.me_route_backpressure_high_watermark_pct, + ); + + match pool.init(pool_size, &rng).await { + Ok(()) => { + info!( + attempt = init_attempt, + "Middle-End pool initialized successfully" + ); + + // Phase 4: Start health monitor + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, min_conns, + ) + .await; + }); + + break Some(pool); + } + Err(e) => { + let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME pool init retries exhausted; falling back to direct mode" + ); + break None; + } + + warn!( + error = %e, + attempt = init_attempt, + retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + 
pool.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; + } } } } else { @@ -847,6 +908,51 @@ async fn main() -> std::result::Result<(), Box> { } }); + let ip_tracker_policy = ip_tracker.clone(); + let mut config_rx_ip_limits = config_rx.clone(); + tokio::spawn(async move { + let mut prev_limits = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips + .clone(); + let mut prev_mode = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_mode; + let mut prev_window = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_window_secs; + + loop { + if config_rx_ip_limits.changed().await.is_err() { + break; + } + let cfg = config_rx_ip_limits.borrow_and_update().clone(); + + if prev_limits != cfg.access.user_max_unique_ips { + ip_tracker_policy + .load_limits(&cfg.access.user_max_unique_ips) + .await; + prev_limits = cfg.access.user_max_unique_ips.clone(); + } + + if prev_mode != cfg.access.user_max_unique_ips_mode + || prev_window != cfg.access.user_max_unique_ips_window_secs + { + ip_tracker_policy + .set_limit_policy( + cfg.access.user_max_unique_ips_mode, + cfg.access.user_max_unique_ips_window_secs, + ) + .await; + prev_mode = cfg.access.user_max_unique_ips_mode; + prev_window = cfg.access.user_max_unique_ips_window_secs; + } + } + }); + let beobachten_writer = beobachten.clone(); let config_rx_beobachten = config_rx.clone(); tokio::spawn(async move {