mirror of
https://github.com/telemt/telemt.git
synced 2026-04-16 18:14:10 +03:00
Compare commits
32 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aae3e2665e | ||
|
|
a5c7a41c49 | ||
|
|
7cc78a5746 | ||
|
|
cf96e686d1 | ||
|
|
d4d867156a | ||
|
|
8c1d66a03e | ||
|
|
6ff29e43d3 | ||
|
|
208020817a | ||
|
|
6864f49292 | ||
|
|
726fb77ccc | ||
|
|
69be44b2b6 | ||
|
|
07ca94ce57 | ||
|
|
d050c4794a | ||
|
|
197f9867e0 | ||
|
|
78dfc2bc39 | ||
|
|
fcf37a1a69 | ||
|
|
cc9e71a737 | ||
|
|
eb96fcbf76 | ||
|
|
ad167f9b1a | ||
|
|
df7bd39f25 | ||
|
|
f4c047748d | ||
|
|
c5f5b43494 | ||
|
|
b2aaf404e1 | ||
|
|
d552ae84d0 | ||
|
|
3ab56f55e9 | ||
|
|
06d2cdef78 | ||
|
|
1be4422431 | ||
|
|
3d3428ad4d | ||
|
|
eaff96b8c1 | ||
|
|
7bf6f3e071 | ||
|
|
c3ebb42120 | ||
|
|
6ce25c6600 |
26
.github/workflows/release.yml
vendored
26
.github/workflows/release.yml
vendored
@@ -84,6 +84,32 @@ jobs:
|
|||||||
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.tar.gz
|
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.tar.gz
|
||||||
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.sha256
|
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.sha256
|
||||||
|
|
||||||
|
build-docker-image:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Set up QEMU
|
||||||
|
uses: docker/setup-qemu-action@v3
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v2
|
||||||
|
|
||||||
|
- name: Login to GitHub Container Registry
|
||||||
|
uses: docker/login-action@v2
|
||||||
|
with:
|
||||||
|
registry: ghcr.io
|
||||||
|
username: ${{ github.repository_owner }}
|
||||||
|
password: ${{ secrets.TOKEN_GH_DEPLOY }}
|
||||||
|
|
||||||
|
- name: Build and push
|
||||||
|
uses: docker/build-push-action@v6
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
push: true
|
||||||
|
tags: ${{ github.ref }}
|
||||||
|
|
||||||
release:
|
release:
|
||||||
name: Create Release
|
name: Create Release
|
||||||
needs: build
|
needs: build
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
## System Prompt — Production Rust Codebase: Modification and Architecture Guidelines
|
## System Prompt — Production Rust Codebase: Modification and Architecture Guidelines
|
||||||
|
|
||||||
You are a senior Rust systems engineer acting as a strict code reviewer and implementation partner. Your responses are precise, minimal, and architecturally sound. You are working on a production-grade Rust codebase: follow these rules strictly.
|
You are a senior Rust Engineer and pricipal Rust Architect acting as a strict code reviewer and implementation partner.
|
||||||
|
Your responses are precise, minimal, and architecturally sound. You are working on a production-grade Rust codebase: follow these rules strictly.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -32,6 +33,11 @@ The user can override this behavior with explicit commands:
|
|||||||
- `"Make minimal changes"` — no coordinated fixes, narrowest possible diff.
|
- `"Make minimal changes"` — no coordinated fixes, narrowest possible diff.
|
||||||
- `"Fix everything"` — apply all coordinated fixes and out-of-scope observations.
|
- `"Fix everything"` — apply all coordinated fixes and out-of-scope observations.
|
||||||
|
|
||||||
|
### Core Rule
|
||||||
|
|
||||||
|
The codebase must never enter an invalid intermediate state.
|
||||||
|
No response may leave the repository in a condition that requires follow-up fixes.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### 1. Comments and Documentation
|
### 1. Comments and Documentation
|
||||||
@@ -131,16 +137,32 @@ You MUST:
|
|||||||
- Document non-obvious logic with comments that describe *why*, not *what*.
|
- Document non-obvious logic with comments that describe *why*, not *what*.
|
||||||
- Limit changes strictly to the requested scope (plus coordinated fixes per Section 0).
|
- Limit changes strictly to the requested scope (plus coordinated fixes per Section 0).
|
||||||
- Keep all existing symbol names unless renaming is explicitly requested.
|
- Keep all existing symbol names unless renaming is explicitly requested.
|
||||||
- Preserve global formatting as-is.
|
- Preserve global formatting as-is
|
||||||
|
- Result every modification in a self-contained, compilable, runnable state of the codebase
|
||||||
|
|
||||||
You MUST NOT:
|
You MUST NOT:
|
||||||
|
|
||||||
- Use placeholders: no `// ... rest of code`, no `// implement here`, no `/* TODO */` stubs that replace existing working code. Write full, working implementation. If the implementation is unclear, ask first.
|
- Use placeholders: no `// ... rest of code`, no `// implement here`, no `/* TODO */` stubs that replace existing working code. Write full, working implementation. If the implementation is unclear, ask first
|
||||||
- Refactor code outside the requested scope.
|
- Refactor code outside the requested scope
|
||||||
- Make speculative improvements.
|
- Make speculative improvements
|
||||||
|
- Spawn multiple agents for EDITING
|
||||||
|
- Produce partial changes
|
||||||
|
- Introduce references to entities that are not yet implemented
|
||||||
|
- Leave TODO placeholders in production paths
|
||||||
|
|
||||||
Note: `todo!()` and `unimplemented!()` are allowed as idiomatic Rust markers for genuinely unfinished code paths.
|
Note: `todo!()` and `unimplemented!()` are allowed as idiomatic Rust markers for genuinely unfinished code paths.
|
||||||
|
|
||||||
|
Every change must:
|
||||||
|
- compile,
|
||||||
|
- pass type checks,
|
||||||
|
- have no broken imports,
|
||||||
|
- preserve invariants,
|
||||||
|
- not rely on future patches.
|
||||||
|
|
||||||
|
If the task requires multiple phases:
|
||||||
|
- either implement all required phases,
|
||||||
|
- or explicitly refuse and explain missing dependencies.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### 8. Decision Process for Complex Changes
|
### 8. Decision Process for Complex Changes
|
||||||
@@ -160,6 +182,7 @@ When facing a non-trivial modification, follow this sequence:
|
|||||||
- When provided with partial code, assume the rest of the codebase exists and functions correctly unless stated otherwise.
|
- When provided with partial code, assume the rest of the codebase exists and functions correctly unless stated otherwise.
|
||||||
- Reference existing types, functions, and module structures by their actual names as shown in the provided code.
|
- Reference existing types, functions, and module structures by their actual names as shown in the provided code.
|
||||||
- When the provided context is insufficient to make a safe change, request the missing context explicitly.
|
- When the provided context is insufficient to make a safe change, request the missing context explicitly.
|
||||||
|
- Spawn multiple agents for SEARCHING information, code, functions
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -167,14 +190,14 @@ When facing a non-trivial modification, follow this sequence:
|
|||||||
|
|
||||||
#### Language Policy
|
#### Language Policy
|
||||||
|
|
||||||
- Code, comments, commit messages, documentation: **English**.
|
- Code, comments, commit messages, documentation ONLY ON **English**!
|
||||||
- Reasoning and explanations in response text: **Russian**.
|
- Reasoning and explanations in response text on language from promt
|
||||||
|
|
||||||
#### Response Structure
|
#### Response Structure
|
||||||
|
|
||||||
Your response MUST consist of two sections:
|
Your response MUST consist of two sections:
|
||||||
|
|
||||||
**Section 1: `## Reasoning` (in Russian)**
|
**Section 1: `## Reasoning`**
|
||||||
|
|
||||||
- What needs to be done and why.
|
- What needs to be done and why.
|
||||||
- Which files and modules are affected.
|
- Which files and modules are affected.
|
||||||
@@ -205,3 +228,183 @@ If the response exceeds the output limit:
|
|||||||
2. List the files that will be provided in subsequent parts.
|
2. List the files that will be provided in subsequent parts.
|
||||||
3. Wait for user confirmation before continuing.
|
3. Wait for user confirmation before continuing.
|
||||||
4. No single file may be split across parts.
|
4. No single file may be split across parts.
|
||||||
|
|
||||||
|
## 11. Anti-LLM Degeneration Safeguards (Principal-Paranoid, Visionary)
|
||||||
|
|
||||||
|
This section exists to prevent common LLM failure modes: scope creep, semantic drift, cargo-cult refactors, performance regressions, contract breakage, and hidden behavior changes.
|
||||||
|
|
||||||
|
### 11.1 Non-Negotiable Invariants
|
||||||
|
|
||||||
|
- **No semantic drift:** Do not reinterpret requirements, rename concepts, or change meaning of existing terms.
|
||||||
|
- **No “helpful refactors”:** Any refactor not explicitly requested is forbidden.
|
||||||
|
- **No architectural drift:** Do not introduce new layers, patterns, abstractions, or “clean architecture” migrations unless requested.
|
||||||
|
- **No dependency drift:** Do not add crates, features, or versions unless explicitly requested.
|
||||||
|
- **No behavior drift:** If a change could alter runtime behavior, you MUST call it out explicitly in `## Reasoning` and justify it.
|
||||||
|
|
||||||
|
### 11.2 Minimal Surface Area Rule
|
||||||
|
|
||||||
|
- Touch the smallest number of files possible.
|
||||||
|
- Prefer local changes over cross-cutting edits.
|
||||||
|
- Do not “align style” across a file/module—only adjust the modified region.
|
||||||
|
- Do not reorder items, imports, or code unless required for correctness.
|
||||||
|
|
||||||
|
### 11.3 No Implicit Contract Changes
|
||||||
|
|
||||||
|
Contracts include:
|
||||||
|
- public APIs, trait bounds, visibility, error types, timeouts/retries, logging semantics, metrics semantics,
|
||||||
|
- protocol formats, framing, padding, keepalive cadence, state machine transitions,
|
||||||
|
- concurrency guarantees, cancellation behavior, backpressure behavior.
|
||||||
|
|
||||||
|
Rule:
|
||||||
|
- If you change a contract, you MUST update all dependents in the same patch AND document the contract delta explicitly.
|
||||||
|
|
||||||
|
### 11.4 Hot-Path Preservation (Performance Paranoia)
|
||||||
|
|
||||||
|
- Do not introduce extra allocations, cloning, or formatting in hot paths.
|
||||||
|
- Do not add logging/metrics on hot paths unless requested.
|
||||||
|
- Do not add new locks or broaden lock scope.
|
||||||
|
- Prefer `&str` / slices / borrowed data where the codebase already does so.
|
||||||
|
- Avoid `String` building for errors/logs if it changes current patterns.
|
||||||
|
|
||||||
|
If you cannot prove performance neutrality, label it as risk in `## Reasoning`.
|
||||||
|
|
||||||
|
### 11.5 Async / Concurrency Safety (Cancellation & Backpressure)
|
||||||
|
|
||||||
|
- No blocking calls inside async contexts.
|
||||||
|
- Preserve cancellation safety: do not introduce `await` between lock acquisition and critical invariants unless already present.
|
||||||
|
- Preserve backpressure: do not replace bounded channels with unbounded, do not remove flow control.
|
||||||
|
- Do not change task lifecycle semantics (spawn patterns, join handles, shutdown order) unless requested.
|
||||||
|
- Do not introduce `tokio::spawn` / background tasks unless explicitly requested.
|
||||||
|
|
||||||
|
### 11.6 Error Semantics Integrity
|
||||||
|
|
||||||
|
- Do not replace structured errors with generic strings.
|
||||||
|
- Do not widen/narrow error types or change error categories without explicit approval.
|
||||||
|
- Avoid introducing panics in production paths (`unwrap`, `expect`) unless the codebase already treats that path as impossible and documented.
|
||||||
|
|
||||||
|
### 11.7 “No New Abstractions” Default
|
||||||
|
|
||||||
|
Default stance:
|
||||||
|
- No new traits, generics, macros, builder patterns, type-level cleverness, or “frameworking”.
|
||||||
|
- If abstraction is necessary, prefer the smallest possible local helper (private function) and justify it.
|
||||||
|
|
||||||
|
### 11.8 Negative-Diff Protection
|
||||||
|
|
||||||
|
Avoid “diff inflation” patterns:
|
||||||
|
- mass edits,
|
||||||
|
- moving code between files,
|
||||||
|
- rewrapping long lines,
|
||||||
|
- rearranging module order,
|
||||||
|
- renaming for aesthetics.
|
||||||
|
|
||||||
|
If a diff becomes large, STOP and ask before proceeding.
|
||||||
|
|
||||||
|
### 11.9 Consistency with Existing Style (But Not Style Refactors)
|
||||||
|
|
||||||
|
- Follow existing conventions of the touched module (naming, error style, return patterns).
|
||||||
|
- Do not enforce global “best practices” that the codebase does not already use.
|
||||||
|
|
||||||
|
### 11.10 Two-Phase Safety Gate (Plan → Patch)
|
||||||
|
|
||||||
|
For non-trivial changes:
|
||||||
|
1) Provide a micro-plan (1–5 bullets): what files, what functions, what invariants, what risks.
|
||||||
|
2) Implement exactly that plan—no extra improvements.
|
||||||
|
|
||||||
|
### 11.11 Pre-Response Checklist (Hard Gate)
|
||||||
|
|
||||||
|
Before final output, verify internally:
|
||||||
|
|
||||||
|
- No unresolved symbols / broken imports.
|
||||||
|
- No partially updated call sites.
|
||||||
|
- No new public surface changes unless requested.
|
||||||
|
- No transitional states / TODO placeholders replacing working code.
|
||||||
|
- Changes are atomic: the repository remains buildable and runnable.
|
||||||
|
- Any behavior change is explicitly stated.
|
||||||
|
|
||||||
|
If any check fails: fix it before responding.
|
||||||
|
|
||||||
|
### 11.12 Truthfulness Policy (No Hallucinated Claims)
|
||||||
|
|
||||||
|
- Do not claim “this compiles” or “tests pass” unless you actually verified with the available tooling/context.
|
||||||
|
- If verification is not possible, state: “Not executed; reasoning-based consistency check only.”
|
||||||
|
|
||||||
|
### 11.13 Visionary Guardrail: Preserve Optionality
|
||||||
|
|
||||||
|
When multiple valid designs exist, prefer the one that:
|
||||||
|
- minimally constrains future evolution,
|
||||||
|
- preserves existing extension points,
|
||||||
|
- avoids locking the project into a new paradigm,
|
||||||
|
- keeps interfaces stable and implementation local.
|
||||||
|
|
||||||
|
Default to reversible changes.
|
||||||
|
|
||||||
|
### 11.14 Stop Conditions
|
||||||
|
|
||||||
|
STOP and ask targeted questions if:
|
||||||
|
- required context is missing,
|
||||||
|
- a change would cross module boundaries,
|
||||||
|
- a contract might change,
|
||||||
|
- concurrency/protocol invariants are unclear,
|
||||||
|
- the diff is growing beyond a minimal patch.
|
||||||
|
|
||||||
|
No guessing.
|
||||||
|
|
||||||
|
### 12. Invariant Preservation
|
||||||
|
|
||||||
|
You MUST explicitly preserve:
|
||||||
|
- Thread-safety guarantees (`Send` / `Sync` expectations).
|
||||||
|
- Memory safety assumptions (no hidden `unsafe` expansions).
|
||||||
|
- Lock ordering and deadlock invariants.
|
||||||
|
- State machine correctness (no new invalid transitions).
|
||||||
|
- Backward compatibility of serialized formats (if applicable).
|
||||||
|
|
||||||
|
If a change touches concurrency, networking, protocol logic, or state machines,
|
||||||
|
you MUST explain why existing invariants remain valid.
|
||||||
|
|
||||||
|
### 13. Error Handling Policy
|
||||||
|
|
||||||
|
- Do not replace structured errors with generic strings.
|
||||||
|
- Preserve existing error propagation semantics.
|
||||||
|
- Do not widen or narrow error types without approval.
|
||||||
|
- Avoid introducing panics in production paths.
|
||||||
|
- Prefer explicit error mapping over implicit conversions.
|
||||||
|
|
||||||
|
### 14. Test Safety
|
||||||
|
|
||||||
|
- Do not modify existing tests unless the task explicitly requires it.
|
||||||
|
- Do not weaken assertions.
|
||||||
|
- Preserve determinism in testable components.
|
||||||
|
|
||||||
|
### 15. Security Constraints
|
||||||
|
|
||||||
|
- Do not weaken cryptographic assumptions.
|
||||||
|
- Do not modify key derivation logic without explicit request.
|
||||||
|
- Do not change constant-time behavior.
|
||||||
|
- Do not introduce logging of secrets.
|
||||||
|
- Preserve TLS/MTProto protocol correctness.
|
||||||
|
|
||||||
|
### 16. Logging Policy
|
||||||
|
|
||||||
|
- Do not introduce excessive logging in hot paths.
|
||||||
|
- Do not log sensitive data.
|
||||||
|
- Preserve existing log levels and style.
|
||||||
|
|
||||||
|
### 17. Pre-Response Verification Checklist
|
||||||
|
|
||||||
|
Before producing the final answer, verify internally:
|
||||||
|
|
||||||
|
- The change compiles conceptually.
|
||||||
|
- No unresolved symbols exist.
|
||||||
|
- All modified call sites are updated.
|
||||||
|
- No accidental behavioral changes were introduced.
|
||||||
|
- Architectural boundaries remain intact.
|
||||||
|
|
||||||
|
### 18. Atomic Change Principle
|
||||||
|
Every patch must be **atomic and production-safe**.
|
||||||
|
* **Self-contained** — no dependency on future patches or unimplemented components.
|
||||||
|
* **Build-safe** — the project must compile successfully after the change.
|
||||||
|
* **Contract-consistent** — no partial interface or behavioral changes; all dependent code must be updated within the same patch.
|
||||||
|
* **No transitional states** — no placeholders, incomplete refactors, or temporary inconsistencies.
|
||||||
|
|
||||||
|
**Invariant:** After any single patch, the repository remains fully functional and buildable.
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.0.7"
|
version = "3.0.9"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
@@ -20,6 +20,7 @@ sha1 = "0.10"
|
|||||||
md-5 = "0.10"
|
md-5 = "0.10"
|
||||||
hmac = "0.12"
|
hmac = "0.12"
|
||||||
crc32fast = "1.4"
|
crc32fast = "1.4"
|
||||||
|
crc32c = "0.6"
|
||||||
zeroize = { version = "1.8", features = ["derive"] }
|
zeroize = { version = "1.8", features = ["derive"] }
|
||||||
|
|
||||||
# Network
|
# Network
|
||||||
|
|||||||
126
README.md
126
README.md
@@ -178,147 +178,21 @@ then Ctrl+X -> Y -> Enter to save
|
|||||||
```toml
|
```toml
|
||||||
# === General Settings ===
|
# === General Settings ===
|
||||||
[general]
|
[general]
|
||||||
fast_mode = true
|
|
||||||
use_middle_proxy = true
|
|
||||||
# ad_tag = "00000000000000000000000000000000"
|
# ad_tag = "00000000000000000000000000000000"
|
||||||
# Path to proxy-secret binary (auto-downloaded if missing).
|
|
||||||
proxy_secret_path = "proxy-secret"
|
|
||||||
# disable_colors = false # Disable colored output in logs (useful for files/systemd)
|
|
||||||
|
|
||||||
# === Log Level ===
|
|
||||||
# Log level: debug | verbose | normal | silent
|
|
||||||
# Can be overridden with --silent or --log-level CLI flags
|
|
||||||
# RUST_LOG env var takes absolute priority over all of these
|
|
||||||
log_level = "normal"
|
|
||||||
|
|
||||||
# === Middle Proxy - ME ===
|
|
||||||
# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
|
|
||||||
# middle_proxy_nat_ip = "203.0.113.10"
|
|
||||||
# Enable STUN probing to discover public IP:port for ME.
|
|
||||||
middle_proxy_nat_probe = true
|
|
||||||
# Primary STUN server (host:port); defaults to Telegram STUN when empty.
|
|
||||||
middle_proxy_nat_stun = "stun.l.google.com:19302"
|
|
||||||
# Optional fallback STUN servers list.
|
|
||||||
middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
|
|
||||||
# Desired number of concurrent ME writers in pool.
|
|
||||||
middle_proxy_pool_size = 16
|
|
||||||
# Pre-initialized warm-standby ME connections kept idle.
|
|
||||||
middle_proxy_warm_standby = 8
|
|
||||||
# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
|
|
||||||
stun_iface_mismatch_ignore = false
|
|
||||||
# Keepalive padding frames - fl==4
|
|
||||||
me_keepalive_enabled = true
|
|
||||||
me_keepalive_interval_secs = 25 # Period between keepalives
|
|
||||||
me_keepalive_jitter_secs = 5 # Jitter added to interval
|
|
||||||
me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
|
|
||||||
# Stagger extra ME connections on warmup to de-phase lifecycles.
|
|
||||||
me_warmup_stagger_enabled = true
|
|
||||||
me_warmup_step_delay_ms = 500 # Base delay between extra connects
|
|
||||||
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
|
|
||||||
# Reconnect policy knobs.
|
|
||||||
me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
|
|
||||||
me_reconnect_backoff_base_ms = 500 # Backoff start
|
|
||||||
me_reconnect_backoff_cap_ms = 30000 # Backoff cap
|
|
||||||
me_reconnect_fast_retry_count = 11 # Quick retries before backoff
|
|
||||||
|
|
||||||
[general.modes]
|
[general.modes]
|
||||||
classic = false
|
classic = false
|
||||||
secure = false
|
secure = false
|
||||||
tls = true
|
tls = true
|
||||||
|
|
||||||
[general.links]
|
|
||||||
show = "*"
|
|
||||||
# show = ["alice", "bob"] # Only show links for alice and bob
|
|
||||||
# show = "*" # Show links for all users
|
|
||||||
# public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
|
|
||||||
# public_port = 443 # Port for tg:// links (default: server.port)
|
|
||||||
|
|
||||||
# === Network Parameters ===
|
|
||||||
[network]
|
|
||||||
# Enable/disable families: true/false/auto(None)
|
|
||||||
ipv4 = true
|
|
||||||
ipv6 = false # UNSTABLE WITH ME
|
|
||||||
# prefer = 4 or 6
|
|
||||||
prefer = 4
|
|
||||||
multipath = false # EXPERIMENTAL!
|
|
||||||
|
|
||||||
# === Server Binding ===
|
|
||||||
[server]
|
|
||||||
port = 443
|
|
||||||
listen_addr_ipv4 = "0.0.0.0"
|
|
||||||
listen_addr_ipv6 = "::"
|
|
||||||
# listen_unix_sock = "/var/run/telemt.sock" # Unix socket
|
|
||||||
# listen_unix_sock_perm = "0666" # Socket file permissions
|
|
||||||
# metrics_port = 9090
|
|
||||||
# metrics_whitelist = [
|
|
||||||
# "192.168.0.0/24",
|
|
||||||
# "172.16.0.0/12",
|
|
||||||
# "127.0.0.1/32",
|
|
||||||
# "::1/128"
|
|
||||||
#]
|
|
||||||
|
|
||||||
# Listen on multiple interfaces/IPs - IPv4
|
|
||||||
[[server.listeners]]
|
|
||||||
ip = "0.0.0.0"
|
|
||||||
|
|
||||||
# Listen on multiple interfaces/IPs - IPv6
|
|
||||||
[[server.listeners]]
|
|
||||||
ip = "::"
|
|
||||||
|
|
||||||
# === Timeouts (in seconds) ===
|
|
||||||
[timeouts]
|
|
||||||
client_handshake = 30
|
|
||||||
tg_connect = 10
|
|
||||||
client_keepalive = 60
|
|
||||||
client_ack = 300
|
|
||||||
# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
|
|
||||||
me_one_retry = 12
|
|
||||||
me_one_timeout_ms = 1200
|
|
||||||
|
|
||||||
# === Anti-Censorship & Masking ===
|
# === Anti-Censorship & Masking ===
|
||||||
[censorship]
|
[censorship]
|
||||||
tls_domain = "petrovich.ru"
|
tls_domain = "petrovich.ru"
|
||||||
mask = true
|
|
||||||
mask_port = 443
|
|
||||||
# mask_host = "petrovich.ru" # Defaults to tls_domain if not set
|
|
||||||
# mask_unix_sock = "/var/run/nginx.sock" # Unix socket (mutually exclusive with mask_host)
|
|
||||||
fake_cert_len = 2048
|
|
||||||
|
|
||||||
# === Access Control & Users ===
|
|
||||||
[access]
|
|
||||||
replay_check_len = 65536
|
|
||||||
replay_window_secs = 1800
|
|
||||||
ignore_time_skew = false
|
|
||||||
|
|
||||||
[access.users]
|
[access.users]
|
||||||
# format: "username" = "32_hex_chars_secret"
|
# format: "username" = "32_hex_chars_secret"
|
||||||
hello = "00000000000000000000000000000000"
|
hello = "00000000000000000000000000000000"
|
||||||
|
|
||||||
# [access.user_max_tcp_conns]
|
|
||||||
# hello = 50
|
|
||||||
|
|
||||||
# [access.user_max_unique_ips]
|
|
||||||
# hello = 5
|
|
||||||
|
|
||||||
# [access.user_data_quota]
|
|
||||||
# hello = 1073741824 # 1 GB
|
|
||||||
|
|
||||||
# === Upstreams & Routing ===
|
|
||||||
[[upstreams]]
|
|
||||||
type = "direct"
|
|
||||||
enabled = true
|
|
||||||
weight = 10
|
|
||||||
|
|
||||||
# [[upstreams]]
|
|
||||||
# type = "socks5"
|
|
||||||
# address = "127.0.0.1:1080"
|
|
||||||
# enabled = false
|
|
||||||
# weight = 1
|
|
||||||
|
|
||||||
# === DC Address Overrides ===
|
|
||||||
# [dc_overrides]
|
|
||||||
# "203" = "91.105.192.100:443"
|
|
||||||
|
|
||||||
```
|
```
|
||||||
### Advanced
|
### Advanced
|
||||||
#### Adtag
|
#### Adtag
|
||||||
|
|||||||
@@ -213,6 +213,7 @@ listen_addr_ipv6 = "::"
|
|||||||
|
|
||||||
[[server.listeners]]
|
[[server.listeners]]
|
||||||
ip = "0.0.0.0"
|
ip = "0.0.0.0"
|
||||||
|
# reuse_allow = false # Set true only when intentionally running multiple telemt instances on same port
|
||||||
|
|
||||||
[[server.listeners]]
|
[[server.listeners]]
|
||||||
ip = "::"
|
ip = "::"
|
||||||
|
|||||||
@@ -227,6 +227,7 @@ impl ProxyConfig {
|
|||||||
announce: None,
|
announce: None,
|
||||||
announce_ip: None,
|
announce_ip: None,
|
||||||
proxy_protocol: None,
|
proxy_protocol: None,
|
||||||
|
reuse_allow: false,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
|
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
|
||||||
@@ -236,6 +237,7 @@ impl ProxyConfig {
|
|||||||
announce: None,
|
announce: None,
|
||||||
announce_ip: None,
|
announce_ip: None,
|
||||||
proxy_protocol: None,
|
proxy_protocol: None,
|
||||||
|
reuse_allow: false,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,8 +74,8 @@ pub struct ProxyModes {
|
|||||||
impl Default for ProxyModes {
|
impl Default for ProxyModes {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
classic: true,
|
classic: false,
|
||||||
secure: true,
|
secure: false,
|
||||||
tls: true,
|
tls: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -118,7 +118,7 @@ impl Default for NetworkConfig {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
ipv4: true,
|
ipv4: true,
|
||||||
ipv6: None,
|
ipv6: Some(false),
|
||||||
prefer: 4,
|
prefer: 4,
|
||||||
multipath: false,
|
multipath: false,
|
||||||
stun_servers: default_stun_servers(),
|
stun_servers: default_stun_servers(),
|
||||||
@@ -291,7 +291,7 @@ impl Default for GeneralConfig {
|
|||||||
middle_proxy_nat_stun: None,
|
middle_proxy_nat_stun: None,
|
||||||
middle_proxy_nat_stun_servers: Vec::new(),
|
middle_proxy_nat_stun_servers: Vec::new(),
|
||||||
middle_proxy_pool_size: default_pool_size(),
|
middle_proxy_pool_size: default_pool_size(),
|
||||||
middle_proxy_warm_standby: 0,
|
middle_proxy_warm_standby: 8,
|
||||||
me_keepalive_enabled: true,
|
me_keepalive_enabled: true,
|
||||||
me_keepalive_interval_secs: default_keepalive_interval(),
|
me_keepalive_interval_secs: default_keepalive_interval(),
|
||||||
me_keepalive_jitter_secs: default_keepalive_jitter(),
|
me_keepalive_jitter_secs: default_keepalive_jitter(),
|
||||||
@@ -299,10 +299,10 @@ impl Default for GeneralConfig {
|
|||||||
me_warmup_stagger_enabled: true,
|
me_warmup_stagger_enabled: true,
|
||||||
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
|
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
|
||||||
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
|
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
|
||||||
me_reconnect_max_concurrent_per_dc: 1,
|
me_reconnect_max_concurrent_per_dc: 4,
|
||||||
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
|
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
|
||||||
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
|
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
|
||||||
me_reconnect_fast_retry_count: 1,
|
me_reconnect_fast_retry_count: 8,
|
||||||
stun_iface_mismatch_ignore: false,
|
stun_iface_mismatch_ignore: false,
|
||||||
unknown_dc_log_path: default_unknown_dc_log_path(),
|
unknown_dc_log_path: default_unknown_dc_log_path(),
|
||||||
log_level: LogLevel::Normal,
|
log_level: LogLevel::Normal,
|
||||||
@@ -603,6 +603,10 @@ pub struct ListenerConfig {
|
|||||||
/// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol.
|
/// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub proxy_protocol: Option<bool>,
|
pub proxy_protocol: Option<bool>,
|
||||||
|
/// Allow multiple telemt instances to listen on the same IP:port (SO_REUSEPORT).
|
||||||
|
/// Default is false for safety.
|
||||||
|
#[serde(default)]
|
||||||
|
pub reuse_allow: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============= ShowLink =============
|
// ============= ShowLink =============
|
||||||
|
|||||||
@@ -55,6 +55,11 @@ pub fn crc32(data: &[u8]) -> u32 {
|
|||||||
crc32fast::hash(data)
|
crc32fast::hash(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// CRC32C (Castagnoli)
|
||||||
|
pub fn crc32c(data: &[u8]) -> u32 {
|
||||||
|
crc32c::crc32c(data)
|
||||||
|
}
|
||||||
|
|
||||||
/// Build the exact prekey buffer used by Telegram Middle Proxy KDF.
|
/// Build the exact prekey buffer used by Telegram Middle Proxy KDF.
|
||||||
///
|
///
|
||||||
/// Returned buffer layout (IPv4):
|
/// Returned buffer layout (IPv4):
|
||||||
|
|||||||
@@ -5,5 +5,8 @@ pub mod hash;
|
|||||||
pub mod random;
|
pub mod random;
|
||||||
|
|
||||||
pub use aes::{AesCtr, AesCbc};
|
pub use aes::{AesCtr, AesCbc};
|
||||||
pub use hash::{sha256, sha256_hmac, sha1, md5, crc32, derive_middleproxy_keys, build_middleproxy_prekey};
|
pub use hash::{
|
||||||
|
build_middleproxy_prekey, crc32, crc32c, derive_middleproxy_keys, md5, sha1, sha256,
|
||||||
|
sha256_hmac,
|
||||||
|
};
|
||||||
pub use random::SecureRandom;
|
pub use random::SecureRandom;
|
||||||
|
|||||||
70
src/main.rs
70
src/main.rs
@@ -38,7 +38,7 @@ use crate::stream::BufferPool;
|
|||||||
use crate::transport::middle_proxy::{
|
use crate::transport::middle_proxy::{
|
||||||
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, format_sample_line,
|
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, format_sample_line,
|
||||||
};
|
};
|
||||||
use crate::transport::{ListenOptions, UpstreamManager, create_listener};
|
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
|
||||||
use crate::tls_front::TlsFrontCache;
|
use crate::tls_front::TlsFrontCache;
|
||||||
|
|
||||||
fn parse_cli() -> (String, bool, Option<String>) {
|
fn parse_cli() -> (String, bool, Option<String>) {
|
||||||
@@ -461,10 +461,12 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||||||
cache.load_from_disk().await;
|
cache.load_from_disk().await;
|
||||||
|
|
||||||
let port = config.censorship.mask_port;
|
let port = config.censorship.mask_port;
|
||||||
|
let mask_host = config.censorship.mask_host.clone()
|
||||||
|
.unwrap_or_else(|| config.censorship.tls_domain.clone());
|
||||||
// Initial synchronous fetch to warm cache before serving clients.
|
// Initial synchronous fetch to warm cache before serving clients.
|
||||||
for domain in tls_domains.clone() {
|
for domain in tls_domains.clone() {
|
||||||
match crate::tls_front::fetcher::fetch_real_tls(
|
match crate::tls_front::fetcher::fetch_real_tls(
|
||||||
&domain,
|
&mask_host,
|
||||||
port,
|
port,
|
||||||
&domain,
|
&domain,
|
||||||
Duration::from_secs(5),
|
Duration::from_secs(5),
|
||||||
@@ -488,7 +490,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||||||
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
|
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
|
||||||
for domain in &domains {
|
for domain in &domains {
|
||||||
match crate::tls_front::fetcher::fetch_real_tls(
|
match crate::tls_front::fetcher::fetch_real_tls(
|
||||||
domain,
|
&mask_host,
|
||||||
port,
|
port,
|
||||||
domain,
|
domain,
|
||||||
Duration::from_secs(5),
|
Duration::from_secs(5),
|
||||||
@@ -713,6 +715,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let options = ListenOptions {
|
let options = ListenOptions {
|
||||||
|
reuse_port: listener_conf.reuse_allow,
|
||||||
ipv6_only: listener_conf.ip.is_ipv6(),
|
ipv6_only: listener_conf.ip.is_ipv6(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
@@ -751,10 +754,36 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||||||
listeners.push((listener, listener_proxy_protocol));
|
listeners.push((listener, listener_proxy_protocol));
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
if e.kind() == std::io::ErrorKind::AddrInUse {
|
||||||
|
let owners = find_listener_processes(addr);
|
||||||
|
if owners.is_empty() {
|
||||||
|
error!(
|
||||||
|
%addr,
|
||||||
|
"Failed to bind: address already in use (owner process unresolved)"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
for owner in owners {
|
||||||
|
error!(
|
||||||
|
%addr,
|
||||||
|
pid = owner.pid,
|
||||||
|
process = %owner.process,
|
||||||
|
"Failed to bind: address already in use"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !listener_conf.reuse_allow {
|
||||||
|
error!(
|
||||||
|
%addr,
|
||||||
|
"reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
error!("Failed to bind to {}: {}", addr, e);
|
error!("Failed to bind to {}: {}", addr, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Show proxy links once when public_host is set, OR when there are no TCP listeners
|
// Show proxy links once when public_host is set, OR when there are no TCP listeners
|
||||||
// (unix-only mode) — use detected IP as fallback
|
// (unix-only mode) — use detected IP as fallback
|
||||||
@@ -938,7 +967,40 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||||||
.run()
|
.run()
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
warn!(peer = %peer_addr, error = %e, "Connection closed with error");
|
let peer_closed = matches!(
|
||||||
|
&e,
|
||||||
|
crate::error::ProxyError::Io(ioe)
|
||||||
|
if matches!(
|
||||||
|
ioe.kind(),
|
||||||
|
std::io::ErrorKind::ConnectionReset
|
||||||
|
| std::io::ErrorKind::ConnectionAborted
|
||||||
|
| std::io::ErrorKind::BrokenPipe
|
||||||
|
| std::io::ErrorKind::NotConnected
|
||||||
|
)
|
||||||
|
) || matches!(
|
||||||
|
&e,
|
||||||
|
crate::error::ProxyError::Stream(
|
||||||
|
crate::error::StreamError::Io(ioe)
|
||||||
|
)
|
||||||
|
if matches!(
|
||||||
|
ioe.kind(),
|
||||||
|
std::io::ErrorKind::ConnectionReset
|
||||||
|
| std::io::ErrorKind::ConnectionAborted
|
||||||
|
| std::io::ErrorKind::BrokenPipe
|
||||||
|
| std::io::ErrorKind::NotConnected
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
let me_closed = matches!(
|
||||||
|
&e,
|
||||||
|
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
|
||||||
|
);
|
||||||
|
|
||||||
|
match (peer_closed, me_closed) {
|
||||||
|
(true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"),
|
||||||
|
(_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"),
|
||||||
|
_ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -100,6 +100,14 @@ fn render_metrics(stats: &Stats) -> String {
|
|||||||
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
|
||||||
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
|
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_keepalive_pong_total ME keepalive pong replies");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_pong_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_keepalive_pong_total {}", stats.get_me_keepalive_pong());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_keepalive_timeout_total ME keepalive ping timeouts");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_timeout_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_keepalive_timeout_total {}", stats.get_me_keepalive_timeout());
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
|
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
|
||||||
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
|
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
|
||||||
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
|
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
|
||||||
@@ -108,6 +116,30 @@ fn render_metrics(stats: &Stats) -> String {
|
|||||||
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
|
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
|
||||||
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
|
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_crc_mismatch_total {}", stats.get_me_crc_mismatch());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_seq_mismatch_total ME sequence mismatches");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_seq_mismatch_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_seq_mismatch_total {}", stats.get_me_seq_mismatch());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_route_drop_no_conn_total ME route drops: no conn");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_route_drop_no_conn_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_route_drop_no_conn_total {}", stats.get_me_route_drop_no_conn());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_route_drop_channel_closed_total ME route drops: channel closed");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_route_drop_channel_closed_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_route_drop_channel_closed_total {}", stats.get_me_route_drop_channel_closed());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_route_drop_queue_full_total ME route drops: queue full");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_route_drop_queue_full_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_route_drop_queue_full_total {}", stats.get_me_route_drop_queue_full());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid());
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
||||||
|
|||||||
@@ -156,14 +156,28 @@ pub const MAX_TLS_RECORD_SIZE: usize = 16384;
|
|||||||
/// RFC 8446 §5.2 allows up to 16384 + 256 bytes of ciphertext
|
/// RFC 8446 §5.2 allows up to 16384 + 256 bytes of ciphertext
|
||||||
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256;
|
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256;
|
||||||
|
|
||||||
/// Generate padding length for Secure Intermediate protocol.
|
/// Secure Intermediate payload is expected to be 4-byte aligned.
|
||||||
/// Total (data + padding) must not be divisible by 4 per MTProto spec.
|
pub fn is_valid_secure_payload_len(data_len: usize) -> bool {
|
||||||
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
|
data_len % 4 == 0
|
||||||
if data_len % 4 == 0 {
|
|
||||||
(rng.range(3) + 1) as usize // 1-3
|
|
||||||
} else {
|
|
||||||
rng.range(4) as usize // 0-3
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute Secure Intermediate payload length from wire length.
|
||||||
|
/// Secure mode strips up to 3 random tail bytes by truncating to 4-byte boundary.
|
||||||
|
pub fn secure_payload_len_from_wire_len(wire_len: usize) -> Option<usize> {
|
||||||
|
if wire_len < 4 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(wire_len - (wire_len % 4))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate padding length for Secure Intermediate protocol.
|
||||||
|
/// Data must be 4-byte aligned; padding is 1..=3 so total is never divisible by 4.
|
||||||
|
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
|
||||||
|
debug_assert!(
|
||||||
|
is_valid_secure_payload_len(data_len),
|
||||||
|
"Secure payload must be 4-byte aligned, got {data_len}"
|
||||||
|
);
|
||||||
|
(rng.range(3) + 1) as usize
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============= Timeouts =============
|
// ============= Timeouts =============
|
||||||
@@ -298,6 +312,10 @@ pub mod rpc_flags {
|
|||||||
pub const FLAG_QUICKACK: u32 = 0x80000000;
|
pub const FLAG_QUICKACK: u32 = 0x80000000;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod rpc_crypto_flags {
|
||||||
|
pub const USE_CRC32C: u32 = 0x800;
|
||||||
|
}
|
||||||
|
|
||||||
pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5;
|
pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5;
|
||||||
pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10;
|
pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10;
|
||||||
|
|
||||||
@@ -332,4 +350,43 @@ mod tests {
|
|||||||
assert_eq!(TG_DATACENTERS_V4.len(), 5);
|
assert_eq!(TG_DATACENTERS_V4.len(), 5);
|
||||||
assert_eq!(TG_DATACENTERS_V6.len(), 5);
|
assert_eq!(TG_DATACENTERS_V6.len(), 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn secure_padding_never_produces_aligned_total() {
|
||||||
|
let rng = SecureRandom::new();
|
||||||
|
for data_len in (0..1000).step_by(4) {
|
||||||
|
for _ in 0..100 {
|
||||||
|
let padding = secure_padding_len(data_len, &rng);
|
||||||
|
assert!(
|
||||||
|
padding <= 3,
|
||||||
|
"padding out of range: data_len={data_len}, padding={padding}"
|
||||||
|
);
|
||||||
|
assert_ne!(
|
||||||
|
(data_len + padding) % 4,
|
||||||
|
0,
|
||||||
|
"invariant violated: data_len={data_len}, padding={padding}, total={}",
|
||||||
|
data_len + padding
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn secure_wire_len_roundtrip_for_aligned_payload() {
|
||||||
|
for payload_len in (4..4096).step_by(4) {
|
||||||
|
for padding in 0..=3usize {
|
||||||
|
let wire_len = payload_len + padding;
|
||||||
|
let recovered = secure_payload_len_from_wire_len(wire_len);
|
||||||
|
assert_eq!(recovered, Some(payload_len));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn secure_wire_len_rejects_too_short_frames() {
|
||||||
|
assert_eq!(secure_payload_len_from_wire_len(0), None);
|
||||||
|
assert_eq!(secure_payload_len_from_wire_len(1), None);
|
||||||
|
assert_eq!(secure_payload_len_from_wire_len(2), None);
|
||||||
|
assert_eq!(secure_payload_len_from_wire_len(3), None);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -253,7 +253,11 @@ where
|
|||||||
|
|
||||||
let mode_ok = match proto_tag {
|
let mode_ok = match proto_tag {
|
||||||
ProtoTag::Secure => {
|
ProtoTag::Secure => {
|
||||||
if is_tls { config.general.modes.tls } else { config.general.modes.secure }
|
if is_tls {
|
||||||
|
config.general.modes.tls || config.general.modes.secure
|
||||||
|
} else {
|
||||||
|
config.general.modes.secure || config.general.modes.tls
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ProtoTag::Intermediate | ProtoTag::Abridged => config.general.modes.classic,
|
ProtoTag::Intermediate | ProtoTag::Abridged => config.general.modes.classic,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tracing::{debug, info, trace, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
use crate::config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
@@ -14,6 +14,11 @@ use crate::stats::Stats;
|
|||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
||||||
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
||||||
|
|
||||||
|
enum C2MeCommand {
|
||||||
|
Data { payload: Vec<u8>, flags: u32 },
|
||||||
|
Close,
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn handle_via_middle_proxy<R, W>(
|
pub(crate) async fn handle_via_middle_proxy<R, W>(
|
||||||
mut crypto_reader: CryptoReader<R>,
|
mut crypto_reader: CryptoReader<R>,
|
||||||
crypto_writer: CryptoWriter<W>,
|
crypto_writer: CryptoWriter<W>,
|
||||||
@@ -59,6 +64,30 @@ where
|
|||||||
|
|
||||||
let frame_limit = config.general.max_client_frame;
|
let frame_limit = config.general.max_client_frame;
|
||||||
|
|
||||||
|
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(1024);
|
||||||
|
let me_pool_c2me = me_pool.clone();
|
||||||
|
let c2me_sender = tokio::spawn(async move {
|
||||||
|
while let Some(cmd) = c2me_rx.recv().await {
|
||||||
|
match cmd {
|
||||||
|
C2MeCommand::Data { payload, flags } => {
|
||||||
|
me_pool_c2me.send_proxy_req(
|
||||||
|
conn_id,
|
||||||
|
success.dc_idx,
|
||||||
|
peer,
|
||||||
|
translated_local_addr,
|
||||||
|
&payload,
|
||||||
|
flags,
|
||||||
|
).await?;
|
||||||
|
}
|
||||||
|
C2MeCommand::Close => {
|
||||||
|
let _ = me_pool_c2me.send_close(conn_id).await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
|
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
|
||||||
let mut me_rx_task = me_rx;
|
let mut me_rx_task = me_rx;
|
||||||
let stats_clone = stats.clone();
|
let stats_clone = stats.clone();
|
||||||
@@ -74,6 +103,34 @@ where
|
|||||||
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
||||||
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
||||||
write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?;
|
write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?;
|
||||||
|
|
||||||
|
// Drain all immediately queued ME responses and flush once.
|
||||||
|
while let Ok(next) = me_rx_task.try_recv() {
|
||||||
|
match next {
|
||||||
|
MeResponse::Data { flags, data } => {
|
||||||
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
|
||||||
|
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
||||||
|
write_client_payload(
|
||||||
|
&mut writer,
|
||||||
|
proto_tag,
|
||||||
|
flags,
|
||||||
|
&data,
|
||||||
|
rng_clone.as_ref(),
|
||||||
|
).await?;
|
||||||
|
}
|
||||||
|
MeResponse::Ack(confirm) => {
|
||||||
|
trace!(conn_id, confirm, "ME->C quickack (batched)");
|
||||||
|
write_client_ack(&mut writer, proto_tag, confirm).await?;
|
||||||
|
}
|
||||||
|
MeResponse::Close => {
|
||||||
|
debug!(conn_id, "ME sent close (batched)");
|
||||||
|
let _ = writer.flush().await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.flush().await.map_err(ProxyError::Io)?;
|
||||||
}
|
}
|
||||||
Some(MeResponse::Ack(confirm)) => {
|
Some(MeResponse::Ack(confirm)) => {
|
||||||
trace!(conn_id, confirm, "ME->C quickack");
|
trace!(conn_id, confirm, "ME->C quickack");
|
||||||
@@ -81,6 +138,7 @@ where
|
|||||||
}
|
}
|
||||||
Some(MeResponse::Close) => {
|
Some(MeResponse::Close) => {
|
||||||
debug!(conn_id, "ME sent close");
|
debug!(conn_id, "ME sent close");
|
||||||
|
let _ = writer.flush().await;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
@@ -98,8 +156,17 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let mut main_result: Result<()> = Ok(());
|
let mut main_result: Result<()> = Ok(());
|
||||||
|
let mut client_closed = false;
|
||||||
|
let mut frame_counter: u64 = 0;
|
||||||
loop {
|
loop {
|
||||||
match read_client_payload(&mut crypto_reader, proto_tag, frame_limit, &user).await {
|
match read_client_payload(
|
||||||
|
&mut crypto_reader,
|
||||||
|
proto_tag,
|
||||||
|
frame_limit,
|
||||||
|
&user,
|
||||||
|
&mut frame_counter,
|
||||||
|
&stats,
|
||||||
|
).await {
|
||||||
Ok(Some((payload, quickack))) => {
|
Ok(Some((payload, quickack))) => {
|
||||||
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
||||||
stats.add_user_octets_from(&user, payload.len() as u64);
|
stats.add_user_octets_from(&user, payload.len() as u64);
|
||||||
@@ -110,21 +177,20 @@ where
|
|||||||
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
|
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
|
||||||
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
||||||
}
|
}
|
||||||
if let Err(e) = me_pool.send_proxy_req(
|
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
||||||
conn_id,
|
if c2me_tx
|
||||||
success.dc_idx,
|
.send(C2MeCommand::Data { payload, flags })
|
||||||
peer,
|
.await
|
||||||
translated_local_addr,
|
.is_err()
|
||||||
&payload,
|
{
|
||||||
flags,
|
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
|
||||||
).await {
|
|
||||||
main_result = Err(e);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "Client EOF");
|
debug!(conn_id, "Client EOF");
|
||||||
let _ = me_pool.send_close(conn_id).await;
|
client_closed = true;
|
||||||
|
let _ = c2me_tx.send(C2MeCommand::Close).await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -134,13 +200,31 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = stop_tx.send(());
|
drop(c2me_tx);
|
||||||
let writer_result = me_writer.await.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
|
let c2me_result = c2me_sender
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))));
|
||||||
|
|
||||||
let result = match (main_result, writer_result) {
|
let _ = stop_tx.send(());
|
||||||
(Ok(()), Ok(())) => Ok(()),
|
let mut writer_result = me_writer
|
||||||
(Err(e), _) => Err(e),
|
.await
|
||||||
(_, Err(e)) => Err(e),
|
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
|
||||||
|
|
||||||
|
// When client closes, but ME channel stopped as unregistered - it isnt error
|
||||||
|
if client_closed {
|
||||||
|
if matches!(
|
||||||
|
writer_result,
|
||||||
|
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
|
||||||
|
) {
|
||||||
|
writer_result = Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = match (main_result, c2me_result, writer_result) {
|
||||||
|
(Ok(()), Ok(()), Ok(())) => Ok(()),
|
||||||
|
(Err(e), _, _) => Err(e),
|
||||||
|
(_, Err(e), _) => Err(e),
|
||||||
|
(_, _, Err(e)) => Err(e),
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!(user = %user, conn_id, "ME relay cleanup");
|
debug!(user = %user, conn_id, "ME relay cleanup");
|
||||||
@@ -154,11 +238,14 @@ async fn read_client_payload<R>(
|
|||||||
proto_tag: ProtoTag,
|
proto_tag: ProtoTag,
|
||||||
max_frame: usize,
|
max_frame: usize,
|
||||||
user: &str,
|
user: &str,
|
||||||
|
frame_counter: &mut u64,
|
||||||
|
stats: &Stats,
|
||||||
) -> Result<Option<(Vec<u8>, bool)>>
|
) -> Result<Option<(Vec<u8>, bool)>>
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
let (len, quickack) = match proto_tag {
|
loop {
|
||||||
|
let (len, quickack, raw_len_bytes) = match proto_tag {
|
||||||
ProtoTag::Abridged => {
|
ProtoTag::Abridged => {
|
||||||
let mut first = [0u8; 1];
|
let mut first = [0u8; 1];
|
||||||
match client_reader.read_exact(&mut first).await {
|
match client_reader.read_exact(&mut first).await {
|
||||||
@@ -182,7 +269,7 @@ where
|
|||||||
let len = len_words
|
let len = len_words
|
||||||
.checked_mul(4)
|
.checked_mul(4)
|
||||||
.ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?;
|
.ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?;
|
||||||
(len, quickack)
|
(len, quickack, None)
|
||||||
}
|
}
|
||||||
ProtoTag::Intermediate | ProtoTag::Secure => {
|
ProtoTag::Intermediate | ProtoTag::Secure => {
|
||||||
let mut len_buf = [0u8; 4];
|
let mut len_buf = [0u8; 4];
|
||||||
@@ -192,35 +279,82 @@ where
|
|||||||
Err(e) => return Err(ProxyError::Io(e)),
|
Err(e) => return Err(ProxyError::Io(e)),
|
||||||
}
|
}
|
||||||
let quickack = (len_buf[3] & 0x80) != 0;
|
let quickack = (len_buf[3] & 0x80) != 0;
|
||||||
((u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize, quickack)
|
(
|
||||||
|
(u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize,
|
||||||
|
quickack,
|
||||||
|
Some(len_buf),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if len == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if len < 4 && proto_tag != ProtoTag::Abridged {
|
||||||
|
warn!(
|
||||||
|
user = %user,
|
||||||
|
len,
|
||||||
|
proto = ?proto_tag,
|
||||||
|
"Frame too small — corrupt or probe"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::Proxy(format!("Frame too small: {len}")));
|
||||||
|
}
|
||||||
|
|
||||||
if len > max_frame {
|
if len > max_frame {
|
||||||
|
let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes());
|
||||||
|
let looks_like_tls = raw_len_bytes
|
||||||
|
.map(|b| b[0] == 0x16 && b[1] == 0x03)
|
||||||
|
.unwrap_or(false);
|
||||||
|
let looks_like_http = raw_len_bytes
|
||||||
|
.map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
|
||||||
|
.unwrap_or(false);
|
||||||
warn!(
|
warn!(
|
||||||
user = %user,
|
user = %user,
|
||||||
raw_len = len,
|
raw_len = len,
|
||||||
raw_len_hex = format_args!("0x{:08x}", len),
|
raw_len_hex = format_args!("0x{:08x}", len),
|
||||||
|
raw_bytes = format_args!(
|
||||||
|
"{:02x} {:02x} {:02x} {:02x}",
|
||||||
|
len_buf[0], len_buf[1], len_buf[2], len_buf[3]
|
||||||
|
),
|
||||||
proto = ?proto_tag,
|
proto = ?proto_tag,
|
||||||
"Frame too large — possible crypto desync or TLS record error"
|
tls_like = looks_like_tls,
|
||||||
|
http_like = looks_like_http,
|
||||||
|
frames_ok = *frame_counter,
|
||||||
|
"Frame too large — crypto desync forensics"
|
||||||
);
|
);
|
||||||
return Err(ProxyError::Proxy(format!("Frame too large: {len} (max {max_frame})")));
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
"Frame too large: {len} (max {max_frame}), frames_ok={}",
|
||||||
|
*frame_counter
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let secure_payload_len = if proto_tag == ProtoTag::Secure {
|
||||||
|
match secure_payload_len_from_wire_len(len) {
|
||||||
|
Some(payload_len) => payload_len,
|
||||||
|
None => {
|
||||||
|
stats.increment_secure_padding_invalid();
|
||||||
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
"Invalid secure frame length: {len}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
len
|
||||||
|
};
|
||||||
|
|
||||||
let mut payload = vec![0u8; len];
|
let mut payload = vec![0u8; len];
|
||||||
client_reader
|
client_reader
|
||||||
.read_exact(&mut payload)
|
.read_exact(&mut payload)
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
|
|
||||||
// Secure Intermediate: remove random padding (last len%4 bytes)
|
// Secure Intermediate: strip validated trailing padding bytes.
|
||||||
if proto_tag == ProtoTag::Secure {
|
if proto_tag == ProtoTag::Secure {
|
||||||
let rem = len % 4;
|
payload.truncate(secure_payload_len);
|
||||||
if rem != 0 && payload.len() >= rem {
|
|
||||||
payload.truncate(len - rem);
|
|
||||||
}
|
}
|
||||||
|
*frame_counter += 1;
|
||||||
|
return Ok(Some((payload, quickack)));
|
||||||
}
|
}
|
||||||
Ok(Some((payload, quickack)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn write_client_payload<W>(
|
async fn write_client_payload<W>(
|
||||||
@@ -250,8 +384,11 @@ where
|
|||||||
if quickack {
|
if quickack {
|
||||||
first |= 0x80;
|
first |= 0x80;
|
||||||
}
|
}
|
||||||
|
let mut frame_buf = Vec::with_capacity(1 + data.len());
|
||||||
|
frame_buf.push(first);
|
||||||
|
frame_buf.extend_from_slice(data);
|
||||||
client_writer
|
client_writer
|
||||||
.write_all(&[first])
|
.write_all(&frame_buf)
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
} else if len_words < (1 << 24) {
|
} else if len_words < (1 << 24) {
|
||||||
@@ -260,8 +397,11 @@ where
|
|||||||
first |= 0x80;
|
first |= 0x80;
|
||||||
}
|
}
|
||||||
let lw = (len_words as u32).to_le_bytes();
|
let lw = (len_words as u32).to_le_bytes();
|
||||||
|
let mut frame_buf = Vec::with_capacity(4 + data.len());
|
||||||
|
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
|
||||||
|
frame_buf.extend_from_slice(data);
|
||||||
client_writer
|
client_writer
|
||||||
.write_all(&[first, lw[0], lw[1], lw[2]])
|
.write_all(&frame_buf)
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
} else {
|
} else {
|
||||||
@@ -270,46 +410,36 @@ where
|
|||||||
data.len()
|
data.len()
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
client_writer
|
|
||||||
.write_all(data)
|
|
||||||
.await
|
|
||||||
.map_err(ProxyError::Io)?;
|
|
||||||
}
|
}
|
||||||
ProtoTag::Intermediate | ProtoTag::Secure => {
|
ProtoTag::Intermediate | ProtoTag::Secure => {
|
||||||
let padding_len = if proto_tag == ProtoTag::Secure {
|
let padding_len = if proto_tag == ProtoTag::Secure {
|
||||||
|
if !is_valid_secure_payload_len(data.len()) {
|
||||||
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
"Secure payload must be 4-byte aligned, got {}",
|
||||||
|
data.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
secure_padding_len(data.len(), rng)
|
secure_padding_len(data.len(), rng)
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
let mut len = (data.len() + padding_len) as u32;
|
let mut len_val = (data.len() + padding_len) as u32;
|
||||||
if quickack {
|
if quickack {
|
||||||
len |= 0x8000_0000;
|
len_val |= 0x8000_0000;
|
||||||
}
|
}
|
||||||
client_writer
|
let total = 4 + data.len() + padding_len;
|
||||||
.write_all(&len.to_le_bytes())
|
let mut frame_buf = Vec::with_capacity(total);
|
||||||
.await
|
frame_buf.extend_from_slice(&len_val.to_le_bytes());
|
||||||
.map_err(ProxyError::Io)?;
|
frame_buf.extend_from_slice(data);
|
||||||
client_writer
|
|
||||||
.write_all(data)
|
|
||||||
.await
|
|
||||||
.map_err(ProxyError::Io)?;
|
|
||||||
if padding_len > 0 {
|
if padding_len > 0 {
|
||||||
let pad = rng.bytes(padding_len);
|
frame_buf.extend_from_slice(&rng.bytes(padding_len));
|
||||||
|
}
|
||||||
client_writer
|
client_writer
|
||||||
.write_all(&pad)
|
.write_all(&frame_buf)
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Avoid unconditional per-frame flush (throughput killer on large downloads).
|
|
||||||
// Flush only when low-latency ack semantics are requested or when
|
|
||||||
// CryptoWriter has buffered pending ciphertext that must be drained.
|
|
||||||
if quickack || client_writer.has_pending() {
|
|
||||||
client_writer.flush().await.map_err(ProxyError::Io)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,8 +21,16 @@ pub struct Stats {
|
|||||||
handshake_timeouts: AtomicU64,
|
handshake_timeouts: AtomicU64,
|
||||||
me_keepalive_sent: AtomicU64,
|
me_keepalive_sent: AtomicU64,
|
||||||
me_keepalive_failed: AtomicU64,
|
me_keepalive_failed: AtomicU64,
|
||||||
|
me_keepalive_pong: AtomicU64,
|
||||||
|
me_keepalive_timeout: AtomicU64,
|
||||||
me_reconnect_attempts: AtomicU64,
|
me_reconnect_attempts: AtomicU64,
|
||||||
me_reconnect_success: AtomicU64,
|
me_reconnect_success: AtomicU64,
|
||||||
|
me_crc_mismatch: AtomicU64,
|
||||||
|
me_seq_mismatch: AtomicU64,
|
||||||
|
me_route_drop_no_conn: AtomicU64,
|
||||||
|
me_route_drop_channel_closed: AtomicU64,
|
||||||
|
me_route_drop_queue_full: AtomicU64,
|
||||||
|
secure_padding_invalid: AtomicU64,
|
||||||
user_stats: DashMap<String, UserStats>,
|
user_stats: DashMap<String, UserStats>,
|
||||||
start_time: parking_lot::RwLock<Option<Instant>>,
|
start_time: parking_lot::RwLock<Option<Instant>>,
|
||||||
}
|
}
|
||||||
@@ -49,14 +57,45 @@ impl Stats {
|
|||||||
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_keepalive_pong(&self) { self.me_keepalive_pong.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_keepalive_timeout(&self) { self.me_keepalive_timeout.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_keepalive_timeout_by(&self, value: u64) {
|
||||||
|
self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_crc_mismatch(&self) { self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_seq_mismatch(&self) { self.me_seq_mismatch.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_route_drop_no_conn(&self) { self.me_route_drop_no_conn.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_route_drop_channel_closed(&self) {
|
||||||
|
self.me_route_drop_channel_closed.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_route_drop_queue_full(&self) {
|
||||||
|
self.me_route_drop_queue_full.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_secure_padding_invalid(&self) {
|
||||||
|
self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
|
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_keepalive_timeout(&self) -> u64 { self.me_keepalive_timeout.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
|
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
|
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_crc_mismatch(&self) -> u64 { self.me_crc_mismatch.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_seq_mismatch(&self) -> u64 { self.me_seq_mismatch.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_route_drop_no_conn(&self) -> u64 { self.me_route_drop_no_conn.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_route_drop_channel_closed(&self) -> u64 {
|
||||||
|
self.me_route_drop_channel_closed.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_route_drop_queue_full(&self) -> u64 {
|
||||||
|
self.me_route_drop_queue_full.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_secure_padding_invalid(&self) -> u64 {
|
||||||
|
self.secure_padding_invalid.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn increment_user_connects(&self, user: &str) {
|
pub fn increment_user_connects(&self, user: &str) {
|
||||||
self.user_stats.entry(user.to_string()).or_default()
|
self.user_stats.entry(user.to_string()).or_default()
|
||||||
|
|||||||
@@ -8,7 +8,9 @@ use std::io::{self, Error, ErrorKind};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio_util::codec::{Decoder, Encoder};
|
use tokio_util::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
use crate::protocol::constants::ProtoTag;
|
use crate::protocol::constants::{
|
||||||
|
ProtoTag, is_valid_secure_payload_len, secure_padding_len, secure_payload_len_from_wire_len,
|
||||||
|
};
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use super::frame::{Frame, FrameMeta, FrameCodec as FrameCodecTrait};
|
use super::frame::{Frame, FrameMeta, FrameCodec as FrameCodecTrait};
|
||||||
|
|
||||||
@@ -274,13 +276,13 @@ fn decode_secure(src: &mut BytesMut, max_size: usize) -> io::Result<Option<Frame
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate padding (indicated by length not divisible by 4)
|
let data_len = secure_payload_len_from_wire_len(len).ok_or_else(|| {
|
||||||
let padding_len = len % 4;
|
Error::new(
|
||||||
let data_len = if padding_len != 0 {
|
ErrorKind::InvalidData,
|
||||||
len - padding_len
|
format!("invalid secure frame length: {len}"),
|
||||||
} else {
|
)
|
||||||
len
|
})?;
|
||||||
};
|
let padding_len = len - data_len;
|
||||||
|
|
||||||
meta.padding_len = padding_len as u8;
|
meta.padding_len = padding_len as u8;
|
||||||
|
|
||||||
@@ -303,14 +305,15 @@ fn encode_secure(frame: &Frame, dst: &mut BytesMut, rng: &SecureRandom) -> io::R
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate padding to make length not divisible by 4
|
if !is_valid_secure_payload_len(data.len()) {
|
||||||
let padding_len = if data.len() % 4 == 0 {
|
return Err(Error::new(
|
||||||
// Add 1-3 bytes to make it non-aligned
|
ErrorKind::InvalidData,
|
||||||
(rng.range(3) + 1) as usize
|
format!("secure payload must be 4-byte aligned, got {}", data.len()),
|
||||||
} else {
|
));
|
||||||
// Already non-aligned, can add 0-3
|
}
|
||||||
rng.range(4) as usize
|
|
||||||
};
|
// Generate padding that keeps total length non-divisible by 4.
|
||||||
|
let padding_len = secure_padding_len(data.len(), rng);
|
||||||
|
|
||||||
let total_len = data.len() + padding_len;
|
let total_len = data.len() + padding_len;
|
||||||
dst.reserve(4 + total_len);
|
dst.reserve(4 + total_len);
|
||||||
|
|||||||
@@ -232,11 +232,13 @@ impl<R: AsyncRead + Unpin> SecureIntermediateFrameReader<R> {
|
|||||||
let mut data = vec![0u8; len];
|
let mut data = vec![0u8; len];
|
||||||
self.upstream.read_exact(&mut data).await?;
|
self.upstream.read_exact(&mut data).await?;
|
||||||
|
|
||||||
// Strip padding (not aligned to 4)
|
let payload_len = secure_payload_len_from_wire_len(len).ok_or_else(|| {
|
||||||
if len % 4 != 0 {
|
Error::new(
|
||||||
let actual_len = len - (len % 4);
|
ErrorKind::InvalidData,
|
||||||
data.truncate(actual_len);
|
format!("Invalid secure frame length: {len}"),
|
||||||
}
|
)
|
||||||
|
})?;
|
||||||
|
data.truncate(payload_len);
|
||||||
|
|
||||||
Ok((Bytes::from(data), meta))
|
Ok((Bytes::from(data), meta))
|
||||||
}
|
}
|
||||||
@@ -267,6 +269,13 @@ impl<W: AsyncWrite + Unpin> SecureIntermediateFrameWriter<W> {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !is_valid_secure_payload_len(data.len()) {
|
||||||
|
return Err(Error::new(
|
||||||
|
ErrorKind::InvalidData,
|
||||||
|
format!("Secure payload must be 4-byte aligned, got {}", data.len()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
// Add padding so total length is never divisible by 4 (MTProto Secure)
|
// Add padding so total length is never divisible by 4 (MTProto Secure)
|
||||||
let padding_len = secure_padding_len(data.len(), &self.rng);
|
let padding_len = secure_padding_len(data.len(), &self.rng);
|
||||||
let padding = self.rng.bytes(padding_len);
|
let padding = self.rng.bytes(padding_len);
|
||||||
@@ -550,9 +559,7 @@ mod tests {
|
|||||||
writer.flush().await.unwrap();
|
writer.flush().await.unwrap();
|
||||||
|
|
||||||
let (received, _meta) = reader.read_frame().await.unwrap();
|
let (received, _meta) = reader.read_frame().await.unwrap();
|
||||||
// Received should have padding stripped to align to 4
|
assert_eq!(received.len(), data.len());
|
||||||
let expected_len = (data.len() / 4) * 4;
|
|
||||||
assert_eq!(received.len(), expected_len);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
use crate::crypto::{AesCbc, crc32};
|
use crate::crypto::{AesCbc, crc32, crc32c};
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
|
|
||||||
@@ -8,17 +8,46 @@ use crate::protocol::constants::*;
|
|||||||
pub(crate) enum WriterCommand {
|
pub(crate) enum WriterCommand {
|
||||||
Data(Vec<u8>),
|
Data(Vec<u8>),
|
||||||
DataAndFlush(Vec<u8>),
|
DataAndFlush(Vec<u8>),
|
||||||
Keepalive,
|
|
||||||
Close,
|
Close,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> {
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub(crate) enum RpcChecksumMode {
|
||||||
|
Crc32,
|
||||||
|
Crc32c,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RpcChecksumMode {
|
||||||
|
pub(crate) fn from_handshake_flags(flags: u32) -> Self {
|
||||||
|
if (flags & rpc_crypto_flags::USE_CRC32C) != 0 {
|
||||||
|
Self::Crc32c
|
||||||
|
} else {
|
||||||
|
Self::Crc32
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn advertised_flags(self) -> u32 {
|
||||||
|
match self {
|
||||||
|
Self::Crc32 => 0,
|
||||||
|
Self::Crc32c => rpc_crypto_flags::USE_CRC32C,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn rpc_crc(mode: RpcChecksumMode, data: &[u8]) -> u32 {
|
||||||
|
match mode {
|
||||||
|
RpcChecksumMode::Crc32 => crc32(data),
|
||||||
|
RpcChecksumMode::Crc32c => crc32c(data),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> {
|
||||||
let total_len = (4 + 4 + payload.len() + 4) as u32;
|
let total_len = (4 + 4 + payload.len() + 4) as u32;
|
||||||
let mut frame = Vec::with_capacity(total_len as usize);
|
let mut frame = Vec::with_capacity(total_len as usize);
|
||||||
frame.extend_from_slice(&total_len.to_le_bytes());
|
frame.extend_from_slice(&total_len.to_le_bytes());
|
||||||
frame.extend_from_slice(&seq_no.to_le_bytes());
|
frame.extend_from_slice(&seq_no.to_le_bytes());
|
||||||
frame.extend_from_slice(payload);
|
frame.extend_from_slice(payload);
|
||||||
let c = crc32(&frame);
|
let c = rpc_crc(crc_mode, &frame);
|
||||||
frame.extend_from_slice(&c.to_le_bytes());
|
frame.extend_from_slice(&c.to_le_bytes());
|
||||||
frame
|
frame
|
||||||
}
|
}
|
||||||
@@ -45,7 +74,7 @@ pub(crate) async fn read_rpc_frame_plaintext(
|
|||||||
|
|
||||||
let crc_offset = total_len - 4;
|
let crc_offset = total_len - 4;
|
||||||
let expected_crc = u32::from_le_bytes(full[crc_offset..crc_offset + 4].try_into().unwrap());
|
let expected_crc = u32::from_le_bytes(full[crc_offset..crc_offset + 4].try_into().unwrap());
|
||||||
let actual_crc = crc32(&full[..crc_offset]);
|
let actual_crc = rpc_crc(RpcChecksumMode::Crc32, &full[..crc_offset]);
|
||||||
if expected_crc != actual_crc {
|
if expected_crc != actual_crc {
|
||||||
return Err(ProxyError::InvalidHandshake(format!(
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
"CRC mismatch: 0x{expected_crc:08x} vs 0x{actual_crc:08x}"
|
"CRC mismatch: 0x{expected_crc:08x} vs 0x{actual_crc:08x}"
|
||||||
@@ -95,24 +124,52 @@ pub(crate) fn build_handshake_payload(
|
|||||||
our_port: u16,
|
our_port: u16,
|
||||||
peer_ip: [u8; 4],
|
peer_ip: [u8; 4],
|
||||||
peer_port: u16,
|
peer_port: u16,
|
||||||
|
flags: u32,
|
||||||
) -> [u8; 32] {
|
) -> [u8; 32] {
|
||||||
let mut p = [0u8; 32];
|
let mut p = [0u8; 32];
|
||||||
p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes());
|
p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes());
|
||||||
|
p[4..8].copy_from_slice(&flags.to_le_bytes());
|
||||||
|
|
||||||
// Keep C memory layout compatibility for PID IPv4 bytes.
|
// process_id sender_pid
|
||||||
p[8..12].copy_from_slice(&our_ip);
|
p[8..12].copy_from_slice(&our_ip);
|
||||||
p[12..14].copy_from_slice(&our_port.to_le_bytes());
|
p[12..14].copy_from_slice(&our_port.to_le_bytes());
|
||||||
let pid = (std::process::id() & 0xffff) as u16;
|
p[14..16].copy_from_slice(&process_pid16().to_le_bytes());
|
||||||
p[14..16].copy_from_slice(&pid.to_le_bytes());
|
p[16..20].copy_from_slice(&process_utime().to_le_bytes());
|
||||||
|
|
||||||
|
// process_id peer_pid
|
||||||
|
p[20..24].copy_from_slice(&peer_ip);
|
||||||
|
p[24..26].copy_from_slice(&peer_port.to_le_bytes());
|
||||||
|
p[26..28].copy_from_slice(&0u16.to_le_bytes());
|
||||||
|
p[28..32].copy_from_slice(&0u32.to_le_bytes());
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn parse_handshake_flags(payload: &[u8]) -> Result<u32> {
|
||||||
|
if payload.len() != 32 {
|
||||||
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
|
"Bad handshake payload len: {}",
|
||||||
|
payload.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let hs_type = u32::from_le_bytes(payload[0..4].try_into().unwrap());
|
||||||
|
if hs_type != RPC_HANDSHAKE_U32 {
|
||||||
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
|
"Expected HANDSHAKE 0x{RPC_HANDSHAKE_U32:08x}, got 0x{hs_type:08x}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Ok(u32::from_le_bytes(payload[4..8].try_into().unwrap()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_pid16() -> u16 {
|
||||||
|
(std::process::id() & 0xffff) as u16
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_utime() -> u32 {
|
||||||
let utime = std::time::SystemTime::now()
|
let utime = std::time::SystemTime::now()
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.as_secs() as u32;
|
.as_secs() as u32;
|
||||||
p[16..20].copy_from_slice(&utime.to_le_bytes());
|
utime
|
||||||
|
|
||||||
p[20..24].copy_from_slice(&peer_ip);
|
|
||||||
p[24..26].copy_from_slice(&peer_port.to_le_bytes());
|
|
||||||
p
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn cbc_encrypt_padded(
|
pub(crate) fn cbc_encrypt_padded(
|
||||||
@@ -160,11 +217,12 @@ pub(crate) struct RpcWriter {
|
|||||||
pub(crate) key: [u8; 32],
|
pub(crate) key: [u8; 32],
|
||||||
pub(crate) iv: [u8; 16],
|
pub(crate) iv: [u8; 16],
|
||||||
pub(crate) seq_no: i32,
|
pub(crate) seq_no: i32,
|
||||||
|
pub(crate) crc_mode: RpcChecksumMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcWriter {
|
impl RpcWriter {
|
||||||
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
|
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
|
||||||
let frame = build_rpc_frame(self.seq_no, payload);
|
let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode);
|
||||||
self.seq_no += 1;
|
self.seq_no += 1;
|
||||||
|
|
||||||
let pad = (16 - (frame.len() % 16)) % 16;
|
let pad = (16 - (frame.len() % 16)) % 16;
|
||||||
@@ -189,12 +247,4 @@ impl RpcWriter {
|
|||||||
self.send(payload).await?;
|
self.send(payload).await?;
|
||||||
self.writer.flush().await.map_err(ProxyError::Io)
|
self.writer.flush().await.map_err(ProxyError::Io)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> {
|
|
||||||
// Keepalive is a frame with fl == 4 and 4 bytes payload.
|
|
||||||
let mut frame = Vec::with_capacity(8);
|
|
||||||
frame.extend_from_slice(&4u32.to_le_bytes());
|
|
||||||
frame.extend_from_slice(&payload);
|
|
||||||
self.send(&frame).await
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,13 +18,14 @@ use crate::crypto::{SecureRandom, build_middleproxy_prekey, derive_middleproxy_k
|
|||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
use crate::protocol::constants::{
|
use crate::protocol::constants::{
|
||||||
ME_CONNECT_TIMEOUT_SECS, ME_HANDSHAKE_TIMEOUT_SECS, RPC_CRYPTO_AES_U32, RPC_HANDSHAKE_ERROR_U32,
|
ME_CONNECT_TIMEOUT_SECS, ME_HANDSHAKE_TIMEOUT_SECS, RPC_CRYPTO_AES_U32,
|
||||||
RPC_HANDSHAKE_U32, RPC_PING_U32, RPC_PONG_U32, RPC_NONCE_U32,
|
RPC_HANDSHAKE_ERROR_U32, rpc_crypto_flags,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::codec::{
|
use super::codec::{
|
||||||
build_handshake_payload, build_nonce_payload, build_rpc_frame, cbc_decrypt_inplace,
|
RpcChecksumMode, build_handshake_payload, build_nonce_payload, build_rpc_frame,
|
||||||
cbc_encrypt_padded, parse_nonce_payload, read_rpc_frame_plaintext,
|
cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload,
|
||||||
|
read_rpc_frame_plaintext, rpc_crc,
|
||||||
};
|
};
|
||||||
use super::wire::{extract_ip_material, IpMaterial};
|
use super::wire::{extract_ip_material, IpMaterial};
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
@@ -37,6 +38,7 @@ pub(crate) struct HandshakeOutput {
|
|||||||
pub read_iv: [u8; 16],
|
pub read_iv: [u8; 16],
|
||||||
pub write_key: [u8; 32],
|
pub write_key: [u8; 32],
|
||||||
pub write_iv: [u8; 16],
|
pub write_iv: [u8; 16],
|
||||||
|
pub crc_mode: RpcChecksumMode,
|
||||||
pub handshake_ms: f64,
|
pub handshake_ms: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,7 +148,7 @@ impl MePool {
|
|||||||
|
|
||||||
let ks = self.key_selector().await;
|
let ks = self.key_selector().await;
|
||||||
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
|
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
|
||||||
let nonce_frame = build_rpc_frame(-2, &nonce_payload);
|
let nonce_frame = build_rpc_frame(-2, &nonce_payload, RpcChecksumMode::Crc32);
|
||||||
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
|
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
|
||||||
debug!(
|
debug!(
|
||||||
key_selector = format_args!("0x{ks:08x}"),
|
key_selector = format_args!("0x{ks:08x}"),
|
||||||
@@ -284,8 +286,15 @@ impl MePool {
|
|||||||
srv_v6_opt.as_ref(),
|
srv_v6_opt.as_ref(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let hs_payload = build_handshake_payload(hs_our_ip, local_addr.port(), hs_peer_ip, peer_addr.port());
|
let requested_crc_mode = RpcChecksumMode::Crc32c;
|
||||||
let hs_frame = build_rpc_frame(-1, &hs_payload);
|
let hs_payload = build_handshake_payload(
|
||||||
|
hs_our_ip,
|
||||||
|
local_addr.port(),
|
||||||
|
hs_peer_ip,
|
||||||
|
peer_addr.port(),
|
||||||
|
requested_crc_mode.advertised_flags(),
|
||||||
|
);
|
||||||
|
let hs_frame = build_rpc_frame(-1, &hs_payload, RpcChecksumMode::Crc32);
|
||||||
if diag_level >= 1 {
|
if diag_level >= 1 {
|
||||||
info!(
|
info!(
|
||||||
write_key = %hex_dump(&wk),
|
write_key = %hex_dump(&wk),
|
||||||
@@ -314,7 +323,7 @@ impl MePool {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let (encrypted_hs, mut write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
|
let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
|
||||||
if diag_level >= 1 {
|
if diag_level >= 1 {
|
||||||
info!(
|
info!(
|
||||||
hs_cipher = %hex_dump(&encrypted_hs),
|
hs_cipher = %hex_dump(&encrypted_hs),
|
||||||
@@ -328,6 +337,7 @@ impl MePool {
|
|||||||
let mut enc_buf = BytesMut::with_capacity(256);
|
let mut enc_buf = BytesMut::with_capacity(256);
|
||||||
let mut dec_buf = BytesMut::with_capacity(256);
|
let mut dec_buf = BytesMut::with_capacity(256);
|
||||||
let mut read_iv = ri;
|
let mut read_iv = ri;
|
||||||
|
let mut negotiated_crc_mode = RpcChecksumMode::Crc32;
|
||||||
let mut handshake_ok = false;
|
let mut handshake_ok = false;
|
||||||
|
|
||||||
while Instant::now() < deadline && !handshake_ok {
|
while Instant::now() < deadline && !handshake_ok {
|
||||||
@@ -375,17 +385,23 @@ impl MePool {
|
|||||||
let frame = dec_buf.split_to(fl);
|
let frame = dec_buf.split_to(fl);
|
||||||
let pe = fl - 4;
|
let pe = fl - 4;
|
||||||
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
|
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
|
||||||
let ac = crate::crypto::crc32(&frame[..pe]);
|
let ac = rpc_crc(RpcChecksumMode::Crc32, &frame[..pe]);
|
||||||
if ec != ac {
|
if ec != ac {
|
||||||
return Err(ProxyError::InvalidHandshake(format!(
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
"HS CRC mismatch: 0x{ec:08x} vs 0x{ac:08x}"
|
"HS CRC mismatch: 0x{ec:08x} vs 0x{ac:08x}"
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
let hs_type = u32::from_le_bytes(frame[8..12].try_into().unwrap());
|
let hs_payload = &frame[8..pe];
|
||||||
|
if hs_payload.len() < 4 {
|
||||||
|
return Err(ProxyError::InvalidHandshake(
|
||||||
|
"Handshake payload too short".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let hs_type = u32::from_le_bytes(hs_payload[0..4].try_into().unwrap());
|
||||||
if hs_type == RPC_HANDSHAKE_ERROR_U32 {
|
if hs_type == RPC_HANDSHAKE_ERROR_U32 {
|
||||||
let err_code = if frame.len() >= 16 {
|
let err_code = if hs_payload.len() >= 8 {
|
||||||
i32::from_le_bytes(frame[12..16].try_into().unwrap())
|
i32::from_le_bytes(hs_payload[4..8].try_into().unwrap())
|
||||||
} else {
|
} else {
|
||||||
-1
|
-1
|
||||||
};
|
};
|
||||||
@@ -393,11 +409,21 @@ impl MePool {
|
|||||||
"ME rejected handshake (error={err_code})"
|
"ME rejected handshake (error={err_code})"
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
if hs_type != RPC_HANDSHAKE_U32 {
|
let hs_flags = parse_handshake_flags(hs_payload)?;
|
||||||
|
if hs_flags & 0xff != 0 {
|
||||||
return Err(ProxyError::InvalidHandshake(format!(
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
"Expected HANDSHAKE 0x{RPC_HANDSHAKE_U32:08x}, got 0x{hs_type:08x}"
|
"Unsupported handshake flags: 0x{hs_flags:08x}"
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
negotiated_crc_mode = if (hs_flags & requested_crc_mode.advertised_flags()) != 0 {
|
||||||
|
RpcChecksumMode::from_handshake_flags(hs_flags)
|
||||||
|
} else if (hs_flags & rpc_crypto_flags::USE_CRC32C) != 0 {
|
||||||
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
|
"Peer negotiated unsupported CRC flags: 0x{hs_flags:08x}"
|
||||||
|
)));
|
||||||
|
} else {
|
||||||
|
RpcChecksumMode::Crc32
|
||||||
|
};
|
||||||
|
|
||||||
handshake_ok = true;
|
handshake_ok = true;
|
||||||
break;
|
break;
|
||||||
@@ -418,6 +444,7 @@ impl MePool {
|
|||||||
read_iv,
|
read_iv,
|
||||||
write_key: wk,
|
write_key: wk,
|
||||||
write_iv,
|
write_iv,
|
||||||
|
crc_mode: negotiated_crc_mode,
|
||||||
handshake_ms,
|
handshake_ms,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,13 +17,11 @@ use crate::network::IpFamily;
|
|||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
|
|
||||||
use super::ConnRegistry;
|
use super::ConnRegistry;
|
||||||
use super::registry::{BoundConn, ConnMeta};
|
use super::registry::BoundConn;
|
||||||
use super::codec::{RpcWriter, WriterCommand};
|
use super::codec::{RpcWriter, WriterCommand};
|
||||||
use super::reader::reader_loop;
|
use super::reader::reader_loop;
|
||||||
use super::MeResponse;
|
|
||||||
const ME_ACTIVE_PING_SECS: u64 = 25;
|
const ME_ACTIVE_PING_SECS: u64 = 25;
|
||||||
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
|
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
|
||||||
const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MeWriter {
|
pub struct MeWriter {
|
||||||
@@ -361,7 +359,6 @@ impl MePool {
|
|||||||
|
|
||||||
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
|
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
|
||||||
if self.me_warmup_stagger_enabled {
|
if self.me_warmup_stagger_enabled {
|
||||||
let mut delay_ms = 0u64;
|
|
||||||
for (dc, addrs) in dc_addrs.iter() {
|
for (dc, addrs) in dc_addrs.iter() {
|
||||||
for (ip, port) in addrs {
|
for (ip, port) in addrs {
|
||||||
if self.connection_count() >= pool_size {
|
if self.connection_count() >= pool_size {
|
||||||
@@ -369,7 +366,7 @@ impl MePool {
|
|||||||
}
|
}
|
||||||
let addr = SocketAddr::new(*ip, *port);
|
let addr = SocketAddr::new(*ip, *port);
|
||||||
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
|
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
|
||||||
delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter);
|
let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter;
|
||||||
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
||||||
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
|
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
|
||||||
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
|
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
|
||||||
@@ -419,13 +416,12 @@ impl MePool {
|
|||||||
let draining = Arc::new(AtomicBool::new(false));
|
let draining = Arc::new(AtomicBool::new(false));
|
||||||
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
|
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
|
||||||
let tx_for_keepalive = tx.clone();
|
let tx_for_keepalive = tx.clone();
|
||||||
let keepalive_random = self.me_keepalive_payload_random;
|
|
||||||
let stats = self.stats.clone();
|
|
||||||
let mut rpc_writer = RpcWriter {
|
let mut rpc_writer = RpcWriter {
|
||||||
writer: hs.wr,
|
writer: hs.wr,
|
||||||
key: hs.write_key,
|
key: hs.write_key,
|
||||||
iv: hs.write_iv,
|
iv: hs.write_iv,
|
||||||
seq_no: 0,
|
seq_no: 0,
|
||||||
|
crc_mode: hs.crc_mode,
|
||||||
};
|
};
|
||||||
let cancel_wr = cancel.clone();
|
let cancel_wr = cancel.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -439,21 +435,6 @@ impl MePool {
|
|||||||
Some(WriterCommand::DataAndFlush(payload)) => {
|
Some(WriterCommand::DataAndFlush(payload)) => {
|
||||||
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
|
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
|
||||||
}
|
}
|
||||||
Some(WriterCommand::Keepalive) => {
|
|
||||||
let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN];
|
|
||||||
if keepalive_random {
|
|
||||||
rand::rng().fill(&mut payload);
|
|
||||||
}
|
|
||||||
match rpc_writer.send_keepalive(payload).await {
|
|
||||||
Ok(()) => {
|
|
||||||
stats.increment_me_keepalive_sent();
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
stats.increment_me_keepalive_failed();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(WriterCommand::Close) | None => break,
|
Some(WriterCommand::Close) | None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -476,7 +457,11 @@ impl MePool {
|
|||||||
let reg = self.registry.clone();
|
let reg = self.registry.clone();
|
||||||
let writers_arc = self.writers_arc();
|
let writers_arc = self.writers_arc();
|
||||||
let ping_tracker = self.ping_tracker.clone();
|
let ping_tracker = self.ping_tracker.clone();
|
||||||
|
let ping_tracker_reader = ping_tracker.clone();
|
||||||
let rtt_stats = self.rtt_stats.clone();
|
let rtt_stats = self.rtt_stats.clone();
|
||||||
|
let stats_reader = self.stats.clone();
|
||||||
|
let stats_ping = self.stats.clone();
|
||||||
|
let stats_keepalive = self.stats.clone();
|
||||||
let pool = Arc::downgrade(self);
|
let pool = Arc::downgrade(self);
|
||||||
let cancel_ping = cancel.clone();
|
let cancel_ping = cancel.clone();
|
||||||
let tx_ping = tx.clone();
|
let tx_ping = tx.clone();
|
||||||
@@ -496,12 +481,14 @@ impl MePool {
|
|||||||
hs.rd,
|
hs.rd,
|
||||||
hs.read_key,
|
hs.read_key,
|
||||||
hs.read_iv,
|
hs.read_iv,
|
||||||
|
hs.crc_mode,
|
||||||
reg.clone(),
|
reg.clone(),
|
||||||
BytesMut::new(),
|
BytesMut::new(),
|
||||||
BytesMut::new(),
|
BytesMut::new(),
|
||||||
tx.clone(),
|
tx.clone(),
|
||||||
ping_tracker.clone(),
|
ping_tracker_reader,
|
||||||
rtt_stats.clone(),
|
rtt_stats.clone(),
|
||||||
|
stats_reader,
|
||||||
writer_id,
|
writer_id,
|
||||||
degraded.clone(),
|
degraded.clone(),
|
||||||
cancel_reader_token.clone(),
|
cancel_reader_token.clone(),
|
||||||
@@ -542,7 +529,12 @@ impl MePool {
|
|||||||
p.extend_from_slice(&sent_id.to_le_bytes());
|
p.extend_from_slice(&sent_id.to_le_bytes());
|
||||||
{
|
{
|
||||||
let mut tracker = ping_tracker_ping.lock().await;
|
let mut tracker = ping_tracker_ping.lock().await;
|
||||||
|
let before = tracker.len();
|
||||||
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
|
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
|
||||||
|
let expired = before.saturating_sub(tracker.len());
|
||||||
|
if expired > 0 {
|
||||||
|
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
|
||||||
|
}
|
||||||
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
|
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
|
||||||
}
|
}
|
||||||
ping_id = ping_id.wrapping_add(1);
|
ping_id = ping_id.wrapping_add(1);
|
||||||
@@ -565,18 +557,37 @@ impl MePool {
|
|||||||
if keepalive_enabled {
|
if keepalive_enabled {
|
||||||
let tx_keepalive = tx_for_keepalive;
|
let tx_keepalive = tx_for_keepalive;
|
||||||
let cancel_keepalive = cancel_keepalive_token;
|
let cancel_keepalive = cancel_keepalive_token;
|
||||||
|
let ping_tracker_keepalive = ping_tracker.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Per-writer jittered start to avoid phase sync.
|
// Per-writer jittered start to avoid phase sync.
|
||||||
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
|
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
|
||||||
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
|
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
|
||||||
let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64);
|
let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64);
|
||||||
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
|
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
|
||||||
|
let mut ping_id: i64 = rand::random::<i64>();
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel_keepalive.cancelled() => break,
|
_ = cancel_keepalive.cancelled() => break,
|
||||||
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {}
|
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {}
|
||||||
}
|
}
|
||||||
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
|
let sent_id = ping_id;
|
||||||
|
ping_id = ping_id.wrapping_add(1);
|
||||||
|
let mut p = Vec::with_capacity(12);
|
||||||
|
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
|
||||||
|
p.extend_from_slice(&sent_id.to_le_bytes());
|
||||||
|
{
|
||||||
|
let mut tracker = ping_tracker_keepalive.lock().await;
|
||||||
|
let before = tracker.len();
|
||||||
|
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
|
||||||
|
let expired = before.saturating_sub(tracker.len());
|
||||||
|
if expired > 0 {
|
||||||
|
stats_keepalive.increment_me_keepalive_timeout_by(expired as u64);
|
||||||
|
}
|
||||||
|
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
|
||||||
|
}
|
||||||
|
stats_keepalive.increment_me_keepalive_sent();
|
||||||
|
if tx_keepalive.send(WriterCommand::DataAndFlush(p)).await.is_err() {
|
||||||
|
stats_keepalive.increment_me_keepalive_failed();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,31 +10,33 @@ use tokio::sync::{Mutex, mpsc};
|
|||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
use crate::crypto::{AesCbc, crc32};
|
use crate::crypto::AesCbc;
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
|
use crate::stats::Stats;
|
||||||
|
|
||||||
use super::codec::WriterCommand;
|
use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
|
||||||
|
use super::registry::RouteResult;
|
||||||
use super::{ConnRegistry, MeResponse};
|
use super::{ConnRegistry, MeResponse};
|
||||||
|
|
||||||
pub(crate) async fn reader_loop(
|
pub(crate) async fn reader_loop(
|
||||||
mut rd: tokio::io::ReadHalf<TcpStream>,
|
mut rd: tokio::io::ReadHalf<TcpStream>,
|
||||||
dk: [u8; 32],
|
dk: [u8; 32],
|
||||||
mut div: [u8; 16],
|
mut div: [u8; 16],
|
||||||
|
crc_mode: RpcChecksumMode,
|
||||||
reg: Arc<ConnRegistry>,
|
reg: Arc<ConnRegistry>,
|
||||||
enc_leftover: BytesMut,
|
enc_leftover: BytesMut,
|
||||||
mut dec: BytesMut,
|
mut dec: BytesMut,
|
||||||
tx: mpsc::Sender<WriterCommand>,
|
tx: mpsc::Sender<WriterCommand>,
|
||||||
ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>,
|
ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>,
|
||||||
rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
|
rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
|
||||||
|
stats: Arc<Stats>,
|
||||||
_writer_id: u64,
|
_writer_id: u64,
|
||||||
degraded: Arc<AtomicBool>,
|
degraded: Arc<AtomicBool>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut raw = enc_leftover;
|
let mut raw = enc_leftover;
|
||||||
let mut expected_seq: i32 = 0;
|
let mut expected_seq: i32 = 0;
|
||||||
let mut crc_errors = 0u32;
|
|
||||||
let mut seq_mismatch = 0u32;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut tmp = [0u8; 16_384];
|
let mut tmp = [0u8; 16_384];
|
||||||
@@ -80,26 +82,28 @@ pub(crate) async fn reader_loop(
|
|||||||
let frame = dec.split_to(fl);
|
let frame = dec.split_to(fl);
|
||||||
let pe = fl - 4;
|
let pe = fl - 4;
|
||||||
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
|
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
|
||||||
if crc32(&frame[..pe]) != ec {
|
let actual_crc = rpc_crc(crc_mode, &frame[..pe]);
|
||||||
warn!("CRC mismatch in data frame");
|
if actual_crc != ec {
|
||||||
crc_errors += 1;
|
stats.increment_me_crc_mismatch();
|
||||||
if crc_errors > 3 {
|
warn!(
|
||||||
return Err(ProxyError::Proxy("Too many CRC mismatches".into()));
|
frame_len = fl,
|
||||||
}
|
expected_crc = format_args!("0x{ec:08x}"),
|
||||||
continue;
|
actual_crc = format_args!("0x{actual_crc:08x}"),
|
||||||
|
"CRC mismatch — CBC crypto desync, aborting ME connection"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::Proxy("CRC mismatch (crypto desync)".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap());
|
let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap());
|
||||||
if seq_no != expected_seq {
|
if seq_no != expected_seq {
|
||||||
|
stats.increment_me_seq_mismatch();
|
||||||
warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch");
|
warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch");
|
||||||
seq_mismatch += 1;
|
return Err(ProxyError::SeqNoMismatch {
|
||||||
if seq_mismatch > 10 {
|
expected: expected_seq,
|
||||||
return Err(ProxyError::Proxy("Too many seq mismatches".into()));
|
got: seq_no,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
expected_seq = seq_no.wrapping_add(1);
|
|
||||||
} else {
|
|
||||||
expected_seq = expected_seq.wrapping_add(1);
|
expected_seq = expected_seq.wrapping_add(1);
|
||||||
}
|
|
||||||
|
|
||||||
let payload = &frame[8..pe];
|
let payload = &frame[8..pe];
|
||||||
if payload.len() < 4 {
|
if payload.len() < 4 {
|
||||||
@@ -116,7 +120,13 @@ pub(crate) async fn reader_loop(
|
|||||||
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
||||||
|
|
||||||
let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
|
let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
|
||||||
if !routed {
|
if !matches!(routed, RouteResult::Routed) {
|
||||||
|
match routed {
|
||||||
|
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
|
||||||
|
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
|
||||||
|
RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(),
|
||||||
|
RouteResult::Routed => {}
|
||||||
|
}
|
||||||
reg.unregister(cid).await;
|
reg.unregister(cid).await;
|
||||||
send_close_conn(&tx, cid).await;
|
send_close_conn(&tx, cid).await;
|
||||||
}
|
}
|
||||||
@@ -126,7 +136,13 @@ pub(crate) async fn reader_loop(
|
|||||||
trace!(cid, cfm, "RPC_SIMPLE_ACK");
|
trace!(cid, cfm, "RPC_SIMPLE_ACK");
|
||||||
|
|
||||||
let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
|
let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
|
||||||
if !routed {
|
if !matches!(routed, RouteResult::Routed) {
|
||||||
|
match routed {
|
||||||
|
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
|
||||||
|
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
|
||||||
|
RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(),
|
||||||
|
RouteResult::Routed => {}
|
||||||
|
}
|
||||||
reg.unregister(cid).await;
|
reg.unregister(cid).await;
|
||||||
send_close_conn(&tx, cid).await;
|
send_close_conn(&tx, cid).await;
|
||||||
}
|
}
|
||||||
@@ -152,6 +168,7 @@ pub(crate) async fn reader_loop(
|
|||||||
}
|
}
|
||||||
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
|
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
|
||||||
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||||
|
stats.increment_me_keepalive_pong();
|
||||||
if let Some((sent, wid)) = {
|
if let Some((sent, wid)) = {
|
||||||
let mut guard = ping_tracker.lock().await;
|
let mut guard = ping_tracker.lock().await;
|
||||||
guard.remove(&ping_id)
|
guard.remove(&ping_id)
|
||||||
|
|||||||
@@ -1,13 +1,23 @@
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use tokio::sync::{mpsc, Mutex, RwLock};
|
use tokio::sync::{mpsc, RwLock};
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
|
|
||||||
use super::codec::WriterCommand;
|
use super::codec::WriterCommand;
|
||||||
use super::MeResponse;
|
use super::MeResponse;
|
||||||
|
|
||||||
|
const ROUTE_CHANNEL_CAPACITY: usize = 4096;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum RouteResult {
|
||||||
|
Routed,
|
||||||
|
NoConn,
|
||||||
|
ChannelClosed,
|
||||||
|
QueueFull,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ConnMeta {
|
pub struct ConnMeta {
|
||||||
pub target_dc: i16,
|
pub target_dc: i16,
|
||||||
@@ -64,7 +74,7 @@ impl ConnRegistry {
|
|||||||
|
|
||||||
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
|
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
|
||||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||||
let (tx, rx) = mpsc::channel(1024);
|
let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY);
|
||||||
self.inner.write().await.map.insert(id, tx);
|
self.inner.write().await.map.insert(id, tx);
|
||||||
(id, rx)
|
(id, rx)
|
||||||
}
|
}
|
||||||
@@ -83,12 +93,16 @@ impl ConnRegistry {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn route(&self, id: u64, resp: MeResponse) -> bool {
|
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
|
||||||
let inner = self.inner.read().await;
|
let inner = self.inner.read().await;
|
||||||
if let Some(tx) = inner.map.get(&id) {
|
if let Some(tx) = inner.map.get(&id) {
|
||||||
tx.try_send(resp).is_ok()
|
match tx.try_send(resp) {
|
||||||
|
Ok(()) => RouteResult::Routed,
|
||||||
|
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
|
||||||
|
Err(TrySendError::Full(_)) => RouteResult::QueueFull,
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
false
|
RouteResult::NoConn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
//! TCP Socket Configuration
|
//! TCP Socket Configuration
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::fs;
|
||||||
use std::io::Result;
|
use std::io::Result;
|
||||||
use std::net::{SocketAddr, IpAddr};
|
use std::net::{SocketAddr, IpAddr};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -234,6 +236,133 @@ pub fn create_listener(addr: SocketAddr, options: &ListenOptions) -> Result<Sock
|
|||||||
Ok(socket)
|
Ok(socket)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Best-effort process list for listeners occupying the same local TCP port.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ListenerProcessInfo {
|
||||||
|
pub pid: u32,
|
||||||
|
pub process: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find processes currently listening on the local TCP port of `addr`.
|
||||||
|
/// Returns an empty list when unsupported or when no owners can be resolved.
|
||||||
|
pub fn find_listener_processes(addr: SocketAddr) -> Vec<ListenerProcessInfo> {
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
{
|
||||||
|
find_listener_processes_linux(addr)
|
||||||
|
}
|
||||||
|
#[cfg(not(target_os = "linux"))]
|
||||||
|
{
|
||||||
|
let _ = addr;
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn find_listener_processes_linux(addr: SocketAddr) -> Vec<ListenerProcessInfo> {
|
||||||
|
let inodes = listening_inodes_for_port(addr);
|
||||||
|
if inodes.is_empty() {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut out = Vec::new();
|
||||||
|
|
||||||
|
let proc_entries = match fs::read_dir("/proc") {
|
||||||
|
Ok(entries) => entries,
|
||||||
|
Err(_) => return out,
|
||||||
|
};
|
||||||
|
|
||||||
|
for entry in proc_entries.flatten() {
|
||||||
|
let pid = match entry.file_name().to_string_lossy().parse::<u32>() {
|
||||||
|
Ok(pid) => pid,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let fd_dir = entry.path().join("fd");
|
||||||
|
let fd_entries = match fs::read_dir(fd_dir) {
|
||||||
|
Ok(entries) => entries,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut matched = false;
|
||||||
|
for fd in fd_entries.flatten() {
|
||||||
|
let link_target = match fs::read_link(fd.path()) {
|
||||||
|
Ok(link) => link,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let link_str = link_target.to_string_lossy();
|
||||||
|
let Some(rest) = link_str.strip_prefix("socket:[") else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Some(inode_str) = rest.strip_suffix(']') else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Ok(inode) = inode_str.parse::<u64>() else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if inodes.contains(&inode) {
|
||||||
|
matched = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if matched {
|
||||||
|
let process = fs::read_to_string(entry.path().join("comm"))
|
||||||
|
.ok()
|
||||||
|
.map(|s| s.trim().to_string())
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
out.push(ListenerProcessInfo { pid, process });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out.sort_by_key(|p| p.pid);
|
||||||
|
out.dedup_by_key(|p| p.pid);
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn listening_inodes_for_port(addr: SocketAddr) -> HashSet<u64> {
|
||||||
|
let path = match addr {
|
||||||
|
SocketAddr::V4(_) => "/proc/net/tcp",
|
||||||
|
SocketAddr::V6(_) => "/proc/net/tcp6",
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut inodes = HashSet::new();
|
||||||
|
let Ok(data) = fs::read_to_string(path) else {
|
||||||
|
return inodes;
|
||||||
|
};
|
||||||
|
|
||||||
|
for line in data.lines().skip(1) {
|
||||||
|
let cols: Vec<&str> = line.split_whitespace().collect();
|
||||||
|
if cols.len() < 10 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// LISTEN state in /proc/net/tcp*
|
||||||
|
if cols[3] != "0A" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(port_hex) = cols[1].split(':').nth(1) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Ok(port) = u16::from_str_radix(port_hex, 16) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if port != addr.port() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(inode) = cols[9].parse::<u64>() {
|
||||||
|
inodes.insert(inode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inodes
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -24,6 +24,8 @@ const NUM_DCS: usize = 5;
|
|||||||
|
|
||||||
/// Timeout for individual DC ping attempt
|
/// Timeout for individual DC ping attempt
|
||||||
const DC_PING_TIMEOUT_SECS: u64 = 5;
|
const DC_PING_TIMEOUT_SECS: u64 = 5;
|
||||||
|
/// Timeout for direct TG DC TCP connect readiness.
|
||||||
|
const DIRECT_CONNECT_TIMEOUT_SECS: u64 = 10;
|
||||||
|
|
||||||
// ============= RTT Tracking =============
|
// ============= RTT Tracking =============
|
||||||
|
|
||||||
@@ -375,7 +377,16 @@ impl UpstreamManager {
|
|||||||
let std_stream: std::net::TcpStream = socket.into();
|
let std_stream: std::net::TcpStream = socket.into();
|
||||||
let stream = TcpStream::from_std(std_stream)?;
|
let stream = TcpStream::from_std(std_stream)?;
|
||||||
|
|
||||||
stream.writable().await?;
|
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
|
||||||
|
match tokio::time::timeout(connect_timeout, stream.writable()).await {
|
||||||
|
Ok(Ok(())) => {}
|
||||||
|
Ok(Err(e)) => return Err(ProxyError::Io(e)),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(ProxyError::ConnectionTimeout {
|
||||||
|
addr: target.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
if let Some(e) = stream.take_error()? {
|
if let Some(e) = stream.take_error()? {
|
||||||
return Err(ProxyError::Io(e));
|
return Err(ProxyError::Io(e));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user