Compare commits

..

54 Commits

Author SHA1 Message Date
Alexey
869d1429ac Merge pull request #209 from telemt/flow
ME Pool + ME Hotpath + ME Buffers tuning
2026-02-23 04:05:25 +03:00
Alexey
eaba926fe5 ME Buffer reuse + Bytes Len over Full + Seq-no over Wrap-add
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-23 03:52:37 +03:00
Alexey
536e6417a0 Update Cargo.toml 2026-02-23 03:48:40 +03:00
Alexey
ecad96374a ME Pool tuning
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-23 03:41:51 +03:00
Alexey
4895217828 Bounded backpressure + Semaphore Globalgate +
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-23 03:32:06 +03:00
Alexey
d0a8d31c3c Update README.md 2026-02-23 03:27:58 +03:00
Alexey
4d83cc1f04 Merge branch 'flow' of https://github.com/telemt/telemt into flow 2026-02-23 03:20:28 +03:00
Alexey
c4c91863f0 Middle-End tuning
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-23 03:20:13 +03:00
Alexey
aae3e2665e Merge pull request #208 from telemt/flow
Middle-End protocol hardening
2026-02-23 02:51:01 +03:00
Alexey
a5c7a41c49 Update types.rs 2026-02-23 02:48:03 +03:00
Alexey
7cc78a5746 Update types.rs 2026-02-23 02:45:16 +03:00
Alexey
cf96e686d1 Update Cargo.toml 2026-02-23 02:41:54 +03:00
Alexey
d4d867156a Secure Payload length fixes 2026-02-23 02:38:25 +03:00
Alexey
8c1d66a03e Update Cargo.toml 2026-02-23 02:32:13 +03:00
Alexey
6ff29e43d3 Middle-End protocol hardening
- Secure framing / hot-path fix: enforced a single length + padding contract across the framing layer. Replaced legacy runtime `len % 4` recovery with strict validation to eliminate undefined behavior paths.

- ME RPC aligned with C reference contract: handshake now includes `flags + sender_pid + peer_pid`. Added negotiated CRC mode (CRC32 / CRC32C) and applied the negotiated mode consistently in read/write paths.

- Sequence fail-fast semantics: immediate connection termination on first sequence mismatch with dedicated counter increment.

- Keepalive reworked to RPC ping/pong: removed raw CBC keepalive frames. Introduced stale ping tracker with proper timeout accounting.

- Route/backpressure observability improvements: increased per-connection route queue to 4096. Added `RouteResult` with explicit failure reasons (NoConn, ChannelClosed, QueueFull) and per-reason counters.

- Direct-DC secure mode-gate relaxation: removed TLS/secure conflict in Direct-DC handshake path.
2026-02-23 02:28:00 +03:00
Alexey
208020817a Update AGENTS_SYSTEM_PROMT.md 2026-02-23 01:51:50 +03:00
Alexey
6864f49292 Merge pull request #207 from telemt/neurosl0pe
Update AGENTS_SYSTEM_PROMT.md
2026-02-23 01:27:45 +03:00
Alexey
726fb77ccc Update AGENTS_SYSTEM_PROMT.md 2026-02-23 01:27:27 +03:00
Alexey
69be44b2b6 Merge pull request #206 from telemt/flow
Flush on Response + Hotpath tunings + Reuseport Checker
2026-02-23 01:03:15 +03:00
Alexey
07ca94ce57 Reuseport Checker 2026-02-23 00:55:47 +03:00
Alexey
d050c4794a Hotpath tunings
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-23 00:50:10 +03:00
Alexey
197f9867e0 Flush-response experiments
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-22 23:53:10 +03:00
Alexey
78dfc2bc39 Merge pull request #205 from axemanofic/feature/build-and-push-docker
Add docker-image in ghrc
2026-02-22 16:45:10 +03:00
Alexey
fcf37a1a69 Merge pull request #203 from Dimasssss/main
Moving parameters from config.toml to the code
2026-02-22 16:36:12 +03:00
Roman Sotnikov
cc9e71a737 fix: fix push image to telemt 2026-02-22 16:29:04 +03:00
Roman Sotnikov
eb96fcbf76 fix: fix push image to telemt 2026-02-22 16:17:44 +03:00
Roman Sotnikov
ad167f9b1a style(yaml): fix formating for build-push-docker 2026-02-22 15:55:30 +03:00
Roman Sotnikov
df7bd39f25 style(yaml): fix formating for build-push-docker 2026-02-22 15:53:31 +03:00
Roman Sotnikov
f4c047748d feat: add gh docker-image 2026-02-22 15:42:57 +03:00
Dimasssss
c5f5b43494 Update README.md 2026-02-22 01:24:50 +03:00
Dimasssss
b2aaf404e1 Add files via upload 2026-02-22 01:19:26 +03:00
Alexey
d552ae84d0 Merge pull request #200 from telemt/flow
ME Connection lost fixes
2026-02-21 16:31:49 +03:00
Alexey
3ab56f55e9 ME Connection error handling 2026-02-21 16:28:47 +03:00
Alexey
06d2cdef78 ME Connection lost fixes 2026-02-21 16:12:19 +03:00
Alexey
1be4422431 Merge pull request #199 from telemt/axkurcom-patch-1
Update Cargo.toml
2026-02-21 14:11:34 +03:00
Alexey
3d3428ad4d Update Cargo.toml 2026-02-21 14:11:12 +03:00
Alexey
eaff96b8c1 Merge pull request #198 from telemt/flow
Peer - Connection closed fixes
2026-02-21 14:09:05 +03:00
Alexey
7bf6f3e071 Merge pull request #195 from ivulit/fix/mask-host-tls-emulation
Use mask_host for TLS emulation fetcher
2026-02-21 13:58:38 +03:00
Alexey
c3ebb42120 Peer - Connection closed fixes 2026-02-21 13:56:24 +03:00
Alexey
8d93695194 Merge pull request #196 from telemt/axkurcom-patch-1
Update Cargo.toml
2026-02-21 13:21:00 +03:00
Alexey
40711fda09 Update Cargo.toml 2026-02-21 13:20:44 +03:00
ivulit
6ce25c6600 Use mask_host for TLS emulation fetcher 2026-02-21 10:40:59 +03:00
Alexey
1a525f7d29 Merge pull request #191 from Dimasssss/patch-1
Update config.toml
2026-02-21 05:10:25 +03:00
Alexey
2dcbdbe302 Merge pull request #194 from telemt/flow
ME Frame too large Fixes
2026-02-21 05:04:42 +03:00
Alexey
1bd495a224 Fixed tests 2026-02-21 04:04:49 +03:00
Alexey
b0e6c04c54 Merge pull request #193 from artemws/main
Fix config reload for Docker
2026-02-21 03:37:48 +03:00
Alexey
d5a7882ad1 Merge pull request #190 from vladon/feature/socks-hostname-support
feat: add hostname support for SOCKS4/SOCKS5 upstream proxies
2026-02-21 03:36:58 +03:00
Alexey
83fc9d6db3 Middle-End Fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-21 03:36:13 +03:00
Alexey
c9a043d8d5 ME Frame too large Fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-02-21 02:15:10 +03:00
artemws
a74bdf8aea Update hot_reload.rs 2026-02-20 23:03:26 +02:00
Dimasssss
94e9bfbbb9 Update config.toml 2026-02-20 22:23:16 +03:00
Dimasssss
18c1444904 Update config.toml 2026-02-20 22:04:56 +03:00
Dimasssss
3b89c1ce7e Update config.toml
user_expirations
2026-02-20 22:02:34 +03:00
Vladislav Yaroslavlev
100cb92ad1 feat: add hostname support for SOCKS4/SOCKS5 upstream proxies
Previously, SOCKS proxy addresses only accepted IP:port format.
Now both IP:port and hostname:port formats are supported.

Changes:
- Try parsing as SocketAddr first (IP:port) for backward compatibility
- Fall back to tokio::net::TcpStream::connect() for hostname resolution
- Log warning if interface binding is specified with hostname (not supported)

Example usage:
[[upstreams]]
type = "socks5"
address = "proxy.example.com:1080"
username = "user"
password = "pass"
2026-02-20 21:42:15 +03:00
36 changed files with 1935 additions and 721 deletions

View File

@@ -3,8 +3,8 @@ name: Release
on:
push:
tags:
- '[0-9]+.[0-9]+.[0-9]+' # Matches tags like 3.0.0, 3.1.2, etc.
workflow_dispatch: # Manual trigger from GitHub Actions UI
- '[0-9]+.[0-9]+.[0-9]+' # Matches tags like 3.0.0, 3.1.2, etc.
workflow_dispatch: # Manual trigger from GitHub Actions UI
permissions:
contents: read
@@ -84,6 +84,32 @@ jobs:
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.tar.gz
target/${{ matrix.target }}/release/${{ matrix.asset_name }}.sha256
build-docker-image:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.TOKEN_GH_DEPLOY }}
- name: Build and push
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ github.ref }}
release:
name: Create Release
needs: build
@@ -108,17 +134,17 @@ jobs:
# Extract version from tag (remove 'v' prefix if present)
VERSION="${GITHUB_REF#refs/tags/}"
VERSION="${VERSION#v}"
# Install cargo-edit for version bumping
cargo install cargo-edit
# Update Cargo.toml version
cargo set-version "$VERSION"
# Configure git
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
# Commit and push changes
#git add Cargo.toml Cargo.lock
#git commit -m "chore: bump version to $VERSION" || echo "No changes to commit"

View File

@@ -1,6 +1,7 @@
## System Prompt — Production Rust Codebase: Modification and Architecture Guidelines
You are a senior Rust systems engineer acting as a strict code reviewer and implementation partner. Your responses are precise, minimal, and architecturally sound. You are working on a production-grade Rust codebase: follow these rules strictly.
You are a senior Rust Engineer and pricipal Rust Architect acting as a strict code reviewer and implementation partner.
Your responses are precise, minimal, and architecturally sound. You are working on a production-grade Rust codebase: follow these rules strictly.
---
@@ -32,6 +33,11 @@ The user can override this behavior with explicit commands:
- `"Make minimal changes"` — no coordinated fixes, narrowest possible diff.
- `"Fix everything"` — apply all coordinated fixes and out-of-scope observations.
### Core Rule
The codebase must never enter an invalid intermediate state.
No response may leave the repository in a condition that requires follow-up fixes.
---
### 1. Comments and Documentation
@@ -131,16 +137,32 @@ You MUST:
- Document non-obvious logic with comments that describe *why*, not *what*.
- Limit changes strictly to the requested scope (plus coordinated fixes per Section 0).
- Keep all existing symbol names unless renaming is explicitly requested.
- Preserve global formatting as-is.
- Preserve global formatting as-is
- Result every modification in a self-contained, compilable, runnable state of the codebase
You MUST NOT:
- Use placeholders: no `// ... rest of code`, no `// implement here`, no `/* TODO */` stubs that replace existing working code. Write full, working implementation. If the implementation is unclear, ask first.
- Refactor code outside the requested scope.
- Make speculative improvements.
- Use placeholders: no `// ... rest of code`, no `// implement here`, no `/* TODO */` stubs that replace existing working code. Write full, working implementation. If the implementation is unclear, ask first
- Refactor code outside the requested scope
- Make speculative improvements
- Spawn multiple agents for EDITING
- Produce partial changes
- Introduce references to entities that are not yet implemented
- Leave TODO placeholders in production paths
Note: `todo!()` and `unimplemented!()` are allowed as idiomatic Rust markers for genuinely unfinished code paths.
Every change must:
- compile,
- pass type checks,
- have no broken imports,
- preserve invariants,
- not rely on future patches.
If the task requires multiple phases:
- either implement all required phases,
- or explicitly refuse and explain missing dependencies.
---
### 8. Decision Process for Complex Changes
@@ -160,6 +182,7 @@ When facing a non-trivial modification, follow this sequence:
- When provided with partial code, assume the rest of the codebase exists and functions correctly unless stated otherwise.
- Reference existing types, functions, and module structures by their actual names as shown in the provided code.
- When the provided context is insufficient to make a safe change, request the missing context explicitly.
- Spawn multiple agents for SEARCHING information, code, functions
---
@@ -167,14 +190,14 @@ When facing a non-trivial modification, follow this sequence:
#### Language Policy
- Code, comments, commit messages, documentation: **English**.
- Reasoning and explanations in response text: **Russian**.
- Code, comments, commit messages, documentation ONLY ON **English**!
- Reasoning and explanations in response text on language from promt
#### Response Structure
Your response MUST consist of two sections:
**Section 1: `## Reasoning` (in Russian)**
**Section 1: `## Reasoning`**
- What needs to be done and why.
- Which files and modules are affected.
@@ -205,3 +228,183 @@ If the response exceeds the output limit:
2. List the files that will be provided in subsequent parts.
3. Wait for user confirmation before continuing.
4. No single file may be split across parts.
## 11. Anti-LLM Degeneration Safeguards (Principal-Paranoid, Visionary)
This section exists to prevent common LLM failure modes: scope creep, semantic drift, cargo-cult refactors, performance regressions, contract breakage, and hidden behavior changes.
### 11.1 Non-Negotiable Invariants
- **No semantic drift:** Do not reinterpret requirements, rename concepts, or change meaning of existing terms.
- **No “helpful refactors”:** Any refactor not explicitly requested is forbidden.
- **No architectural drift:** Do not introduce new layers, patterns, abstractions, or “clean architecture” migrations unless requested.
- **No dependency drift:** Do not add crates, features, or versions unless explicitly requested.
- **No behavior drift:** If a change could alter runtime behavior, you MUST call it out explicitly in `## Reasoning` and justify it.
### 11.2 Minimal Surface Area Rule
- Touch the smallest number of files possible.
- Prefer local changes over cross-cutting edits.
- Do not “align style” across a file/module—only adjust the modified region.
- Do not reorder items, imports, or code unless required for correctness.
### 11.3 No Implicit Contract Changes
Contracts include:
- public APIs, trait bounds, visibility, error types, timeouts/retries, logging semantics, metrics semantics,
- protocol formats, framing, padding, keepalive cadence, state machine transitions,
- concurrency guarantees, cancellation behavior, backpressure behavior.
Rule:
- If you change a contract, you MUST update all dependents in the same patch AND document the contract delta explicitly.
### 11.4 Hot-Path Preservation (Performance Paranoia)
- Do not introduce extra allocations, cloning, or formatting in hot paths.
- Do not add logging/metrics on hot paths unless requested.
- Do not add new locks or broaden lock scope.
- Prefer `&str` / slices / borrowed data where the codebase already does so.
- Avoid `String` building for errors/logs if it changes current patterns.
If you cannot prove performance neutrality, label it as risk in `## Reasoning`.
### 11.5 Async / Concurrency Safety (Cancellation & Backpressure)
- No blocking calls inside async contexts.
- Preserve cancellation safety: do not introduce `await` between lock acquisition and critical invariants unless already present.
- Preserve backpressure: do not replace bounded channels with unbounded, do not remove flow control.
- Do not change task lifecycle semantics (spawn patterns, join handles, shutdown order) unless requested.
- Do not introduce `tokio::spawn` / background tasks unless explicitly requested.
### 11.6 Error Semantics Integrity
- Do not replace structured errors with generic strings.
- Do not widen/narrow error types or change error categories without explicit approval.
- Avoid introducing panics in production paths (`unwrap`, `expect`) unless the codebase already treats that path as impossible and documented.
### 11.7 “No New Abstractions” Default
Default stance:
- No new traits, generics, macros, builder patterns, type-level cleverness, or “frameworking”.
- If abstraction is necessary, prefer the smallest possible local helper (private function) and justify it.
### 11.8 Negative-Diff Protection
Avoid “diff inflation” patterns:
- mass edits,
- moving code between files,
- rewrapping long lines,
- rearranging module order,
- renaming for aesthetics.
If a diff becomes large, STOP and ask before proceeding.
### 11.9 Consistency with Existing Style (But Not Style Refactors)
- Follow existing conventions of the touched module (naming, error style, return patterns).
- Do not enforce global “best practices” that the codebase does not already use.
### 11.10 Two-Phase Safety Gate (Plan → Patch)
For non-trivial changes:
1) Provide a micro-plan (15 bullets): what files, what functions, what invariants, what risks.
2) Implement exactly that plan—no extra improvements.
### 11.11 Pre-Response Checklist (Hard Gate)
Before final output, verify internally:
- No unresolved symbols / broken imports.
- No partially updated call sites.
- No new public surface changes unless requested.
- No transitional states / TODO placeholders replacing working code.
- Changes are atomic: the repository remains buildable and runnable.
- Any behavior change is explicitly stated.
If any check fails: fix it before responding.
### 11.12 Truthfulness Policy (No Hallucinated Claims)
- Do not claim “this compiles” or “tests pass” unless you actually verified with the available tooling/context.
- If verification is not possible, state: “Not executed; reasoning-based consistency check only.”
### 11.13 Visionary Guardrail: Preserve Optionality
When multiple valid designs exist, prefer the one that:
- minimally constrains future evolution,
- preserves existing extension points,
- avoids locking the project into a new paradigm,
- keeps interfaces stable and implementation local.
Default to reversible changes.
### 11.14 Stop Conditions
STOP and ask targeted questions if:
- required context is missing,
- a change would cross module boundaries,
- a contract might change,
- concurrency/protocol invariants are unclear,
- the diff is growing beyond a minimal patch.
No guessing.
### 12. Invariant Preservation
You MUST explicitly preserve:
- Thread-safety guarantees (`Send` / `Sync` expectations).
- Memory safety assumptions (no hidden `unsafe` expansions).
- Lock ordering and deadlock invariants.
- State machine correctness (no new invalid transitions).
- Backward compatibility of serialized formats (if applicable).
If a change touches concurrency, networking, protocol logic, or state machines,
you MUST explain why existing invariants remain valid.
### 13. Error Handling Policy
- Do not replace structured errors with generic strings.
- Preserve existing error propagation semantics.
- Do not widen or narrow error types without approval.
- Avoid introducing panics in production paths.
- Prefer explicit error mapping over implicit conversions.
### 14. Test Safety
- Do not modify existing tests unless the task explicitly requires it.
- Do not weaken assertions.
- Preserve determinism in testable components.
### 15. Security Constraints
- Do not weaken cryptographic assumptions.
- Do not modify key derivation logic without explicit request.
- Do not change constant-time behavior.
- Do not introduce logging of secrets.
- Preserve TLS/MTProto protocol correctness.
### 16. Logging Policy
- Do not introduce excessive logging in hot paths.
- Do not log sensitive data.
- Preserve existing log levels and style.
### 17. Pre-Response Verification Checklist
Before producing the final answer, verify internally:
- The change compiles conceptually.
- No unresolved symbols exist.
- All modified call sites are updated.
- No accidental behavioral changes were introduced.
- Architectural boundaries remain intact.
### 18. Atomic Change Principle
Every patch must be **atomic and production-safe**.
* **Self-contained** — no dependency on future patches or unimplemented components.
* **Build-safe** — the project must compile successfully after the change.
* **Contract-consistent** — no partial interface or behavioral changes; all dependent code must be updated within the same patch.
* **No transitional states** — no placeholders, incomplete refactors, or temporary inconsistencies.
**Invariant:** After any single patch, the repository remains fully functional and buildable.

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.0.6"
version = "3.0.10"
edition = "2024"
[dependencies]
@@ -20,6 +20,7 @@ sha1 = "0.10"
md-5 = "0.10"
hmac = "0.12"
crc32fast = "1.4"
crc32c = "0.6"
zeroize = { version = "1.8", features = ["derive"] }
# Network

206
README.md
View File

@@ -10,41 +10,77 @@
### 🇷🇺 RU
18 февраля мы опубликовали `telemt 3.0.3`, он имеет:
#### Драфтинг LTS и текущие улучшения
- улучшенный механизм Middle-End Health Check
- высокоскоростное восстановление инициализации Middle-End
- меньше задержек на hot-path
- более корректную работу в Dualstack, а именно - IPv6 Middle-End
- аккуратное переподключение клиента без дрифта сессий между Middle-End
- автоматическая деградация на Direct-DC при массовой (>2 ME-DC-групп) недоступности Middle-End
- автодетект IP за NAT, при возможности - будет выполнен хендшейк с ME, при неудаче - автодеградация
- единственный известный специальный DC=203 уже добавлен в код: медиа загружаются с CDN в Direct-DC режиме
С 21 февраля мы начали подготовку LTS-версии.
[Здесь вы можете найти релиз](https://github.com/telemt/telemt/releases/tag/3.0.3)
Мы внимательно анализируем весь доступный фидбек.
Наша цель — сделать LTS-кандидаты максимально стабильными, тщательно отлаженными и готовыми к long-run и highload production-сценариям.
Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях - мы открыты к идеям и pull requests!
---
#### Улучшения от 23 февраля
23 февраля были внесены улучшения производительности в режимах **DC** и **Middle-End (ME)**, с акцентом на обратный канал (путь клиент → DC / ME).
Дополнительно реализован ряд изменений, направленных на повышение устойчивости системы:
- Смягчение сетевой нестабильности
- Повышение устойчивости к десинхронизации криптографии
- Снижение дрейфа сессий при неблагоприятных условиях
- Улучшение обработки ошибок в edge-case транспортных сценариях
Релиз:
[3.0.9](https://github.com/telemt/telemt/releases/tag/3.0.9)
---
Если у вас есть компетенции в:
- Асинхронных сетевых приложениях
- Анализе трафика
- Реверс-инжиниринге
- Сетевых расследованиях
Мы открыты к архитектурным предложениям, идеям и pull requests
</td>
<td width="50%" valign="top">
### 🇬🇧 EN
On February 18, we released `telemt 3.0.3`. This version introduces:
#### LTS Drafting and Ongoing Improvements
- improved Middle-End Health Check method
- high-speed recovery of Middle-End init
- reduced latency on the hot path
- correct Dualstack support: proper handling of IPv6 Middle-End
- *clean* client reconnection without session "drift" between Middle-End
- automatic degradation to Direct-DC mode in case of large-scale (>2 ME-DC groups) Middle-End unavailability
- automatic public IP detection behind NAT; first - Middle-End handshake is performed, otherwise automatic degradation is applied
- known special DC=203 is now handled natively: media is delivered from the CDN via Direct-DC mode
Starting February 21, we began drafting the upcoming LTS version.
[Release is available here](https://github.com/telemt/telemt/releases/tag/3.0.3)
We are carefully reviewing and analyzing all available feedback.
The goal is to ensure that LTS candidates are максимально stable, thoroughly debugged, and ready for long-run and high-load production scenarios.
If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics - we welcome ideas and pull requests!
---
#### February 23 Improvements
On February 23, we introduced performance improvements for both **DC** and **Middle-End (ME)** modes, specifically optimizing the reverse channel (client → DC / ME data path).
Additionally, we implemented a set of robustness enhancements designed to:
- Mitigate network-related instability
- Improve resilience against cryptographic desynchronization
- Reduce session drift under adverse conditions
- Improve error handling in edge-case transport scenarios
Release:
[3.0.9](https://github.com/telemt/telemt/releases/tag/3.0.9)
---
If you have expertise in:
- Asynchronous network applications
- Traffic analysis
- Reverse engineering
- Network forensics
We welcome ideas, architectural feedback, and pull requests.
</td>
</tr>
</table>
@@ -178,147 +214,21 @@ then Ctrl+X -> Y -> Enter to save
```toml
# === General Settings ===
[general]
fast_mode = true
use_middle_proxy = true
# ad_tag = "00000000000000000000000000000000"
# Path to proxy-secret binary (auto-downloaded if missing).
proxy_secret_path = "proxy-secret"
# disable_colors = false # Disable colored output in logs (useful for files/systemd)
# === Log Level ===
# Log level: debug | verbose | normal | silent
# Can be overridden with --silent or --log-level CLI flags
# RUST_LOG env var takes absolute priority over all of these
log_level = "normal"
# === Middle Proxy - ME ===
# Public IP override for ME KDF when behind NAT; leave unset to auto-detect.
# middle_proxy_nat_ip = "203.0.113.10"
# Enable STUN probing to discover public IP:port for ME.
middle_proxy_nat_probe = true
# Primary STUN server (host:port); defaults to Telegram STUN when empty.
middle_proxy_nat_stun = "stun.l.google.com:19302"
# Optional fallback STUN servers list.
middle_proxy_nat_stun_servers = ["stun1.l.google.com:19302", "stun2.l.google.com:19302"]
# Desired number of concurrent ME writers in pool.
middle_proxy_pool_size = 16
# Pre-initialized warm-standby ME connections kept idle.
middle_proxy_warm_standby = 8
# Ignore STUN/interface mismatch and keep ME enabled even if IP differs.
stun_iface_mismatch_ignore = false
# Keepalive padding frames - fl==4
me_keepalive_enabled = true
me_keepalive_interval_secs = 25 # Period between keepalives
me_keepalive_jitter_secs = 5 # Jitter added to interval
me_keepalive_payload_random = true # Randomize 4-byte payload (vs zeros)
# Stagger extra ME connections on warmup to de-phase lifecycles.
me_warmup_stagger_enabled = true
me_warmup_step_delay_ms = 500 # Base delay between extra connects
me_warmup_step_jitter_ms = 300 # Jitter for warmup delay
# Reconnect policy knobs.
me_reconnect_max_concurrent_per_dc = 1 # Parallel reconnects per DC - EXPERIMENTAL! UNSTABLE!
me_reconnect_backoff_base_ms = 500 # Backoff start
me_reconnect_backoff_cap_ms = 30000 # Backoff cap
me_reconnect_fast_retry_count = 11 # Quick retries before backoff
[general.modes]
classic = false
secure = false
tls = true
[general.links]
show = "*"
# show = ["alice", "bob"] # Only show links for alice and bob
# show = "*" # Show links for all users
# public_host = "proxy.example.com" # Host (IP or domain) for tg:// links
# public_port = 443 # Port for tg:// links (default: server.port)
# === Network Parameters ===
[network]
# Enable/disable families: true/false/auto(None)
ipv4 = true
ipv6 = false # UNSTABLE WITH ME
# prefer = 4 or 6
prefer = 4
multipath = false # EXPERIMENTAL!
# === Server Binding ===
[server]
port = 443
listen_addr_ipv4 = "0.0.0.0"
listen_addr_ipv6 = "::"
# listen_unix_sock = "/var/run/telemt.sock" # Unix socket
# listen_unix_sock_perm = "0666" # Socket file permissions
# metrics_port = 9090
# metrics_whitelist = [
# "192.168.0.0/24",
# "172.16.0.0/12",
# "127.0.0.1/32",
# "::1/128"
#]
# Listen on multiple interfaces/IPs - IPv4
[[server.listeners]]
ip = "0.0.0.0"
# Listen on multiple interfaces/IPs - IPv6
[[server.listeners]]
ip = "::"
# === Timeouts (in seconds) ===
[timeouts]
client_handshake = 30
tg_connect = 10
client_keepalive = 60
client_ack = 300
# Quick ME reconnects for single-address DCs (count and per-attempt timeout, ms).
me_one_retry = 12
me_one_timeout_ms = 1200
# === Anti-Censorship & Masking ===
[censorship]
tls_domain = "petrovich.ru"
mask = true
mask_port = 443
# mask_host = "petrovich.ru" # Defaults to tls_domain if not set
# mask_unix_sock = "/var/run/nginx.sock" # Unix socket (mutually exclusive with mask_host)
fake_cert_len = 2048
# === Access Control & Users ===
[access]
replay_check_len = 65536
replay_window_secs = 1800
ignore_time_skew = false
[access.users]
# format: "username" = "32_hex_chars_secret"
hello = "00000000000000000000000000000000"
# [access.user_max_tcp_conns]
# hello = 50
# [access.user_max_unique_ips]
# hello = 5
# [access.user_data_quota]
# hello = 1073741824 # 1 GB
# === Upstreams & Routing ===
[[upstreams]]
type = "direct"
enabled = true
weight = 10
# [[upstreams]]
# type = "socks5"
# address = "127.0.0.1:1080"
# enabled = false
# weight = 1
# === DC Address Overrides ===
# [dc_overrides]
# "203" = "91.105.192.100:443"
```
### Advanced
#### Adtag

View File

@@ -124,6 +124,10 @@ hello = "00000000000000000000000000000000"
# [access.user_data_quota]
# hello = 1073741824 # 1 GB
# [access.user_expirations]
# format: username = "[year]-[month]-[day]T[hour]:[minute]:[second]Z" UTC
# hello = "2027-01-01T00:00:00Z"
# === Upstreams & Routing ===
[[upstreams]]
type = "direct"

View File

@@ -213,6 +213,7 @@ listen_addr_ipv6 = "::"
[[server.listeners]]
ip = "0.0.0.0"
# reuse_allow = false # Set true only when intentionally running multiple telemt instances on same port
[[server.listeners]]
ip = "::"

View File

@@ -110,6 +110,75 @@ pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
30_000
}
pub(crate) fn default_crypto_pending_buffer() -> usize {
256 * 1024
}
pub(crate) fn default_max_client_frame() -> usize {
16 * 1024 * 1024
}
pub(crate) fn default_tls_new_session_tickets() -> u8 {
0
}
pub(crate) fn default_server_hello_delay_min_ms() -> u64 {
0
}
pub(crate) fn default_server_hello_delay_max_ms() -> u64 {
0
}
pub(crate) fn default_alpn_enforce() -> bool {
true
}
pub(crate) fn default_stun_servers() -> Vec<String> {
vec![
"stun.l.google.com:19302".to_string(),
"stun1.l.google.com:19302".to_string(),
"stun2.l.google.com:19302".to_string(),
"stun.stunprotocol.org:3478".to_string(),
"stun.voip.eutelia.it:3478".to_string(),
]
}
pub(crate) fn default_http_ip_detect_urls() -> Vec<String> {
vec![
"https://ifconfig.me/ip".to_string(),
"https://api.ipify.org".to_string(),
]
}
pub(crate) fn default_cache_public_ip_path() -> String {
"cache/public_ip.txt".to_string()
}
pub(crate) fn default_proxy_secret_reload_secs() -> u64 {
12 * 60 * 60
}
pub(crate) fn default_proxy_config_reload_secs() -> u64 {
12 * 60 * 60
}
pub(crate) fn default_ntp_check() -> bool {
true
}
pub(crate) fn default_ntp_servers() -> Vec<String> {
vec!["pool.ntp.org".to_string()]
}
pub(crate) fn default_fast_mode_min_tls_record() -> usize {
0
}
pub(crate) fn default_degradation_min_unavailable_dc_groups() -> u8 {
2
}
// Custom deserializer helpers
#[derive(Deserialize)]

View File

@@ -316,118 +316,106 @@ pub fn spawn_config_watcher(
let (config_tx, config_rx) = watch::channel(initial);
let (log_tx, log_rx) = watch::channel(initial_level);
// Bridge: sync notify callback → async task via mpsc.
// Bridge: sync notify callbacks → async task via mpsc.
let (notify_tx, mut notify_rx) = mpsc::channel::<()>(4);
// Canonicalize the config path so it matches what notify returns in events
// (notify always gives absolute paths, but config_path may be relative).
// Canonicalize so path matches what notify returns (absolute) in events.
let config_path = match config_path.canonicalize() {
Ok(p) => p,
Err(_) => config_path.to_path_buf(), // file doesn't exist yet, use as-is
Err(_) => config_path.to_path_buf(),
};
// Watch the parent directory rather than the file itself, because many
// editors (vim, nano, systemd-sysusers) write via rename, which would
// cause inotify to lose track of the original inode.
// editors (vim, nano) and systemd write via rename, which would cause
// inotify to lose track of the original inode.
let watch_dir = config_path
.parent()
.unwrap_or_else(|| std::path::Path::new("."))
.to_path_buf();
// ── inotify watcher (instant on local fs) ────────────────────────────
let config_file = config_path.clone();
let tx_clone = notify_tx.clone();
let watcher_result = recommended_watcher(move |res: notify::Result<notify::Event>| {
let tx_inotify = notify_tx.clone();
let inotify_ok = match recommended_watcher(move |res: notify::Result<notify::Event>| {
let Ok(event) = res else { return };
let is_our_file = event.paths.iter().any(|p| p == &config_file);
if !is_our_file {
return;
if !is_our_file { return; }
if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)) {
let _ = tx_inotify.try_send(());
}
let relevant = matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
);
if relevant {
let _ = tx_clone.try_send(());
}) {
Ok(mut w) => match w.watch(&watch_dir, RecursiveMode::NonRecursive) {
Ok(()) => {
info!("config watcher: inotify active on {:?}", config_path);
Box::leak(Box::new(w));
true
}
Err(e) => { warn!("config watcher: inotify watch failed: {}", e); false }
},
Err(e) => { warn!("config watcher: inotify unavailable: {}", e); false }
};
// ── poll watcher (always active, fixes Docker bind mounts / NFS) ─────
// inotify does not receive events for files mounted from the host into
// a container. PollWatcher compares file contents every 3 s and fires
// on any change regardless of the underlying fs.
let config_file2 = config_path.clone();
let tx_poll = notify_tx.clone();
match notify::poll::PollWatcher::new(
move |res: notify::Result<notify::Event>| {
let Ok(event) = res else { return };
let is_our_file = event.paths.iter().any(|p| p == &config_file2);
if !is_our_file { return; }
if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)) {
let _ = tx_poll.try_send(());
}
},
notify::Config::default()
.with_poll_interval(std::time::Duration::from_secs(3))
.with_compare_contents(true),
) {
Ok(mut w) => match w.watch(&config_path, RecursiveMode::NonRecursive) {
Ok(()) => {
if inotify_ok {
info!("config watcher: poll watcher also active (Docker/NFS safe)");
} else {
info!("config watcher: poll watcher active on {:?} (3s interval)", config_path);
}
Box::leak(Box::new(w));
}
Err(e) => warn!("config watcher: poll watch failed: {}", e),
},
Err(e) => warn!("config watcher: poll watcher unavailable: {}", e),
}
// ── event loop ───────────────────────────────────────────────────────
tokio::spawn(async move {
#[cfg(unix)]
let mut sighup = {
use tokio::signal::unix::{SignalKind, signal};
signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler")
};
loop {
#[cfg(unix)]
tokio::select! {
msg = notify_rx.recv() => {
if msg.is_none() { break; }
}
_ = sighup.recv() => {
info!("SIGHUP received — reloading {:?}", config_path);
}
}
#[cfg(not(unix))]
if notify_rx.recv().await.is_none() { break; }
// Debounce: drain extra events that arrive within 50 ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
while notify_rx.try_recv().is_ok() {}
reload_config(&config_path, &config_tx, &log_tx, detected_ip_v4, detected_ip_v6);
}
});
match watcher_result {
Ok(mut watcher) => {
match watcher.watch(&watch_dir, RecursiveMode::NonRecursive) {
Ok(()) => info!("config watcher: watching {:?} via inotify", config_path),
Err(e) => warn!(
"config watcher: failed to watch {:?}: {}; use SIGHUP to reload",
watch_dir, e
),
}
tokio::spawn(async move {
let _watcher = watcher; // keep alive
#[cfg(unix)]
let mut sighup = {
use tokio::signal::unix::{SignalKind, signal};
signal(SignalKind::hangup()).expect("Failed to register SIGHUP handler")
};
loop {
#[cfg(unix)]
tokio::select! {
msg = notify_rx.recv() => {
if msg.is_none() { break; }
}
_ = sighup.recv() => {
info!("SIGHUP received — reloading {:?}", config_path);
}
}
#[cfg(not(unix))]
if notify_rx.recv().await.is_none() { break; }
// Debounce: drain extra events fired within 50ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
while notify_rx.try_recv().is_ok() {}
reload_config(
&config_path,
&config_tx,
&log_tx,
detected_ip_v4,
detected_ip_v6,
);
}
});
}
Err(e) => {
warn!(
"config watcher: inotify unavailable ({}); only SIGHUP will trigger reload",
e
);
// Fall back to SIGHUP-only.
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sighup = signal(SignalKind::hangup())
.expect("Failed to register SIGHUP handler");
loop {
sighup.recv().await;
info!("SIGHUP received — reloading {:?}", config_path);
reload_config(
&config_path,
&config_tx,
&log_tx,
detected_ip_v4,
detected_ip_v6,
);
}
}
#[cfg(not(unix))]
let _ = (config_tx, log_tx, config_path);
});
}
}
(config_rx, log_rx)
}

View File

@@ -227,6 +227,7 @@ impl ProxyConfig {
announce: None,
announce_ip: None,
proxy_protocol: None,
reuse_allow: false,
});
}
if let Some(ipv6_str) = &config.server.listen_addr_ipv6 {
@@ -236,6 +237,7 @@ impl ProxyConfig {
announce: None,
announce_ip: None,
proxy_protocol: None,
reuse_allow: false,
});
}
}

View File

@@ -74,8 +74,8 @@ pub struct ProxyModes {
impl Default for ProxyModes {
fn default() -> Self {
Self {
classic: true,
secure: true,
classic: false,
secure: false,
tls: true,
}
}
@@ -96,15 +96,35 @@ pub struct NetworkConfig {
#[serde(default)]
pub multipath: bool,
/// STUN servers list for public IP discovery.
#[serde(default = "default_stun_servers")]
pub stun_servers: Vec<String>,
/// Enable TCP STUN fallback when UDP is blocked.
#[serde(default)]
pub stun_tcp_fallback: bool,
/// HTTP-based public IP detection endpoints (fallback after STUN).
#[serde(default = "default_http_ip_detect_urls")]
pub http_ip_detect_urls: Vec<String>,
/// Cache file path for detected public IP.
#[serde(default = "default_cache_public_ip_path")]
pub cache_public_ip_path: String,
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
ipv4: true,
ipv6: None,
ipv6: Some(false),
prefer: 4,
multipath: false,
stun_servers: default_stun_servers(),
stun_tcp_fallback: true,
http_ip_detect_urls: default_http_ip_detect_urls(),
cache_public_ip_path: default_cache_public_ip_path(),
}
}
}
@@ -172,6 +192,15 @@ pub struct GeneralConfig {
#[serde(default = "default_true")]
pub me_keepalive_payload_random: bool,
/// Max pending ciphertext buffer per client writer (bytes).
/// Controls FakeTLS backpressure vs throughput.
#[serde(default = "default_crypto_pending_buffer")]
pub crypto_pending_buffer: usize,
/// Maximum allowed client MTProto frame size (bytes).
#[serde(default = "default_max_client_frame")]
pub max_client_frame: usize,
/// Enable staggered warmup of extra ME writers.
#[serde(default = "default_true")]
pub me_warmup_stagger_enabled: bool,
@@ -218,6 +247,34 @@ pub struct GeneralConfig {
/// [general.links] — proxy link generation overrides.
#[serde(default)]
pub links: LinksConfig,
/// Minimum TLS record size when fast_mode coalescing is enabled (0 = disabled).
#[serde(default = "default_fast_mode_min_tls_record")]
pub fast_mode_min_tls_record: usize,
/// Automatically reload proxy-secret every N seconds.
#[serde(default = "default_proxy_secret_reload_secs")]
pub proxy_secret_auto_reload_secs: u64,
/// Automatically reload proxy-multi.conf every N seconds.
#[serde(default = "default_proxy_config_reload_secs")]
pub proxy_config_auto_reload_secs: u64,
/// Enable NTP drift check at startup.
#[serde(default = "default_ntp_check")]
pub ntp_check: bool,
/// NTP servers for drift check.
#[serde(default = "default_ntp_servers")]
pub ntp_servers: Vec<String>,
/// Enable auto-degradation from ME to Direct-DC.
#[serde(default = "default_true")]
pub auto_degradation_enabled: bool,
/// Minimum unavailable ME DC groups before degrading.
#[serde(default = "default_degradation_min_unavailable_dc_groups")]
pub degradation_min_unavailable_dc_groups: u8,
}
impl Default for GeneralConfig {
@@ -234,7 +291,7 @@ impl Default for GeneralConfig {
middle_proxy_nat_stun: None,
middle_proxy_nat_stun_servers: Vec::new(),
middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: 0,
middle_proxy_warm_standby: 8,
me_keepalive_enabled: true,
me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(),
@@ -242,15 +299,24 @@ impl Default for GeneralConfig {
me_warmup_stagger_enabled: true,
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
me_reconnect_max_concurrent_per_dc: 1,
me_reconnect_max_concurrent_per_dc: 4,
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: 1,
me_reconnect_fast_retry_count: 8,
stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal,
disable_colors: false,
links: LinksConfig::default(),
crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(),
proxy_config_auto_reload_secs: default_proxy_config_reload_secs(),
ntp_check: default_ntp_check(),
ntp_servers: default_ntp_servers(),
auto_degradation_enabled: true,
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
}
}
}
@@ -395,6 +461,22 @@ pub struct AntiCensorshipConfig {
/// Directory to store TLS front cache (on disk).
#[serde(default = "default_tls_front_dir")]
pub tls_front_dir: String,
/// Minimum server_hello delay in milliseconds (anti-fingerprint).
#[serde(default = "default_server_hello_delay_min_ms")]
pub server_hello_delay_min_ms: u64,
/// Maximum server_hello delay in milliseconds.
#[serde(default = "default_server_hello_delay_max_ms")]
pub server_hello_delay_max_ms: u64,
/// Number of NewSessionTicket messages to emit post-handshake.
#[serde(default = "default_tls_new_session_tickets")]
pub tls_new_session_tickets: u8,
/// Enforce ALPN echo of client preference.
#[serde(default = "default_alpn_enforce")]
pub alpn_enforce: bool,
}
impl Default for AntiCensorshipConfig {
@@ -409,6 +491,10 @@ impl Default for AntiCensorshipConfig {
fake_cert_len: default_fake_cert_len(),
tls_emulation: false,
tls_front_dir: default_tls_front_dir(),
server_hello_delay_min_ms: default_server_hello_delay_min_ms(),
server_hello_delay_max_ms: default_server_hello_delay_max_ms(),
tls_new_session_tickets: default_tls_new_session_tickets(),
alpn_enforce: default_alpn_enforce(),
}
}
}
@@ -517,6 +603,10 @@ pub struct ListenerConfig {
/// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol.
#[serde(default)]
pub proxy_protocol: Option<bool>,
/// Allow multiple telemt instances to listen on the same IP:port (SO_REUSEPORT).
/// Default is false for safety.
#[serde(default)]
pub reuse_allow: bool,
}
// ============= ShowLink =============

View File

@@ -55,6 +55,11 @@ pub fn crc32(data: &[u8]) -> u32 {
crc32fast::hash(data)
}
/// CRC32C (Castagnoli)
pub fn crc32c(data: &[u8]) -> u32 {
crc32c::crc32c(data)
}
/// Build the exact prekey buffer used by Telegram Middle Proxy KDF.
///
/// Returned buffer layout (IPv4):

View File

@@ -5,5 +5,8 @@ pub mod hash;
pub mod random;
pub use aes::{AesCtr, AesCbc};
pub use hash::{sha256, sha256_hmac, sha1, md5, crc32, derive_middleproxy_keys, build_middleproxy_prekey};
pub use hash::{
build_middleproxy_prekey, crc32, crc32c, derive_middleproxy_keys, md5, sha1, sha256,
sha256_hmac,
};
pub use random::SecureRandom;

View File

@@ -49,19 +49,32 @@ impl SecureRandom {
}
}
/// Generate random bytes
pub fn bytes(&self, len: usize) -> Vec<u8> {
/// Fill a caller-provided buffer with random bytes.
pub fn fill(&self, out: &mut [u8]) {
let mut inner = self.inner.lock();
const CHUNK_SIZE: usize = 512;
while inner.buffer.len() < len {
let mut chunk = vec![0u8; CHUNK_SIZE];
inner.rng.fill_bytes(&mut chunk);
inner.cipher.apply(&mut chunk);
inner.buffer.extend_from_slice(&chunk);
let mut written = 0usize;
while written < out.len() {
if inner.buffer.is_empty() {
let mut chunk = vec![0u8; CHUNK_SIZE];
inner.rng.fill_bytes(&mut chunk);
inner.cipher.apply(&mut chunk);
inner.buffer.extend_from_slice(&chunk);
}
let take = (out.len() - written).min(inner.buffer.len());
out[written..written + take].copy_from_slice(&inner.buffer[..take]);
inner.buffer.drain(..take);
written += take;
}
inner.buffer.drain(..len).collect()
}
/// Generate random bytes
pub fn bytes(&self, len: usize) -> Vec<u8> {
let mut out = vec![0u8; len];
self.fill(&mut out);
out
}
/// Generate random number in range [0, max)

View File

@@ -38,7 +38,7 @@ use crate::stream::BufferPool;
use crate::transport::middle_proxy::{
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, format_sample_line,
};
use crate::transport::{ListenOptions, UpstreamManager, create_listener};
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
use crate::tls_front::TlsFrontCache;
fn parse_cli() -> (String, bool, Option<String>) {
@@ -213,6 +213,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
"Modes: classic={} secure={} tls={}",
config.general.modes.classic, config.general.modes.secure, config.general.modes.tls
);
if config.general.modes.classic {
warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients");
}
info!("TLS domain: {}", config.censorship.tls_domain);
if let Some(ref sock) = config.censorship.mask_unix_sock {
info!("Mask: {} -> unix:{}", config.censorship.mask, sock);
@@ -262,7 +265,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
// Connection concurrency limit
let _max_connections = Arc::new(Semaphore::new(10_000));
let max_connections = Arc::new(Semaphore::new(10_000));
if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
@@ -458,10 +461,12 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
cache.load_from_disk().await;
let port = config.censorship.mask_port;
let mask_host = config.censorship.mask_host.clone()
.unwrap_or_else(|| config.censorship.tls_domain.clone());
// Initial synchronous fetch to warm cache before serving clients.
for domain in tls_domains.clone() {
match crate::tls_front::fetcher::fetch_real_tls(
&domain,
&mask_host,
port,
&domain,
Duration::from_secs(5),
@@ -485,7 +490,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
for domain in &domains {
match crate::tls_front::fetcher::fetch_real_tls(
domain,
&mask_host,
port,
domain,
Duration::from_secs(5),
@@ -710,6 +715,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
continue;
}
let options = ListenOptions {
reuse_port: listener_conf.reuse_allow,
ipv6_only: listener_conf.ip.is_ipv6(),
..Default::default()
};
@@ -748,7 +754,33 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
listeners.push((listener, listener_proxy_protocol));
}
Err(e) => {
error!("Failed to bind to {}: {}", addr, e);
if e.kind() == std::io::ErrorKind::AddrInUse {
let owners = find_listener_processes(addr);
if owners.is_empty() {
error!(
%addr,
"Failed to bind: address already in use (owner process unresolved)"
);
} else {
for owner in owners {
error!(
%addr,
pid = owner.pid,
process = %owner.process,
"Failed to bind: address already in use"
);
}
}
if !listener_conf.reuse_allow {
error!(
%addr,
"reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening"
);
}
} else {
error!("Failed to bind to {}: {}", addr, e);
}
}
}
}
@@ -812,6 +844,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let max_connections_unix = max_connections.clone();
tokio::spawn(async move {
let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1));
@@ -819,6 +852,13 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
loop {
match unix_listener.accept().await {
Ok((stream, _)) => {
let permit = match max_connections_unix.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
error!("Connection limiter is closed");
break;
}
};
let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16));
@@ -834,6 +874,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let proxy_protocol_enabled = config.server.proxy_protocol;
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng,
@@ -901,11 +942,19 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let me_pool = me_pool.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
let max_connections_tcp = max_connections.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
let permit = match max_connections_tcp.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
error!("Connection limiter is closed");
break;
}
};
let config = config_rx.borrow_and_update().clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
@@ -918,6 +967,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
let proxy_protocol_enabled = listener_proxy_protocol;
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = ClientHandler::new(
stream,
peer_addr,
@@ -935,7 +985,40 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
.run()
.await
{
warn!(peer = %peer_addr, error = %e, "Connection closed with error");
let peer_closed = matches!(
&e,
crate::error::ProxyError::Io(ioe)
if matches!(
ioe.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
)
) || matches!(
&e,
crate::error::ProxyError::Stream(
crate::error::StreamError::Io(ioe)
)
if matches!(
ioe.kind(),
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected
)
);
let me_closed = matches!(
&e,
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
);
match (peer_closed, me_closed) {
(true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"),
(_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"),
_ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"),
}
}
});
}

View File

@@ -100,6 +100,14 @@ fn render_metrics(stats: &Stats) -> String {
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
let _ = writeln!(out, "# HELP telemt_me_keepalive_pong_total ME keepalive pong replies");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_pong_total counter");
let _ = writeln!(out, "telemt_me_keepalive_pong_total {}", stats.get_me_keepalive_pong());
let _ = writeln!(out, "# HELP telemt_me_keepalive_timeout_total ME keepalive ping timeouts");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_timeout_total counter");
let _ = writeln!(out, "telemt_me_keepalive_timeout_total {}", stats.get_me_keepalive_timeout());
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
@@ -108,6 +116,30 @@ fn render_metrics(stats: &Stats) -> String {
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches");
let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter");
let _ = writeln!(out, "telemt_me_crc_mismatch_total {}", stats.get_me_crc_mismatch());
let _ = writeln!(out, "# HELP telemt_me_seq_mismatch_total ME sequence mismatches");
let _ = writeln!(out, "# TYPE telemt_me_seq_mismatch_total counter");
let _ = writeln!(out, "telemt_me_seq_mismatch_total {}", stats.get_me_seq_mismatch());
let _ = writeln!(out, "# HELP telemt_me_route_drop_no_conn_total ME route drops: no conn");
let _ = writeln!(out, "# TYPE telemt_me_route_drop_no_conn_total counter");
let _ = writeln!(out, "telemt_me_route_drop_no_conn_total {}", stats.get_me_route_drop_no_conn());
let _ = writeln!(out, "# HELP telemt_me_route_drop_channel_closed_total ME route drops: channel closed");
let _ = writeln!(out, "# TYPE telemt_me_route_drop_channel_closed_total counter");
let _ = writeln!(out, "telemt_me_route_drop_channel_closed_total {}", stats.get_me_route_drop_channel_closed());
let _ = writeln!(out, "# HELP telemt_me_route_drop_queue_full_total ME route drops: queue full");
let _ = writeln!(out, "# TYPE telemt_me_route_drop_queue_full_total counter");
let _ = writeln!(out, "telemt_me_route_drop_queue_full_total {}", stats.get_me_route_drop_queue_full());
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid());
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");

View File

@@ -1,6 +1,8 @@
//! Protocol constants and datacenter addresses
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use crate::crypto::SecureRandom;
use std::sync::LazyLock;
// ============= Telegram Datacenters =============
@@ -151,7 +153,32 @@ pub const TLS_RECORD_ALERT: u8 = 0x15;
/// Maximum TLS record size
pub const MAX_TLS_RECORD_SIZE: usize = 16384;
/// Maximum TLS chunk size (with overhead)
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 24;
/// RFC 8446 §5.2 allows up to 16384 + 256 bytes of ciphertext
pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256;
/// Secure Intermediate payload is expected to be 4-byte aligned.
pub fn is_valid_secure_payload_len(data_len: usize) -> bool {
data_len % 4 == 0
}
/// Compute Secure Intermediate payload length from wire length.
/// Secure mode strips up to 3 random tail bytes by truncating to 4-byte boundary.
pub fn secure_payload_len_from_wire_len(wire_len: usize) -> Option<usize> {
if wire_len < 4 {
return None;
}
Some(wire_len - (wire_len % 4))
}
/// Generate padding length for Secure Intermediate protocol.
/// Data must be 4-byte aligned; padding is 1..=3 so total is never divisible by 4.
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
debug_assert!(
is_valid_secure_payload_len(data_len),
"Secure payload must be 4-byte aligned, got {data_len}"
);
(rng.range(3) + 1) as usize
}
// ============= Timeouts =============
@@ -284,6 +311,10 @@ pub mod rpc_flags {
pub const FLAG_ABRIDGED: u32 = 0x40000000;
pub const FLAG_QUICKACK: u32 = 0x80000000;
}
pub mod rpc_crypto_flags {
pub const USE_CRC32C: u32 = 0x800;
}
pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5;
pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10;
@@ -319,4 +350,43 @@ mod tests {
assert_eq!(TG_DATACENTERS_V4.len(), 5);
assert_eq!(TG_DATACENTERS_V6.len(), 5);
}
}
#[test]
fn secure_padding_never_produces_aligned_total() {
let rng = SecureRandom::new();
for data_len in (0..1000).step_by(4) {
for _ in 0..100 {
let padding = secure_padding_len(data_len, &rng);
assert!(
padding <= 3,
"padding out of range: data_len={data_len}, padding={padding}"
);
assert_ne!(
(data_len + padding) % 4,
0,
"invariant violated: data_len={data_len}, padding={padding}, total={}",
data_len + padding
);
}
}
}
#[test]
fn secure_wire_len_roundtrip_for_aligned_payload() {
for payload_len in (4..4096).step_by(4) {
for padding in 0..=3usize {
let wire_len = payload_len + padding;
let recovered = secure_payload_len_from_wire_len(wire_len);
assert_eq!(recovered, Some(payload_len));
}
}
}
#[test]
fn secure_wire_len_rejects_too_short_frames() {
assert_eq!(secure_payload_len_from_wire_len(0), None);
assert_eq!(secure_payload_len_from_wire_len(1), None);
assert_eq!(secure_payload_len_from_wire_len(2), None);
assert_eq!(secure_payload_len_from_wire_len(3), None);
}
}

View File

@@ -32,6 +32,7 @@ pub const TIME_SKEW_MAX: i64 = 10 * 60; // 10 minutes after
mod extension_type {
pub const KEY_SHARE: u16 = 0x0033;
pub const SUPPORTED_VERSIONS: u16 = 0x002b;
pub const ALPN: u16 = 0x0010;
}
/// TLS Cipher Suites
@@ -62,6 +63,7 @@ pub struct TlsValidation {
// ============= TLS Extension Builder =============
/// Builder for TLS extensions with correct length calculation
#[derive(Clone)]
struct TlsExtensionBuilder {
extensions: Vec<u8>,
}
@@ -108,6 +110,27 @@ impl TlsExtensionBuilder {
self
}
/// Add ALPN extension with a single selected protocol.
fn add_alpn(&mut self, proto: &[u8]) -> &mut Self {
// Extension type: ALPN (0x0010)
self.extensions.extend_from_slice(&extension_type::ALPN.to_be_bytes());
// ALPN extension format:
// extension_data length (2 bytes)
// protocols length (2 bytes)
// protocol name length (1 byte)
// protocol name bytes
let proto_len = proto.len() as u8;
let list_len: u16 = 1 + proto_len as u16;
let ext_len: u16 = 2 + list_len;
self.extensions.extend_from_slice(&ext_len.to_be_bytes());
self.extensions.extend_from_slice(&list_len.to_be_bytes());
self.extensions.push(proto_len);
self.extensions.extend_from_slice(proto);
self
}
/// Build final extensions with length prefix
fn build(self) -> Vec<u8> {
@@ -144,6 +167,8 @@ struct ServerHelloBuilder {
compression: u8,
/// Extensions
extensions: TlsExtensionBuilder,
/// Selected ALPN protocol (if any)
alpn: Option<Vec<u8>>,
}
impl ServerHelloBuilder {
@@ -154,6 +179,7 @@ impl ServerHelloBuilder {
cipher_suite: cipher_suite::TLS_AES_128_GCM_SHA256,
compression: 0x00,
extensions: TlsExtensionBuilder::new(),
alpn: None,
}
}
@@ -167,10 +193,19 @@ impl ServerHelloBuilder {
self.extensions.add_supported_versions(0x0304);
self
}
fn with_alpn(mut self, proto: Option<Vec<u8>>) -> Self {
self.alpn = proto;
self
}
/// Build ServerHello message (without record header)
fn build_message(&self) -> Vec<u8> {
let extensions = self.extensions.extensions.clone();
let mut ext_builder = self.extensions.clone();
if let Some(ref alpn) = self.alpn {
ext_builder.add_alpn(alpn);
}
let extensions = ext_builder.extensions.clone();
let extensions_len = extensions.len() as u16;
// Calculate total length
@@ -350,6 +385,8 @@ pub fn build_server_hello(
session_id: &[u8],
fake_cert_len: usize,
rng: &SecureRandom,
alpn: Option<Vec<u8>>,
new_session_tickets: u8,
) -> Vec<u8> {
const MIN_APP_DATA: usize = 64;
const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound
@@ -360,6 +397,7 @@ pub fn build_server_hello(
let server_hello = ServerHelloBuilder::new(session_id.to_vec())
.with_x25519_key(&x25519_key)
.with_tls13_version()
.with_alpn(alpn)
.build_record();
// Build Change Cipher Spec record
@@ -376,21 +414,35 @@ pub fn build_server_hello(
app_data_record.push(TLS_RECORD_APPLICATION);
app_data_record.extend_from_slice(&TLS_VERSION);
app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes());
if fake_cert_len > 17 {
app_data_record.extend_from_slice(&fake_cert[..fake_cert_len - 17]);
app_data_record.push(0x16); // inner content type marker
app_data_record.extend_from_slice(&rng.bytes(16)); // AEAD-like tag mimic
} else {
app_data_record.extend_from_slice(&fake_cert);
}
// Fill ApplicationData with fully random bytes of desired length to avoid
// deterministic DPI fingerprints (fixed inner content type markers).
app_data_record.extend_from_slice(&fake_cert);
// Build optional NewSessionTicket records (TLS 1.3 handshake messages are encrypted;
// here we mimic with opaque ApplicationData records of plausible size).
let mut tickets = Vec::new();
if new_session_tickets > 0 {
for _ in 0..new_session_tickets {
let ticket_len: usize = rng.range(48) + 48; // 48-95 bytes
let mut record = Vec::with_capacity(5 + ticket_len);
record.push(TLS_RECORD_APPLICATION);
record.extend_from_slice(&TLS_VERSION);
record.extend_from_slice(&(ticket_len as u16).to_be_bytes());
record.extend_from_slice(&rng.bytes(ticket_len));
tickets.push(record);
}
}
// Combine all records
let mut response = Vec::with_capacity(
server_hello.len() + change_cipher_spec.len() + app_data_record.len()
server_hello.len() + change_cipher_spec.len() + app_data_record.len() + tickets.iter().map(|r| r.len()).sum::<usize>()
);
response.extend_from_slice(&server_hello);
response.extend_from_slice(&change_cipher_spec);
response.extend_from_slice(&app_data_record);
for t in &tickets {
response.extend_from_slice(t);
}
// Compute HMAC for the response
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());
@@ -484,85 +536,53 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option<String> {
None
}
/// Extract ALPN protocol list from TLS ClientHello.
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Option<Vec<String>> {
if handshake.len() < 43 || handshake[0] != TLS_RECORD_HANDSHAKE {
return None;
}
/// Extract ALPN protocol list from ClientHello, return in offered order.
pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec<Vec<u8>> {
let mut pos = 5; // after record header
if handshake.get(pos).copied()? != 0x01 {
return None; // not ClientHello
if handshake.get(pos) != Some(&0x01) {
return Vec::new();
}
// Handshake length bytes
pos += 4; // type + len (3)
// version (2) + random (32)
pos += 2 + 32;
if pos + 1 > handshake.len() {
return None;
}
let session_id_len = *handshake.get(pos)? as usize;
pos += 4; // type + len
pos += 2 + 32; // version + random
if pos >= handshake.len() { return Vec::new(); }
let session_id_len = *handshake.get(pos).unwrap_or(&0) as usize;
pos += 1 + session_id_len;
if pos + 2 > handshake.len() {
return None;
}
let cipher_suites_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
pos += 2 + cipher_suites_len;
if pos + 1 > handshake.len() {
return None;
}
let comp_len = *handshake.get(pos)? as usize;
if pos + 2 > handshake.len() { return Vec::new(); }
let cipher_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
pos += 2 + cipher_len;
if pos >= handshake.len() { return Vec::new(); }
let comp_len = *handshake.get(pos).unwrap_or(&0) as usize;
pos += 1 + comp_len;
if pos + 2 > handshake.len() {
return None;
}
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
if pos + 2 > handshake.len() { return Vec::new(); }
let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
pos += 2;
let ext_end = pos + ext_len;
if ext_end > handshake.len() {
return None;
}
if ext_end > handshake.len() { return Vec::new(); }
let mut out = Vec::new();
while pos + 4 <= ext_end {
let etype = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]);
let elen = u16::from_be_bytes([handshake[pos + 2], handshake[pos + 3]]) as usize;
let etype = u16::from_be_bytes([handshake[pos], handshake[pos+1]]);
let elen = u16::from_be_bytes([handshake[pos+2], handshake[pos+3]]) as usize;
pos += 4;
if pos + elen > ext_end {
if pos + elen > ext_end { break; }
if etype == extension_type::ALPN && elen >= 3 {
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize;
let mut lp = pos + 2;
let list_end = (pos + 2).saturating_add(list_len).min(pos + elen);
while lp + 1 <= list_end {
let plen = handshake[lp] as usize;
lp += 1;
if lp + plen > list_end { break; }
out.push(handshake[lp..lp+plen].to_vec());
lp += plen;
}
break;
}
if etype == 0x0010 && elen >= 3 {
// ALPN
let list_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize;
let mut alpn_pos = pos + 2;
let list_end = std::cmp::min(alpn_pos + list_len, pos + elen);
let mut protocols = Vec::new();
while alpn_pos < list_end {
let proto_len = *handshake.get(alpn_pos)? as usize;
alpn_pos += 1;
if alpn_pos + proto_len > list_end {
break;
}
if let Ok(p) = std::str::from_utf8(&handshake[alpn_pos..alpn_pos + proto_len]) {
protocols.push(p.to_string());
}
alpn_pos += proto_len;
}
return Some(protocols);
}
pos += elen;
}
None
out
}
/// Check if bytes look like a TLS ClientHello
pub fn is_tls_handshake(first_bytes: &[u8]) -> bool {
if first_bytes.len() < 3 {
@@ -741,7 +761,7 @@ mod tests {
let session_id = vec![0xAA; 32];
let rng = SecureRandom::new();
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng);
let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng, None, 0);
// Should have at least 3 records
assert!(response.len() > 100);
@@ -774,8 +794,8 @@ mod tests {
let session_id = vec![0xAA; 32];
let rng = SecureRandom::new();
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng);
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng);
let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0);
// Digest position should have non-zero data
let digest1 = &response1[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN];
@@ -904,8 +924,12 @@ mod tests {
alpn_data.push(2);
alpn_data.extend_from_slice(b"h2");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
assert_eq!(alpn, vec!["h2"]);
let alpn = extract_alpn_from_client_hello(&ch);
let alpn_str: Vec<String> = alpn
.iter()
.map(|p| std::str::from_utf8(p).unwrap().to_string())
.collect();
assert_eq!(alpn_str, vec!["h2"]);
}
#[test]
@@ -920,7 +944,11 @@ mod tests {
alpn_data.push(2);
alpn_data.extend_from_slice(b"h3");
let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test");
let alpn = extract_alpn_from_client_hello(&ch).unwrap();
assert_eq!(alpn, vec!["h2", "spdy", "h3"]);
let alpn = extract_alpn_from_client_hello(&ch);
let alpn_str: Vec<String> = alpn
.iter()
.map(|p| std::str::from_utf8(p).unwrap().to_string())
.collect();
assert_eq!(alpn_str, vec!["h2", "spdy", "h3"]);
}
}

View File

@@ -178,8 +178,9 @@ async fn do_tg_handshake_static(
let (read_half, write_half) = stream.into_split();
let max_pending = config.general.crypto_pending_buffer;
Ok((
CryptoReader::new(read_half, tg_decryptor),
CryptoWriter::new(write_half, tg_encryptor),
CryptoWriter::new(write_half, tg_encryptor, max_pending),
))
}

View File

@@ -7,6 +7,7 @@ use tracing::{debug, warn, trace, info};
use zeroize::Zeroize;
use crate::crypto::{sha256, AesCtr, SecureRandom};
use rand::Rng;
use crate::protocol::constants::*;
use crate::protocol::tls;
use crate::stream::{FakeTlsReader, FakeTlsWriter, CryptoReader, CryptoWriter};
@@ -119,6 +120,23 @@ where
None
};
let alpn_list = if config.censorship.alpn_enforce {
tls::extract_alpn_from_client_hello(handshake)
} else {
Vec::new()
};
let selected_alpn = if config.censorship.alpn_enforce {
if alpn_list.iter().any(|p| p == b"h2") {
Some(b"h2".to_vec())
} else if alpn_list.iter().any(|p| p == b"http/1.1") {
Some(b"http/1.1".to_vec())
} else {
None
}
} else {
None
};
let response = if let Some(cached_entry) = cached {
emulator::build_emulated_server_hello(
secret,
@@ -126,6 +144,8 @@ where
&validation.session_id,
&cached_entry,
rng,
selected_alpn.clone(),
config.censorship.tls_new_session_tickets,
)
} else {
tls::build_server_hello(
@@ -134,9 +154,25 @@ where
&validation.session_id,
config.censorship.fake_cert_len,
rng,
selected_alpn.clone(),
config.censorship.tls_new_session_tickets,
)
};
// Optional anti-fingerprint delay before sending ServerHello.
if config.censorship.server_hello_delay_max_ms > 0 {
let min = config.censorship.server_hello_delay_min_ms;
let max = config.censorship.server_hello_delay_max_ms.max(min);
let delay_ms = if max == min {
max
} else {
rand::rng().random_range(min..=max)
};
if delay_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
}
debug!(peer = %peer, response_len = response.len(), "Sending TLS ServerHello");
if let Err(e) = writer.write_all(&response).await {
@@ -217,7 +253,11 @@ where
let mode_ok = match proto_tag {
ProtoTag::Secure => {
if is_tls { config.general.modes.tls } else { config.general.modes.secure }
if is_tls {
config.general.modes.tls || config.general.modes.secure
} else {
config.general.modes.secure || config.general.modes.tls
}
}
ProtoTag::Intermediate | ProtoTag::Abridged => config.general.modes.classic,
};
@@ -264,9 +304,10 @@ where
"MTProto handshake successful"
);
let max_pending = config.general.crypto_pending_buffer;
return HandshakeResult::Success((
CryptoReader::new(reader, decryptor),
CryptoWriter::new(writer, encryptor),
CryptoWriter::new(writer, encryptor, max_pending),
success,
));
}

View File

@@ -2,24 +2,30 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::{debug, info, trace};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, trace, warn};
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::protocol::constants::{*, secure_padding_len};
use crate::proxy::handshake::HandshakeSuccess;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
enum C2MeCommand {
Data { payload: Vec<u8>, flags: u32 },
Close,
}
pub(crate) async fn handle_via_middle_proxy<R, W>(
mut crypto_reader: CryptoReader<R>,
mut crypto_writer: CryptoWriter<W>,
crypto_writer: CryptoWriter<W>,
success: HandshakeSuccess,
me_pool: Arc<MePool>,
stats: Arc<Stats>,
_config: Arc<ProxyConfig>,
config: Arc<ProxyConfig>,
_buffer_pool: Arc<BufferPool>,
local_addr: SocketAddr,
rng: Arc<SecureRandom>,
@@ -41,7 +47,7 @@ where
"Routing via Middle-End"
);
let (conn_id, mut me_rx) = me_pool.registry().register().await;
let (conn_id, me_rx) = me_pool.registry().register().await;
stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user);
@@ -56,59 +62,179 @@ where
let translated_local_addr = me_pool.translate_our_addr(local_addr);
let result: Result<()> = loop {
tokio::select! {
client_frame = read_client_payload(&mut crypto_reader, proto_tag) => {
match client_frame {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
stats.add_user_octets_from(&user, payload.len() as u64);
let mut flags = proto_flags;
if quickack {
flags |= RPC_FLAG_QUICKACK;
}
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
flags |= RPC_FLAG_NOT_ENCRYPTED;
}
me_pool.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
&payload,
flags,
).await?;
}
Ok(None) => {
debug!(conn_id, "Client EOF");
let _ = me_pool.send_close(conn_id).await;
break Ok(());
}
Err(e) => break Err(e),
let frame_limit = config.general.max_client_frame;
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(1024);
let me_pool_c2me = me_pool.clone();
let c2me_sender = tokio::spawn(async move {
while let Some(cmd) = c2me_rx.recv().await {
match cmd {
C2MeCommand::Data { payload, flags } => {
me_pool_c2me.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
&payload,
flags,
).await?;
}
}
me_msg = me_rx.recv() => {
match me_msg {
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
stats.add_user_octets_to(&user, data.len() as u64);
write_client_payload(&mut crypto_writer, proto_tag, flags, &data, rng.as_ref()).await?;
}
Some(MeResponse::Ack(confirm)) => {
trace!(conn_id, confirm, "ME->C quickack");
write_client_ack(&mut crypto_writer, proto_tag, confirm).await?;
}
Some(MeResponse::Close) => {
debug!(conn_id, "ME sent close");
break Ok(());
}
None => {
debug!(conn_id, "ME channel closed");
break Err(ProxyError::Proxy("ME connection lost".into()));
}
C2MeCommand::Close => {
let _ = me_pool_c2me.send_close(conn_id).await;
return Ok(());
}
}
}
Ok(())
});
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let mut me_rx_task = me_rx;
let stats_clone = stats.clone();
let rng_clone = rng.clone();
let user_clone = user.clone();
let me_writer = tokio::spawn(async move {
let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024);
loop {
tokio::select! {
msg = me_rx_task.recv() => {
match msg {
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(
&mut writer,
proto_tag,
flags,
&data,
rng_clone.as_ref(),
&mut frame_buf,
)
.await?;
// Drain all immediately queued ME responses and flush once.
while let Ok(next) = me_rx_task.try_recv() {
match next {
MeResponse::Data { flags, data } => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(
&mut writer,
proto_tag,
flags,
&data,
rng_clone.as_ref(),
&mut frame_buf,
).await?;
}
MeResponse::Ack(confirm) => {
trace!(conn_id, confirm, "ME->C quickack (batched)");
write_client_ack(&mut writer, proto_tag, confirm).await?;
}
MeResponse::Close => {
debug!(conn_id, "ME sent close (batched)");
let _ = writer.flush().await;
return Ok(());
}
}
}
writer.flush().await.map_err(ProxyError::Io)?;
}
Some(MeResponse::Ack(confirm)) => {
trace!(conn_id, confirm, "ME->C quickack");
write_client_ack(&mut writer, proto_tag, confirm).await?;
}
Some(MeResponse::Close) => {
debug!(conn_id, "ME sent close");
let _ = writer.flush().await;
return Ok(());
}
None => {
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
}
}
}
_ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal");
return Ok(());
}
}
}
});
let mut main_result: Result<()> = Ok(());
let mut client_closed = false;
let mut frame_counter: u64 = 0;
loop {
match read_client_payload(
&mut crypto_reader,
proto_tag,
frame_limit,
&user,
&mut frame_counter,
&stats,
).await {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
stats.add_user_octets_from(&user, payload.len() as u64);
let mut flags = proto_flags;
if quickack {
flags |= RPC_FLAG_QUICKACK;
}
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
flags |= RPC_FLAG_NOT_ENCRYPTED;
}
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
if c2me_tx
.send(C2MeCommand::Data { payload, flags })
.await
.is_err()
{
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
break;
}
}
Ok(None) => {
debug!(conn_id, "Client EOF");
client_closed = true;
let _ = c2me_tx.send(C2MeCommand::Close).await;
break;
}
Err(e) => {
main_result = Err(e);
break;
}
}
}
drop(c2me_tx);
let c2me_result = c2me_sender
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))));
let _ = stop_tx.send(());
let mut writer_result = me_writer
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))));
// When client closes, but ME channel stopped as unregistered - it isnt error
if client_closed {
if matches!(
writer_result,
Err(ProxyError::Proxy(ref msg)) if msg == "ME connection lost"
) {
writer_result = Ok(());
}
}
let result = match (main_result, c2me_result, writer_result) {
(Ok(()), Ok(()), Ok(())) => Ok(()),
(Err(e), _, _) => Err(e),
(_, Err(e), _) => Err(e),
(_, _, Err(e)) => Err(e),
};
debug!(user = %user, conn_id, "ME relay cleanup");
@@ -120,66 +246,125 @@ where
async fn read_client_payload<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
max_frame: usize,
user: &str,
frame_counter: &mut u64,
stats: &Stats,
) -> Result<Option<(Vec<u8>, bool)>>
where
R: AsyncRead + Unpin + Send + 'static,
{
let (len, quickack) = match proto_tag {
ProtoTag::Abridged => {
let mut first = [0u8; 1];
match client_reader.read_exact(&mut first).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
loop {
let (len, quickack, raw_len_bytes) = match proto_tag {
ProtoTag::Abridged => {
let mut first = [0u8; 1];
match client_reader.read_exact(&mut first).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
}
let quickack = (first[0] & 0x80) != 0;
let len_words = if (first[0] & 0x7f) == 0x7f {
let mut ext = [0u8; 3];
client_reader
.read_exact(&mut ext)
.await
.map_err(ProxyError::Io)?;
u32::from_le_bytes([ext[0], ext[1], ext[2], 0]) as usize
} else {
(first[0] & 0x7f) as usize
};
let len = len_words
.checked_mul(4)
.ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?;
(len, quickack, None)
}
let quickack = (first[0] & 0x80) != 0;
let len_words = if (first[0] & 0x7f) == 0x7f {
let mut ext = [0u8; 3];
client_reader
.read_exact(&mut ext)
.await
.map_err(ProxyError::Io)?;
u32::from_le_bytes([ext[0], ext[1], ext[2], 0]) as usize
} else {
(first[0] & 0x7f) as usize
};
let len = len_words
.checked_mul(4)
.ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?;
(len, quickack)
}
ProtoTag::Intermediate | ProtoTag::Secure => {
let mut len_buf = [0u8; 4];
match client_reader.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
ProtoTag::Intermediate | ProtoTag::Secure => {
let mut len_buf = [0u8; 4];
match client_reader.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
}
let quickack = (len_buf[3] & 0x80) != 0;
(
(u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize,
quickack,
Some(len_buf),
)
}
let quickack = (len_buf[3] & 0x80) != 0;
((u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize, quickack)
};
if len == 0 {
continue;
}
};
if len > 16 * 1024 * 1024 {
return Err(ProxyError::Proxy(format!("Frame too large: {len}")));
}
let mut payload = vec![0u8; len];
client_reader
.read_exact(&mut payload)
.await
.map_err(ProxyError::Io)?;
// Secure Intermediate: remove random padding (last len%4 bytes)
if proto_tag == ProtoTag::Secure {
let rem = len % 4;
if rem != 0 && payload.len() >= rem {
payload.truncate(len - rem);
if len < 4 && proto_tag != ProtoTag::Abridged {
warn!(
user = %user,
len,
proto = ?proto_tag,
"Frame too small — corrupt or probe"
);
return Err(ProxyError::Proxy(format!("Frame too small: {len}")));
}
if len > max_frame {
let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes());
let looks_like_tls = raw_len_bytes
.map(|b| b[0] == 0x16 && b[1] == 0x03)
.unwrap_or(false);
let looks_like_http = raw_len_bytes
.map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
.unwrap_or(false);
warn!(
user = %user,
raw_len = len,
raw_len_hex = format_args!("0x{:08x}", len),
raw_bytes = format_args!(
"{:02x} {:02x} {:02x} {:02x}",
len_buf[0], len_buf[1], len_buf[2], len_buf[3]
),
proto = ?proto_tag,
tls_like = looks_like_tls,
http_like = looks_like_http,
frames_ok = *frame_counter,
"Frame too large — crypto desync forensics"
);
return Err(ProxyError::Proxy(format!(
"Frame too large: {len} (max {max_frame}), frames_ok={}",
*frame_counter
)));
}
let secure_payload_len = if proto_tag == ProtoTag::Secure {
match secure_payload_len_from_wire_len(len) {
Some(payload_len) => payload_len,
None => {
stats.increment_secure_padding_invalid();
return Err(ProxyError::Proxy(format!(
"Invalid secure frame length: {len}"
)));
}
}
} else {
len
};
let mut payload = vec![0u8; len];
client_reader
.read_exact(&mut payload)
.await
.map_err(ProxyError::Io)?;
// Secure Intermediate: strip validated trailing padding bytes.
if proto_tag == ProtoTag::Secure {
payload.truncate(secure_payload_len);
}
*frame_counter += 1;
return Ok(Some((payload, quickack)));
}
Ok(Some((payload, quickack)))
}
async fn write_client_payload<W>(
@@ -188,6 +373,7 @@ async fn write_client_payload<W>(
flags: u32,
data: &[u8],
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
) -> Result<()>
where
W: AsyncWrite + Unpin + Send + 'static,
@@ -209,8 +395,12 @@ where
if quickack {
first |= 0x80;
}
frame_buf.clear();
frame_buf.reserve(1 + data.len());
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(&[first])
.write_all(&frame_buf)
.await
.map_err(ProxyError::Io)?;
} else if len_words < (1 << 24) {
@@ -219,8 +409,12 @@ where
first |= 0x80;
}
let lw = (len_words as u32).to_le_bytes();
frame_buf.clear();
frame_buf.reserve(4 + data.len());
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(&[first, lw[0], lw[1], lw[2]])
.write_all(&frame_buf)
.await
.map_err(ProxyError::Io)?;
} else {
@@ -229,47 +423,40 @@ where
data.len()
)));
}
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
}
ProtoTag::Intermediate | ProtoTag::Secure => {
let padding_len = if proto_tag == ProtoTag::Secure {
(rng.bytes(1)[0] % 4) as usize
if !is_valid_secure_payload_len(data.len()) {
return Err(ProxyError::Proxy(format!(
"Secure payload must be 4-byte aligned, got {}",
data.len()
)));
}
secure_padding_len(data.len(), rng)
} else {
0
};
let mut len = (data.len() + padding_len) as u32;
let mut len_val = (data.len() + padding_len) as u32;
if quickack {
len |= 0x8000_0000;
len_val |= 0x8000_0000;
}
client_writer
.write_all(&len.to_le_bytes())
.await
.map_err(ProxyError::Io)?;
client_writer
.write_all(data)
.await
.map_err(ProxyError::Io)?;
let total = 4 + data.len() + padding_len;
frame_buf.clear();
frame_buf.reserve(total);
frame_buf.extend_from_slice(&len_val.to_le_bytes());
frame_buf.extend_from_slice(data);
if padding_len > 0 {
let pad = rng.bytes(padding_len);
client_writer
.write_all(&pad)
.await
.map_err(ProxyError::Io)?;
let start = frame_buf.len();
frame_buf.resize(start + padding_len, 0);
rng.fill(&mut frame_buf[start..]);
}
client_writer
.write_all(&frame_buf)
.await
.map_err(ProxyError::Io)?;
}
}
// Avoid unconditional per-frame flush (throughput killer on large downloads).
// Flush only when low-latency ack semantics are requested or when
// CryptoWriter has buffered pending ciphertext that must be drained.
if quickack || client_writer.has_pending() {
client_writer.flush().await.map_err(ProxyError::Io)?;
}
Ok(())
}

View File

@@ -21,8 +21,16 @@ pub struct Stats {
handshake_timeouts: AtomicU64,
me_keepalive_sent: AtomicU64,
me_keepalive_failed: AtomicU64,
me_keepalive_pong: AtomicU64,
me_keepalive_timeout: AtomicU64,
me_reconnect_attempts: AtomicU64,
me_reconnect_success: AtomicU64,
me_crc_mismatch: AtomicU64,
me_seq_mismatch: AtomicU64,
me_route_drop_no_conn: AtomicU64,
me_route_drop_channel_closed: AtomicU64,
me_route_drop_queue_full: AtomicU64,
secure_padding_invalid: AtomicU64,
user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>,
}
@@ -49,14 +57,45 @@ impl Stats {
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_pong(&self) { self.me_keepalive_pong.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_timeout(&self) { self.me_keepalive_timeout.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_timeout_by(&self, value: u64) {
self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed);
}
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_crc_mismatch(&self) { self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_seq_mismatch(&self) { self.me_seq_mismatch.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_route_drop_no_conn(&self) { self.me_route_drop_no_conn.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_route_drop_channel_closed(&self) {
self.me_route_drop_channel_closed.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_route_drop_queue_full(&self) {
self.me_route_drop_queue_full.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_secure_padding_invalid(&self) {
self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed);
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) }
pub fn get_me_keepalive_timeout(&self) -> u64 { self.me_keepalive_timeout.load(Ordering::Relaxed) }
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
pub fn get_me_crc_mismatch(&self) -> u64 { self.me_crc_mismatch.load(Ordering::Relaxed) }
pub fn get_me_seq_mismatch(&self) -> u64 { self.me_seq_mismatch.load(Ordering::Relaxed) }
pub fn get_me_route_drop_no_conn(&self) -> u64 { self.me_route_drop_no_conn.load(Ordering::Relaxed) }
pub fn get_me_route_drop_channel_closed(&self) -> u64 {
self.me_route_drop_channel_closed.load(Ordering::Relaxed)
}
pub fn get_me_route_drop_queue_full(&self) -> u64 {
self.me_route_drop_queue_full.load(Ordering::Relaxed)
}
pub fn get_secure_padding_invalid(&self) -> u64 {
self.secure_padding_invalid.load(Ordering::Relaxed)
}
pub fn increment_user_connects(&self, user: &str) {
self.user_stats.entry(user.to_string()).or_default()
@@ -70,7 +109,22 @@ impl Stats {
pub fn decrement_user_curr_connects(&self, user: &str) {
if let Some(stats) = self.user_stats.get(user) {
stats.curr_connects.fetch_sub(1, Ordering::Relaxed);
let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed);
loop {
if current == 0 {
break;
}
match counter.compare_exchange_weak(
current,
current - 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
}

View File

@@ -34,7 +34,7 @@
//! └────────────────────────────────────────┘
//!
//! Backpressure
//! - pending ciphertext buffer is bounded (MAX_PENDING_WRITE)
//! - pending ciphertext buffer is bounded (configurable per connection)
//! - pending is full and upstream is pending
//! -> poll_write returns Poll::Pending
//! -> do not accept any plaintext
@@ -62,10 +62,9 @@ use super::state::{StreamState, YieldBuffer};
// ============= Constants =============
/// Maximum size for pending ciphertext buffer (bounded backpressure).
/// Reduced to 64KB to prevent bufferbloat on mobile networks.
/// 512KB was causing high latency on 3G/LTE connections.
const MAX_PENDING_WRITE: usize = 64 * 1024;
/// Default size for pending ciphertext buffer (bounded backpressure).
/// Actual limit is supplied at runtime from configuration.
const DEFAULT_MAX_PENDING_WRITE: usize = 64 * 1024;
/// Default read buffer capacity (reader mostly decrypts in-place into caller buffer).
const DEFAULT_READ_CAPACITY: usize = 16 * 1024;
@@ -427,15 +426,22 @@ pub struct CryptoWriter<W> {
encryptor: AesCtr,
state: CryptoWriterState,
scratch: BytesMut,
max_pending_write: usize,
}
impl<W> CryptoWriter<W> {
pub fn new(upstream: W, encryptor: AesCtr) -> Self {
pub fn new(upstream: W, encryptor: AesCtr, max_pending_write: usize) -> Self {
let max_pending = if max_pending_write == 0 {
DEFAULT_MAX_PENDING_WRITE
} else {
max_pending_write
};
Self {
upstream,
encryptor,
state: CryptoWriterState::Idle,
scratch: BytesMut::with_capacity(16 * 1024),
max_pending_write: max_pending.max(4 * 1024),
}
}
@@ -484,10 +490,10 @@ impl<W> CryptoWriter<W> {
}
/// Ensure we are in Flushing state and return mutable pending buffer.
fn ensure_pending<'a>(state: &'a mut CryptoWriterState) -> &'a mut PendingCiphertext {
fn ensure_pending<'a>(state: &'a mut CryptoWriterState, max_pending: usize) -> &'a mut PendingCiphertext {
if matches!(state, CryptoWriterState::Idle) {
*state = CryptoWriterState::Flushing {
pending: PendingCiphertext::new(MAX_PENDING_WRITE),
pending: PendingCiphertext::new(max_pending),
};
}
@@ -498,14 +504,14 @@ impl<W> CryptoWriter<W> {
}
/// Select how many plaintext bytes can be accepted in buffering path
fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize) -> usize {
fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize, max_pending: usize) -> usize {
if buf_len == 0 {
return 0;
}
match state {
CryptoWriterState::Flushing { pending } => buf_len.min(pending.remaining_capacity()),
CryptoWriterState::Idle => buf_len.min(MAX_PENDING_WRITE),
CryptoWriterState::Idle => buf_len.min(max_pending),
CryptoWriterState::Poisoned { .. } => 0,
}
}
@@ -603,7 +609,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
Poll::Pending => {
// Upstream blocked. Apply ideal backpressure
let to_accept =
Self::select_to_accept_for_buffering(&this.state, buf.len());
Self::select_to_accept_for_buffering(&this.state, buf.len(), this.max_pending_write);
if to_accept == 0 {
trace!(
@@ -618,7 +624,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
// Disjoint borrows
let encryptor = &mut this.encryptor;
let pending = Self::ensure_pending(&mut this.state);
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
if let Err(e) = pending.push_encrypted(encryptor, plaintext) {
if e.kind() == ErrorKind::WouldBlock {
@@ -635,7 +641,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
// 2) Fast path: pending empty -> write-through
debug_assert!(matches!(this.state, CryptoWriterState::Idle));
let to_accept = buf.len().min(MAX_PENDING_WRITE);
let to_accept = buf.len().min(this.max_pending_write);
let plaintext = &buf[..to_accept];
Self::encrypt_into_scratch(&mut this.encryptor, &mut this.scratch, plaintext);
@@ -645,7 +651,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
// Upstream blocked: buffer FULL ciphertext for accepted bytes.
let ciphertext = std::mem::take(&mut this.scratch);
let pending = Self::ensure_pending(&mut this.state);
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
pending.replace_with(ciphertext);
Poll::Ready(Ok(to_accept))
@@ -672,7 +678,7 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for CryptoWriter<W> {
let remainder = this.scratch.split_off(n);
this.scratch.clear();
let pending = Self::ensure_pending(&mut this.state);
let pending = Self::ensure_pending(&mut this.state, this.max_pending_write);
pending.replace_with(remainder);
Poll::Ready(Ok(to_accept))
@@ -767,4 +773,4 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for PassthroughStream<S> {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
}

View File

@@ -8,7 +8,9 @@ use std::io::{self, Error, ErrorKind};
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use crate::protocol::constants::ProtoTag;
use crate::protocol::constants::{
ProtoTag, is_valid_secure_payload_len, secure_padding_len, secure_payload_len_from_wire_len,
};
use crate::crypto::SecureRandom;
use super::frame::{Frame, FrameMeta, FrameCodec as FrameCodecTrait};
@@ -274,13 +276,13 @@ fn decode_secure(src: &mut BytesMut, max_size: usize) -> io::Result<Option<Frame
return Ok(None);
}
// Calculate padding (indicated by length not divisible by 4)
let padding_len = len % 4;
let data_len = if padding_len != 0 {
len - padding_len
} else {
len
};
let data_len = secure_payload_len_from_wire_len(len).ok_or_else(|| {
Error::new(
ErrorKind::InvalidData,
format!("invalid secure frame length: {len}"),
)
})?;
let padding_len = len - data_len;
meta.padding_len = padding_len as u8;
@@ -303,14 +305,15 @@ fn encode_secure(frame: &Frame, dst: &mut BytesMut, rng: &SecureRandom) -> io::R
return Ok(());
}
// Generate padding to make length not divisible by 4
let padding_len = if data.len() % 4 == 0 {
// Add 1-3 bytes to make it non-aligned
(rng.range(3) + 1) as usize
} else {
// Already non-aligned, can add 0-3
rng.range(4) as usize
};
if !is_valid_secure_payload_len(data.len()) {
return Err(Error::new(
ErrorKind::InvalidData,
format!("secure payload must be 4-byte aligned, got {}", data.len()),
));
}
// Generate padding that keeps total length non-divisible by 4.
let padding_len = secure_padding_len(data.len(), rng);
let total_len = data.len() + padding_len;
dst.reserve(4 + total_len);
@@ -625,4 +628,4 @@ mod tests {
let result = codec.decode(&mut buf);
assert!(result.is_err());
}
}
}

View File

@@ -232,11 +232,13 @@ impl<R: AsyncRead + Unpin> SecureIntermediateFrameReader<R> {
let mut data = vec![0u8; len];
self.upstream.read_exact(&mut data).await?;
// Strip padding (not aligned to 4)
if len % 4 != 0 {
let actual_len = len - (len % 4);
data.truncate(actual_len);
}
let payload_len = secure_payload_len_from_wire_len(len).ok_or_else(|| {
Error::new(
ErrorKind::InvalidData,
format!("Invalid secure frame length: {len}"),
)
})?;
data.truncate(payload_len);
Ok((Bytes::from(data), meta))
}
@@ -267,8 +269,15 @@ impl<W: AsyncWrite + Unpin> SecureIntermediateFrameWriter<W> {
return Ok(());
}
// Add random padding (0-3 bytes)
let padding_len = self.rng.range(4);
if !is_valid_secure_payload_len(data.len()) {
return Err(Error::new(
ErrorKind::InvalidData,
format!("Secure payload must be 4-byte aligned, got {}", data.len()),
));
}
// Add padding so total length is never divisible by 4 (MTProto Secure)
let padding_len = secure_padding_len(data.len(), &self.rng);
let padding = self.rng.bytes(padding_len);
let total_len = data.len() + padding_len;
@@ -550,9 +559,7 @@ mod tests {
writer.flush().await.unwrap();
let (received, _meta) = reader.read_frame().await.unwrap();
// Received should have padding stripped to align to 4
let expected_len = (data.len() / 4) * 4;
assert_eq!(received.len(), expected_len);
assert_eq!(received.len(), data.len());
}
#[tokio::test]
@@ -585,4 +592,4 @@ mod tests {
let (received, _) = reader.read_frame().await.unwrap();
assert_eq!(&received[..], &data[..]);
}
}
}

View File

@@ -40,4 +40,4 @@ pub use frame_stream::{
SecureIntermediateFrameReader, SecureIntermediateFrameWriter,
MtprotoFrameReader, MtprotoFrameWriter,
FrameReaderKind, FrameWriterKind,
};
};

View File

@@ -25,7 +25,8 @@
//! - However, the on-the-wire record length can exceed 16384 because TLS 1.3
//! uses AEAD and can include tag/overhead/padding.
//! - Telegram FakeTLS clients (notably iOS) may send Application Data records
//! with length up to 16384 + 24 bytes. We accept that as MAX_TLS_CHUNK_SIZE.
//! with length up to 16384 + 256 bytes (RFC 8446 §5.2). We accept that as
//! MAX_TLS_CHUNK_SIZE.
//!
//! If you reject those (e.g. validate length <= 16384), you will see errors like:
//! "TLS record too large: 16408 bytes"
@@ -52,9 +53,8 @@ use super::state::{StreamState, HeaderBuffer, YieldBuffer, WriteBuffer};
const TLS_HEADER_SIZE: usize = 5;
/// Maximum TLS fragment size we emit for Application Data.
/// Real TLS 1.3 ciphertexts often add ~16-24 bytes AEAD overhead, so to mimic
/// on-the-wire record sizes we allow up to 16384 + 24 bytes of plaintext.
const MAX_TLS_PAYLOAD: usize = 16384 + 24;
/// Real TLS 1.3 allows up to 16384 + 256 bytes of ciphertext (incl. tag).
const MAX_TLS_PAYLOAD: usize = 16384 + 256;
/// Maximum pending write buffer for one record remainder.
/// Note: we never queue unlimited amount of data here; state holds at most one record.
@@ -91,7 +91,7 @@ impl TlsRecordHeader {
/// - We accept TLS 1.0 header version for ClientHello-like records (0x03 0x01),
/// and TLS 1.2/1.3 style version bytes for the rest (we use TLS_VERSION = 0x03 0x03).
/// - For Application Data, Telegram FakeTLS may send payload length up to
/// MAX_TLS_CHUNK_SIZE (16384 + 24).
/// MAX_TLS_CHUNK_SIZE (16384 + 256).
/// - For other record types we keep stricter bounds to avoid memory abuse.
fn validate(&self) -> Result<()> {
// Version: accept TLS 1.0 header (ClientHello quirk) and TLS_VERSION (0x0303).
@@ -105,7 +105,7 @@ impl TlsRecordHeader {
let len = self.length as usize;
// Length checks depend on record type.
// Telegram FakeTLS: ApplicationData length may be 16384 + 24.
// Telegram FakeTLS: ApplicationData length may be 16384 + 256.
match self.record_type {
TLS_RECORD_APPLICATION => {
if len > MAX_TLS_CHUNK_SIZE {
@@ -755,9 +755,6 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for FakeTlsWriter<W> {
payload_size: chunk_size,
};
// Wake to retry flushing soon.
cx.waker().wake_by_ref();
Poll::Ready(Ok(chunk_size))
}
}

View File

@@ -72,7 +72,27 @@ impl TlsFrontCache {
continue;
}
if let Ok(data) = tokio::fs::read(entry.path()).await {
if let Ok(cached) = serde_json::from_slice::<CachedTlsData>(&data) {
if let Ok(mut cached) = serde_json::from_slice::<CachedTlsData>(&data) {
if cached.domain.is_empty()
|| cached.domain.len() > 255
|| !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
{
warn!(file = %name, "Skipping TLS cache entry with invalid domain");
continue;
}
// fetched_at is skipped during deserialization; approximate with file mtime if available.
if let Ok(meta) = entry.metadata().await {
if let Ok(modified) = meta.modified() {
cached.fetched_at = modified;
}
}
// Drop entries older than 72h
if let Ok(age) = cached.fetched_at.elapsed() {
if age > Duration::from_secs(72 * 3600) {
warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)");
continue;
}
}
let domain = cached.domain.clone();
self.set(&domain, cached).await;
loaded += 1;

View File

@@ -34,6 +34,8 @@ pub fn build_emulated_server_hello(
session_id: &[u8],
cached: &CachedTlsData,
rng: &SecureRandom,
alpn: Option<Vec<u8>>,
new_session_tickets: u8,
) -> Vec<u8> {
// --- ServerHello ---
let mut extensions = Vec::new();
@@ -48,6 +50,15 @@ pub fn build_emulated_server_hello(
extensions.extend_from_slice(&0x002bu16.to_be_bytes());
extensions.extend_from_slice(&(2u16).to_be_bytes());
extensions.extend_from_slice(&0x0304u16.to_be_bytes());
if let Some(alpn_proto) = &alpn {
extensions.extend_from_slice(&0x0010u16.to_be_bytes());
let list_len: u16 = 1 + alpn_proto.len() as u16;
let ext_len: u16 = 2 + list_len;
extensions.extend_from_slice(&ext_len.to_be_bytes());
extensions.extend_from_slice(&list_len.to_be_bytes());
extensions.push(alpn_proto.len() as u8);
extensions.extend_from_slice(alpn_proto);
}
let extensions_len = extensions.len() as u16;
@@ -118,10 +129,25 @@ pub fn build_emulated_server_hello(
}
// --- Combine ---
let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len());
// Optional NewSessionTicket mimic records (opaque ApplicationData for fingerprint).
let mut tickets = Vec::new();
if new_session_tickets > 0 {
for _ in 0..new_session_tickets {
let ticket_len: usize = rng.range(48) + 48;
let mut rec = Vec::with_capacity(5 + ticket_len);
rec.push(TLS_RECORD_APPLICATION);
rec.extend_from_slice(&TLS_VERSION);
rec.extend_from_slice(&(ticket_len as u16).to_be_bytes());
rec.extend_from_slice(&rng.bytes(ticket_len));
tickets.extend_from_slice(&rec);
}
}
let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len() + tickets.len());
response.extend_from_slice(&server_hello);
response.extend_from_slice(&change_cipher_spec);
response.extend_from_slice(&app_data);
response.extend_from_slice(&tickets);
// --- HMAC ---
let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len());

View File

@@ -1,6 +1,6 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::crypto::{AesCbc, crc32};
use crate::crypto::{AesCbc, crc32, crc32c};
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
@@ -8,17 +8,46 @@ use crate::protocol::constants::*;
pub(crate) enum WriterCommand {
Data(Vec<u8>),
DataAndFlush(Vec<u8>),
Keepalive,
Close,
}
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RpcChecksumMode {
Crc32,
Crc32c,
}
impl RpcChecksumMode {
pub(crate) fn from_handshake_flags(flags: u32) -> Self {
if (flags & rpc_crypto_flags::USE_CRC32C) != 0 {
Self::Crc32c
} else {
Self::Crc32
}
}
pub(crate) fn advertised_flags(self) -> u32 {
match self {
Self::Crc32 => 0,
Self::Crc32c => rpc_crypto_flags::USE_CRC32C,
}
}
}
pub(crate) fn rpc_crc(mode: RpcChecksumMode, data: &[u8]) -> u32 {
match mode {
RpcChecksumMode::Crc32 => crc32(data),
RpcChecksumMode::Crc32c => crc32c(data),
}
}
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> {
let total_len = (4 + 4 + payload.len() + 4) as u32;
let mut frame = Vec::with_capacity(total_len as usize);
frame.extend_from_slice(&total_len.to_le_bytes());
frame.extend_from_slice(&seq_no.to_le_bytes());
frame.extend_from_slice(payload);
let c = crc32(&frame);
let c = rpc_crc(crc_mode, &frame);
frame.extend_from_slice(&c.to_le_bytes());
frame
}
@@ -45,7 +74,7 @@ pub(crate) async fn read_rpc_frame_plaintext(
let crc_offset = total_len - 4;
let expected_crc = u32::from_le_bytes(full[crc_offset..crc_offset + 4].try_into().unwrap());
let actual_crc = crc32(&full[..crc_offset]);
let actual_crc = rpc_crc(RpcChecksumMode::Crc32, &full[..crc_offset]);
if expected_crc != actual_crc {
return Err(ProxyError::InvalidHandshake(format!(
"CRC mismatch: 0x{expected_crc:08x} vs 0x{actual_crc:08x}"
@@ -95,24 +124,52 @@ pub(crate) fn build_handshake_payload(
our_port: u16,
peer_ip: [u8; 4],
peer_port: u16,
flags: u32,
) -> [u8; 32] {
let mut p = [0u8; 32];
p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes());
p[4..8].copy_from_slice(&flags.to_le_bytes());
// Keep C memory layout compatibility for PID IPv4 bytes.
// process_id sender_pid
p[8..12].copy_from_slice(&our_ip);
p[12..14].copy_from_slice(&our_port.to_le_bytes());
let pid = (std::process::id() & 0xffff) as u16;
p[14..16].copy_from_slice(&pid.to_le_bytes());
p[14..16].copy_from_slice(&process_pid16().to_le_bytes());
p[16..20].copy_from_slice(&process_utime().to_le_bytes());
// process_id peer_pid
p[20..24].copy_from_slice(&peer_ip);
p[24..26].copy_from_slice(&peer_port.to_le_bytes());
p[26..28].copy_from_slice(&0u16.to_le_bytes());
p[28..32].copy_from_slice(&0u32.to_le_bytes());
p
}
pub(crate) fn parse_handshake_flags(payload: &[u8]) -> Result<u32> {
if payload.len() != 32 {
return Err(ProxyError::InvalidHandshake(format!(
"Bad handshake payload len: {}",
payload.len()
)));
}
let hs_type = u32::from_le_bytes(payload[0..4].try_into().unwrap());
if hs_type != RPC_HANDSHAKE_U32 {
return Err(ProxyError::InvalidHandshake(format!(
"Expected HANDSHAKE 0x{RPC_HANDSHAKE_U32:08x}, got 0x{hs_type:08x}"
)));
}
Ok(u32::from_le_bytes(payload[4..8].try_into().unwrap()))
}
fn process_pid16() -> u16 {
(std::process::id() & 0xffff) as u16
}
fn process_utime() -> u32 {
let utime = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32;
p[16..20].copy_from_slice(&utime.to_le_bytes());
p[20..24].copy_from_slice(&peer_ip);
p[24..26].copy_from_slice(&peer_port.to_le_bytes());
p
utime
}
pub(crate) fn cbc_encrypt_padded(
@@ -160,12 +217,13 @@ pub(crate) struct RpcWriter {
pub(crate) key: [u8; 32],
pub(crate) iv: [u8; 16],
pub(crate) seq_no: i32,
pub(crate) crc_mode: RpcChecksumMode,
}
impl RpcWriter {
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
let frame = build_rpc_frame(self.seq_no, payload);
self.seq_no += 1;
let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode);
self.seq_no = self.seq_no.wrapping_add(1);
let pad = (16 - (frame.len() % 16)) % 16;
let mut buf = frame;
@@ -189,12 +247,4 @@ impl RpcWriter {
self.send(payload).await?;
self.writer.flush().await.map_err(ProxyError::Io)
}
pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> {
// Keepalive is a frame with fl == 4 and 4 bytes payload.
let mut frame = Vec::with_capacity(8);
frame.extend_from_slice(&4u32.to_le_bytes());
frame.extend_from_slice(&payload);
self.send(&frame).await
}
}

View File

@@ -18,13 +18,14 @@ use crate::crypto::{SecureRandom, build_middleproxy_prekey, derive_middleproxy_k
use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
use crate::protocol::constants::{
ME_CONNECT_TIMEOUT_SECS, ME_HANDSHAKE_TIMEOUT_SECS, RPC_CRYPTO_AES_U32, RPC_HANDSHAKE_ERROR_U32,
RPC_HANDSHAKE_U32, RPC_PING_U32, RPC_PONG_U32, RPC_NONCE_U32,
ME_CONNECT_TIMEOUT_SECS, ME_HANDSHAKE_TIMEOUT_SECS, RPC_CRYPTO_AES_U32,
RPC_HANDSHAKE_ERROR_U32, rpc_crypto_flags,
};
use super::codec::{
build_handshake_payload, build_nonce_payload, build_rpc_frame, cbc_decrypt_inplace,
cbc_encrypt_padded, parse_nonce_payload, read_rpc_frame_plaintext,
RpcChecksumMode, build_handshake_payload, build_nonce_payload, build_rpc_frame,
cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload,
read_rpc_frame_plaintext, rpc_crc,
};
use super::wire::{extract_ip_material, IpMaterial};
use super::MePool;
@@ -37,6 +38,7 @@ pub(crate) struct HandshakeOutput {
pub read_iv: [u8; 16],
pub write_key: [u8; 32],
pub write_iv: [u8; 16],
pub crc_mode: RpcChecksumMode,
pub handshake_ms: f64,
}
@@ -146,7 +148,7 @@ impl MePool {
let ks = self.key_selector().await;
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
let nonce_frame = build_rpc_frame(-2, &nonce_payload);
let nonce_frame = build_rpc_frame(-2, &nonce_payload, RpcChecksumMode::Crc32);
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
debug!(
key_selector = format_args!("0x{ks:08x}"),
@@ -284,8 +286,15 @@ impl MePool {
srv_v6_opt.as_ref(),
);
let hs_payload = build_handshake_payload(hs_our_ip, local_addr.port(), hs_peer_ip, peer_addr.port());
let hs_frame = build_rpc_frame(-1, &hs_payload);
let requested_crc_mode = RpcChecksumMode::Crc32c;
let hs_payload = build_handshake_payload(
hs_our_ip,
local_addr.port(),
hs_peer_ip,
peer_addr.port(),
requested_crc_mode.advertised_flags(),
);
let hs_frame = build_rpc_frame(-1, &hs_payload, RpcChecksumMode::Crc32);
if diag_level >= 1 {
info!(
write_key = %hex_dump(&wk),
@@ -314,7 +323,7 @@ impl MePool {
);
}
let (encrypted_hs, mut write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
if diag_level >= 1 {
info!(
hs_cipher = %hex_dump(&encrypted_hs),
@@ -328,6 +337,7 @@ impl MePool {
let mut enc_buf = BytesMut::with_capacity(256);
let mut dec_buf = BytesMut::with_capacity(256);
let mut read_iv = ri;
let mut negotiated_crc_mode = RpcChecksumMode::Crc32;
let mut handshake_ok = false;
while Instant::now() < deadline && !handshake_ok {
@@ -375,17 +385,23 @@ impl MePool {
let frame = dec_buf.split_to(fl);
let pe = fl - 4;
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
let ac = crate::crypto::crc32(&frame[..pe]);
let ac = rpc_crc(RpcChecksumMode::Crc32, &frame[..pe]);
if ec != ac {
return Err(ProxyError::InvalidHandshake(format!(
"HS CRC mismatch: 0x{ec:08x} vs 0x{ac:08x}"
)));
}
let hs_type = u32::from_le_bytes(frame[8..12].try_into().unwrap());
let hs_payload = &frame[8..pe];
if hs_payload.len() < 4 {
return Err(ProxyError::InvalidHandshake(
"Handshake payload too short".to_string(),
));
}
let hs_type = u32::from_le_bytes(hs_payload[0..4].try_into().unwrap());
if hs_type == RPC_HANDSHAKE_ERROR_U32 {
let err_code = if frame.len() >= 16 {
i32::from_le_bytes(frame[12..16].try_into().unwrap())
let err_code = if hs_payload.len() >= 8 {
i32::from_le_bytes(hs_payload[4..8].try_into().unwrap())
} else {
-1
};
@@ -393,11 +409,21 @@ impl MePool {
"ME rejected handshake (error={err_code})"
)));
}
if hs_type != RPC_HANDSHAKE_U32 {
let hs_flags = parse_handshake_flags(hs_payload)?;
if hs_flags & 0xff != 0 {
return Err(ProxyError::InvalidHandshake(format!(
"Expected HANDSHAKE 0x{RPC_HANDSHAKE_U32:08x}, got 0x{hs_type:08x}"
"Unsupported handshake flags: 0x{hs_flags:08x}"
)));
}
negotiated_crc_mode = if (hs_flags & requested_crc_mode.advertised_flags()) != 0 {
RpcChecksumMode::from_handshake_flags(hs_flags)
} else if (hs_flags & rpc_crypto_flags::USE_CRC32C) != 0 {
return Err(ProxyError::InvalidHandshake(format!(
"Peer negotiated unsupported CRC flags: 0x{hs_flags:08x}"
)));
} else {
RpcChecksumMode::Crc32
};
handshake_ok = true;
break;
@@ -418,6 +444,7 @@ impl MePool {
read_iv,
write_key: wk,
write_iv,
crc_mode: negotiated_crc_mode,
handshake_ms,
})
}

View File

@@ -17,13 +17,11 @@ use crate::network::IpFamily;
use crate::protocol::constants::*;
use super::ConnRegistry;
use super::registry::{BoundConn, ConnMeta};
use super::registry::BoundConn;
use super::codec::{RpcWriter, WriterCommand};
use super::reader::reader_loop;
use super::MeResponse;
const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4;
#[derive(Clone)]
pub struct MeWriter {
@@ -361,7 +359,6 @@ impl MePool {
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
if self.me_warmup_stagger_enabled {
let mut delay_ms = 0u64;
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
@@ -369,7 +366,7 @@ impl MePool {
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter);
let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter;
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
@@ -418,14 +415,12 @@ impl MePool {
let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let tx_for_keepalive = tx.clone();
let keepalive_random = self.me_keepalive_payload_random;
let stats = self.stats.clone();
let mut rpc_writer = RpcWriter {
writer: hs.wr,
key: hs.write_key,
iv: hs.write_iv,
seq_no: 0,
crc_mode: hs.crc_mode,
};
let cancel_wr = cancel.clone();
tokio::spawn(async move {
@@ -439,21 +434,6 @@ impl MePool {
Some(WriterCommand::DataAndFlush(payload)) => {
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
}
Some(WriterCommand::Keepalive) => {
let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN];
if keepalive_random {
rand::rng().fill(&mut payload);
}
match rpc_writer.send_keepalive(payload).await {
Ok(()) => {
stats.increment_me_keepalive_sent();
}
Err(_) => {
stats.increment_me_keepalive_failed();
break;
}
}
}
Some(WriterCommand::Close) | None => break,
}
}
@@ -471,12 +451,15 @@ impl MePool {
};
self.writers.write().await.push(writer.clone());
self.conn_count.fetch_add(1, Ordering::Relaxed);
self.writer_available.notify_waiters();
self.writer_available.notify_one();
let reg = self.registry.clone();
let writers_arc = self.writers_arc();
let ping_tracker = self.ping_tracker.clone();
let ping_tracker_reader = ping_tracker.clone();
let rtt_stats = self.rtt_stats.clone();
let stats_reader = self.stats.clone();
let stats_ping = self.stats.clone();
let pool = Arc::downgrade(self);
let cancel_ping = cancel.clone();
let tx_ping = tx.clone();
@@ -489,19 +472,20 @@ impl MePool {
let keepalive_jitter = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
let cancel_keepalive_token = cancel.clone();
tokio::spawn(async move {
let res = reader_loop(
hs.rd,
hs.read_key,
hs.read_iv,
hs.crc_mode,
reg.clone(),
BytesMut::new(),
BytesMut::new(),
tx.clone(),
ping_tracker.clone(),
ping_tracker_reader,
rtt_stats.clone(),
stats_reader,
writer_id,
degraded.clone(),
cancel_reader_token.clone(),
@@ -526,15 +510,40 @@ impl MePool {
let pool_ping = Arc::downgrade(self);
tokio::spawn(async move {
let mut ping_id: i64 = rand::random::<i64>();
loop {
// Per-writer jittered start to avoid phase sync.
let startup_jitter = if keepalive_enabled {
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else {
let jitter = rand::rng()
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(wait)
};
tokio::select! {
_ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(startup_jitter) => {}
}
loop {
let wait = if keepalive_enabled {
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
keepalive_interval
+ Duration::from_millis(
rand::rng().random_range(0..=effective_jitter_ms as u64)
)
} else {
let jitter = rand::rng()
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(secs)
};
tokio::select! {
_ = cancel_ping_token.cancelled() => {
break;
}
_ = tokio::time::sleep(Duration::from_secs(wait)) => {}
_ = tokio::time::sleep(wait) => {}
}
let sent_id = ping_id;
let mut p = Vec::with_capacity(12);
@@ -542,12 +551,19 @@ impl MePool {
p.extend_from_slice(&sent_id.to_le_bytes());
{
let mut tracker = ping_tracker_ping.lock().await;
let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
ping_id = ping_id.wrapping_add(1);
stats_ping.increment_me_keepalive_sent();
if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!("Active ME ping failed, removing dead writer");
stats_ping.increment_me_keepalive_failed();
debug!("ME ping failed, removing dead writer");
cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade() {
if cleanup_for_ping
@@ -562,25 +578,6 @@ impl MePool {
}
});
if keepalive_enabled {
let tx_keepalive = tx_for_keepalive;
let cancel_keepalive = cancel_keepalive_token;
tokio::spawn(async move {
// Per-writer jittered start to avoid phase sync.
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
loop {
tokio::select! {
_ = cancel_keepalive.cancelled() => break,
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
}
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
break;
}
}
});
}
Ok(())
}
@@ -617,15 +614,19 @@ impl MePool {
}
async fn remove_writer_only(&self, writer_id: u64) -> Vec<BoundConn> {
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos);
w.cancel.cancel();
let _ = w.tx.send(WriterCommand::Close).await;
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
}
}
if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await;
}
self.rtt_stats.lock().await.remove(&writer_id);
self.registry.writer_lost(writer_id).await
}

View File

@@ -10,31 +10,33 @@ use tokio::sync::{Mutex, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use crate::crypto::{AesCbc, crc32};
use crate::crypto::AesCbc;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::stats::Stats;
use super::codec::WriterCommand;
use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
use super::registry::RouteResult;
use super::{ConnRegistry, MeResponse};
pub(crate) async fn reader_loop(
mut rd: tokio::io::ReadHalf<TcpStream>,
dk: [u8; 32],
mut div: [u8; 16],
crc_mode: RpcChecksumMode,
reg: Arc<ConnRegistry>,
enc_leftover: BytesMut,
mut dec: BytesMut,
tx: mpsc::Sender<WriterCommand>,
ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>,
rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
stats: Arc<Stats>,
_writer_id: u64,
degraded: Arc<AtomicBool>,
cancel: CancellationToken,
) -> Result<()> {
let mut raw = enc_leftover;
let mut expected_seq: i32 = 0;
let mut crc_errors = 0u32;
let mut seq_mismatch = 0u32;
loop {
let mut tmp = [0u8; 16_384];
@@ -80,26 +82,28 @@ pub(crate) async fn reader_loop(
let frame = dec.split_to(fl);
let pe = fl - 4;
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
if crc32(&frame[..pe]) != ec {
warn!("CRC mismatch in data frame");
crc_errors += 1;
if crc_errors > 3 {
return Err(ProxyError::Proxy("Too many CRC mismatches".into()));
}
continue;
let actual_crc = rpc_crc(crc_mode, &frame[..pe]);
if actual_crc != ec {
stats.increment_me_crc_mismatch();
warn!(
frame_len = fl,
expected_crc = format_args!("0x{ec:08x}"),
actual_crc = format_args!("0x{actual_crc:08x}"),
"CRC mismatch — CBC crypto desync, aborting ME connection"
);
return Err(ProxyError::Proxy("CRC mismatch (crypto desync)".into()));
}
let seq_no = i32::from_le_bytes(frame[4..8].try_into().unwrap());
if seq_no != expected_seq {
stats.increment_me_seq_mismatch();
warn!(seq_no, expected = expected_seq, "ME RPC seq mismatch");
seq_mismatch += 1;
if seq_mismatch > 10 {
return Err(ProxyError::Proxy("Too many seq mismatches".into()));
}
expected_seq = seq_no.wrapping_add(1);
} else {
expected_seq = expected_seq.wrapping_add(1);
return Err(ProxyError::SeqNoMismatch {
expected: expected_seq,
got: seq_no,
});
}
expected_seq = expected_seq.wrapping_add(1);
let payload = &frame[8..pe];
if payload.len() < 4 {
@@ -116,7 +120,13 @@ pub(crate) async fn reader_loop(
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
if !routed {
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(),
RouteResult::Routed => {}
}
reg.unregister(cid).await;
send_close_conn(&tx, cid).await;
}
@@ -126,7 +136,13 @@ pub(crate) async fn reader_loop(
trace!(cid, cfm, "RPC_SIMPLE_ACK");
let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
if !routed {
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(),
RouteResult::Routed => {}
}
reg.unregister(cid).await;
send_close_conn(&tx, cid).await;
}
@@ -152,6 +168,7 @@ pub(crate) async fn reader_loop(
}
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
stats.increment_me_keepalive_pong();
if let Some((sent, wid)) = {
let mut guard = ping_tracker.lock().await;
guard.remove(&ping_id)

View File

@@ -1,13 +1,25 @@
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc::error::TrySendError;
use super::codec::WriterCommand;
use super::MeResponse;
const ROUTE_CHANNEL_CAPACITY: usize = 4096;
const ROUTE_BACKPRESSURE_TIMEOUT: Duration = Duration::from_millis(25);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteResult {
Routed,
NoConn,
ChannelClosed,
QueueFull,
}
#[derive(Clone)]
pub struct ConnMeta {
pub target_dc: i16,
@@ -64,7 +76,7 @@ impl ConnRegistry {
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(1024);
let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY);
self.inner.write().await.map.insert(id, tx);
(id, rx)
}
@@ -83,12 +95,27 @@ impl ConnRegistry {
None
}
pub async fn route(&self, id: u64, resp: MeResponse) -> bool {
let inner = self.inner.read().await;
if let Some(tx) = inner.map.get(&id) {
tx.try_send(resp).is_ok()
} else {
false
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
let tx = {
let inner = self.inner.read().await;
inner.map.get(&id).cloned()
};
let Some(tx) = tx else {
return RouteResult::NoConn;
};
match tx.try_send(resp) {
Ok(()) => RouteResult::Routed,
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
Err(TrySendError::Full(resp)) => {
// Absorb short bursts without dropping/closing the session immediately.
match tokio::time::timeout(ROUTE_BACKPRESSURE_TIMEOUT, tx.send(resp)).await {
Ok(Ok(())) => RouteResult::Routed,
Ok(Err(_)) => RouteResult::ChannelClosed,
Err(_) => RouteResult::QueueFull,
}
}
}
}

View File

@@ -62,6 +62,8 @@ impl MePool {
let mut writers_snapshot = {
let ws = self.writers.read().await;
if ws.is_empty() {
// Create waiter before recovery attempts so notify_one permits are not missed.
let waiter = self.writer_available.notified();
drop(ws);
for family in self.family_order() {
let map = match family {
@@ -72,13 +74,19 @@ impl MePool {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
self.writer_available.notify_waiters();
self.writer_available.notify_one();
break;
}
}
}
}
if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() {
if !self.writers.read().await.is_empty() {
continue;
}
if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() {
if !self.writers.read().await.is_empty() {
continue;
}
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
}
continue;

View File

@@ -1,5 +1,7 @@
//! TCP Socket Configuration
use std::collections::HashSet;
use std::fs;
use std::io::Result;
use std::net::{SocketAddr, IpAddr};
use std::time::Duration;
@@ -234,6 +236,133 @@ pub fn create_listener(addr: SocketAddr, options: &ListenOptions) -> Result<Sock
Ok(socket)
}
/// Best-effort process list for listeners occupying the same local TCP port.
#[derive(Debug, Clone)]
pub struct ListenerProcessInfo {
pub pid: u32,
pub process: String,
}
/// Find processes currently listening on the local TCP port of `addr`.
/// Returns an empty list when unsupported or when no owners can be resolved.
pub fn find_listener_processes(addr: SocketAddr) -> Vec<ListenerProcessInfo> {
#[cfg(target_os = "linux")]
{
find_listener_processes_linux(addr)
}
#[cfg(not(target_os = "linux"))]
{
let _ = addr;
Vec::new()
}
}
#[cfg(target_os = "linux")]
fn find_listener_processes_linux(addr: SocketAddr) -> Vec<ListenerProcessInfo> {
let inodes = listening_inodes_for_port(addr);
if inodes.is_empty() {
return Vec::new();
}
let mut out = Vec::new();
let proc_entries = match fs::read_dir("/proc") {
Ok(entries) => entries,
Err(_) => return out,
};
for entry in proc_entries.flatten() {
let pid = match entry.file_name().to_string_lossy().parse::<u32>() {
Ok(pid) => pid,
Err(_) => continue,
};
let fd_dir = entry.path().join("fd");
let fd_entries = match fs::read_dir(fd_dir) {
Ok(entries) => entries,
Err(_) => continue,
};
let mut matched = false;
for fd in fd_entries.flatten() {
let link_target = match fs::read_link(fd.path()) {
Ok(link) => link,
Err(_) => continue,
};
let link_str = link_target.to_string_lossy();
let Some(rest) = link_str.strip_prefix("socket:[") else {
continue;
};
let Some(inode_str) = rest.strip_suffix(']') else {
continue;
};
let Ok(inode) = inode_str.parse::<u64>() else {
continue;
};
if inodes.contains(&inode) {
matched = true;
break;
}
}
if matched {
let process = fs::read_to_string(entry.path().join("comm"))
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "unknown".to_string());
out.push(ListenerProcessInfo { pid, process });
}
}
out.sort_by_key(|p| p.pid);
out.dedup_by_key(|p| p.pid);
out
}
#[cfg(target_os = "linux")]
fn listening_inodes_for_port(addr: SocketAddr) -> HashSet<u64> {
let path = match addr {
SocketAddr::V4(_) => "/proc/net/tcp",
SocketAddr::V6(_) => "/proc/net/tcp6",
};
let mut inodes = HashSet::new();
let Ok(data) = fs::read_to_string(path) else {
return inodes;
};
for line in data.lines().skip(1) {
let cols: Vec<&str> = line.split_whitespace().collect();
if cols.len() < 10 {
continue;
}
// LISTEN state in /proc/net/tcp*
if cols[3] != "0A" {
continue;
}
let Some(port_hex) = cols[1].split(':').nth(1) else {
continue;
};
let Ok(port) = u16::from_str_radix(port_hex, 16) else {
continue;
};
if port != addr.port() {
continue;
}
if let Ok(inode) = cols[9].parse::<u64>() {
inodes.insert(inode);
}
}
inodes
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -24,6 +24,8 @@ const NUM_DCS: usize = 5;
/// Timeout for individual DC ping attempt
const DC_PING_TIMEOUT_SECS: u64 = 5;
/// Timeout for direct TG DC TCP connect readiness.
const DIRECT_CONNECT_TIMEOUT_SECS: u64 = 10;
// ============= RTT Tracking =============
@@ -375,7 +377,16 @@ impl UpstreamManager {
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
stream.writable().await?;
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
match tokio::time::timeout(connect_timeout, stream.writable()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
}
}
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
@@ -383,67 +394,128 @@ impl UpstreamManager {
Ok(stream)
},
UpstreamType::Socks4 { address, interface, user_id } => {
let proxy_addr: SocketAddr = address.parse()
.map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
let std_stream: std::net::TcpStream = socket.into();
let mut stream = TcpStream::from_std(std_stream)?;
match tokio::time::timeout(connect_timeout, stream.writable()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: proxy_addr.to_string(),
});
}
}
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
stream
} else {
// Hostname:port format - use tokio DNS resolution
// Note: interface binding is not supported for hostnames
if interface.is_some() {
warn!("SOCKS4 interface binding is not supported for hostname addresses, ignoring");
}
match tokio::time::timeout(connect_timeout, TcpStream::connect(address)).await {
Ok(Ok(stream)) => stream,
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: address.clone(),
});
}
}
};
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
// replace socks user_id with config.selected_scope, if set
let scope: Option<&str> = Some(config.selected_scope.as_str())
.filter(|s| !s.is_empty());
let _user_id: Option<&str> = scope.or(user_id.as_deref());
connect_socks4(&mut stream, target, _user_id).await?;
match tokio::time::timeout(connect_timeout, connect_socks4(&mut stream, target, _user_id)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
}
}
Ok(stream)
},
UpstreamType::Socks5 { address, interface, username, password } => {
let proxy_addr: SocketAddr = address.parse()
.map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let bind_ip = Self::resolve_bind_address(
interface,
&None,
proxy_addr,
bind_rr.as_deref(),
);
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
let std_stream: std::net::TcpStream = socket.into();
let mut stream = TcpStream::from_std(std_stream)?;
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
match tokio::time::timeout(connect_timeout, stream.writable()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: proxy_addr.to_string(),
});
}
}
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
stream
} else {
// Hostname:port format - use tokio DNS resolution
// Note: interface binding is not supported for hostnames
if interface.is_some() {
warn!("SOCKS5 interface binding is not supported for hostname addresses, ignoring");
}
match tokio::time::timeout(connect_timeout, TcpStream::connect(address)).await {
Ok(Ok(stream)) => stream,
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: address.clone(),
});
}
}
};
debug!(config = ?config, "Socks5 connection");
// replace socks user:pass with config.selected_scope, if set
@@ -452,7 +524,20 @@ impl UpstreamManager {
let _username: Option<&str> = scope.or(username.as_deref());
let _password: Option<&str> = scope.or(password.as_deref());
connect_socks5(&mut stream, target, _username, _password).await?;
match tokio::time::timeout(
connect_timeout,
connect_socks5(&mut stream, target, _username, _password),
)
.await
{
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
}
}
Ok(stream)
},
}