mirror of
https://github.com/telemt/telemt.git
synced 2026-06-30 06:41:11 +03:00
Compare commits
23 Commits
3.3.34
...
14804d4f74
| Author | SHA1 | Date | |
|---|---|---|---|
| 14804d4f74 | |||
| 24223914ed | |||
| f6704d7d65 | |||
| 3d20002e56 | |||
| 8fcd0fa950 | |||
| 645e968778 | |||
| b46216d357 | |||
| 8ac1a0017d | |||
| 3df274caa6 | |||
| 780546a680 | |||
| 729ffa0fcd | |||
| e594d6f079 | |||
| ecd6a19246 | |||
| 2df6b8704d | |||
| 5f5a046710 | |||
| 2dc81ad0e0 | |||
| d8d8534cf8 | |||
| f3e9d00132 | |||
| dee6e13fef | |||
| cba837745b | |||
| 876c8f1612 | |||
| ac8ad864be | |||
| fe56dc7c1a |
+72
-9
@@ -1,19 +1,82 @@
|
|||||||
# Issues - Rules
|
# Issues
|
||||||
|
## Warnung
|
||||||
|
Before opening Issue, if it is more question than problem or bug - ask about that [in our chat](https://t.me/telemtrs)
|
||||||
|
|
||||||
## What it is not
|
## What it is not
|
||||||
- NOT Question and Answer
|
- NOT Question and Answer
|
||||||
- NOT Helpdesk
|
- NOT Helpdesk
|
||||||
|
|
||||||
# Pull Requests - Rules
|
***Each of your Issues triggers attempts to reproduce problems and analyze them, which are done manually by people***
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
# Pull Requests
|
||||||
|
|
||||||
## General
|
## General
|
||||||
- ONLY signed and verified commits
|
- ONLY signed and verified commits
|
||||||
- ONLY from your name
|
- ONLY from your name
|
||||||
- DO NOT commit with `codex` or `claude` as author/commiter
|
- DO NOT commit with `codex`, `claude`, or other AI tools as author/committer
|
||||||
- PREFER `flow` branch for development, not `main`
|
- PREFER `flow` branch for development, not `main`
|
||||||
|
|
||||||
## AI
|
---
|
||||||
We are not against modern tools, like AI, where you act as a principal or architect, but we consider it important:
|
|
||||||
|
|
||||||
- you really understand what you're doing
|
## Definition of Ready (MANDATORY)
|
||||||
- you understand the relationships and dependencies of the components being modified
|
|
||||||
- you understand the architecture of Telegram MTProto, MTProxy, Middle-End KDF at least generically
|
A Pull Request WILL be ignored or closed if:
|
||||||
- you DO NOT commit for the sake of commits, but to help the community, core-developers and ordinary users
|
|
||||||
|
- it does NOT build
|
||||||
|
- it does NOT pass tests
|
||||||
|
- it does NOT follow formatting rules
|
||||||
|
- it contains unrelated or excessive changes
|
||||||
|
- the author cannot clearly explain the change
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Blessed Principles
|
||||||
|
- PR must build
|
||||||
|
- PR must pass tests
|
||||||
|
- PR must be understood by author
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## AI Usage Policy
|
||||||
|
|
||||||
|
AI tools (Claude, ChatGPT, Codex, DeepSeek, etc.) are allowed as **assistants**, NOT as decision-makers.
|
||||||
|
|
||||||
|
By submitting a PR, you confirm that:
|
||||||
|
|
||||||
|
- you fully understand the code you submit
|
||||||
|
- you verified correctness manually
|
||||||
|
- you reviewed architecture and dependencies
|
||||||
|
- you take full responsibility for the change
|
||||||
|
|
||||||
|
AI-generated code is treated as **draft** and must be validated like any other external contribution.
|
||||||
|
|
||||||
|
PRs that look like unverified AI dumps WILL be closed
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Maintainer Policy
|
||||||
|
|
||||||
|
Maintainers reserve the right to:
|
||||||
|
|
||||||
|
- close PRs that do not meet basic quality requirements
|
||||||
|
- request explanations before review
|
||||||
|
- ignore low-effort contributions
|
||||||
|
|
||||||
|
Respect the reviewers time
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Enforcement
|
||||||
|
|
||||||
|
Pull Requests that violate project standards may be closed without review.
|
||||||
|
|
||||||
|
This includes (but is not limited to):
|
||||||
|
|
||||||
|
- non-building code
|
||||||
|
- failing tests
|
||||||
|
- unverified or low-effort changes
|
||||||
|
- inability to explain the change
|
||||||
|
|
||||||
|
These actions follow the Code of Conduct and are intended to preserve signal, quality, and Telemt's integrity
|
||||||
Generated
+1
-1
@@ -2793,7 +2793,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.3.32"
|
version = "3.3.35"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes",
|
"aes",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.3.34"
|
version = "3.3.36"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
|||||||
@@ -2,6 +2,8 @@
|
|||||||
|
|
||||||
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
|
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
|
||||||
|
|
||||||
|
### [**Telemt Chat in Telegram**](https://t.me/telemtrs)
|
||||||
|
|
||||||
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
|
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
|
||||||
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
|
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
|
||||||
- [Full-covered API w/ management](https://github.com/telemt/telemt/blob/main/docs/API.md)
|
- [Full-covered API w/ management](https://github.com/telemt/telemt/blob/main/docs/API.md)
|
||||||
@@ -9,60 +11,6 @@
|
|||||||
- Prometheus-format Metrics
|
- Prometheus-format Metrics
|
||||||
- TLS-Fronting and TCP-Splicing for masking from "prying" eyes
|
- TLS-Fronting and TCP-Splicing for masking from "prying" eyes
|
||||||
|
|
||||||
[**Telemt Chat in Telegram**](https://t.me/telemtrs)
|
|
||||||
|
|
||||||
## NEWS and EMERGENCY
|
|
||||||
### ✈️ Telemt 3 is released!
|
|
||||||
<table>
|
|
||||||
<tr>
|
|
||||||
<td width="50%" valign="top">
|
|
||||||
|
|
||||||
### 🇷🇺 RU
|
|
||||||
|
|
||||||
#### О релизах
|
|
||||||
|
|
||||||
[3.3.27](https://github.com/telemt/telemt/releases/tag/3.3.27) даёт баланс стабильности и передового функционала, а так же последние исправления по безопасности и багам
|
|
||||||
|
|
||||||
Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **API**, **статистики**, **UX**
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
Если у вас есть компетенции в:
|
|
||||||
|
|
||||||
- Асинхронных сетевых приложениях
|
|
||||||
- Анализе трафика
|
|
||||||
- Реверс-инжиниринге
|
|
||||||
- Сетевых расследованиях
|
|
||||||
|
|
||||||
Мы открыты к архитектурным предложениям, идеям и pull requests
|
|
||||||
</td>
|
|
||||||
<td width="50%" valign="top">
|
|
||||||
|
|
||||||
### 🇬🇧 EN
|
|
||||||
|
|
||||||
#### About releases
|
|
||||||
|
|
||||||
[3.3.27](https://github.com/telemt/telemt/releases/tag/3.3.27) provides a balance of stability and advanced functionality, as well as the latest security and bug fixes
|
|
||||||
|
|
||||||
We are looking forward to your feedback and improvement proposals — especially regarding **API**, **statistics**, **UX**
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
If you have expertise in:
|
|
||||||
|
|
||||||
- Asynchronous network applications
|
|
||||||
- Traffic analysis
|
|
||||||
- Reverse engineering
|
|
||||||
- Network forensics
|
|
||||||
|
|
||||||
We welcome ideas, architectural feedback, and pull requests.
|
|
||||||
</td>
|
|
||||||
</tr>
|
|
||||||
</table>
|
|
||||||
|
|
||||||
# Features
|
|
||||||
💥 The configuration structure has changed since version 1.1.0.0. change it in your environment!
|
|
||||||
|
|
||||||
⚓ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
|
⚓ Our implementation of **TLS-fronting** is one of the most deeply debugged, focused, advanced and *almost* **"behaviorally consistent to real"**: we are confident we have it right - [see evidence on our validation and traces](#recognizability-for-dpi-and-crawler)
|
||||||
|
|
||||||
⚓ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy: non dramatically, but usual
|
⚓ Our ***Middle-End Pool*** is fastest by design in standard scenarios, compared to other implementations of connecting to the Middle-End Proxy: non dramatically, but usual
|
||||||
|
|||||||
+3058
-317
File diff suppressed because it is too large
Load Diff
+54
-45
@@ -1,113 +1,122 @@
|
|||||||
## How to set up "proxy sponsor" channel and statistics via @MTProxybot bot
|
## How to set up a "proxy sponsor" channel and statistics via the @MTProxybot
|
||||||
|
|
||||||
1. Go to @MTProxybot bot.
|
1. Go to the @MTProxybot.
|
||||||
2. Enter the command `/newproxy`
|
2. Enter the `/newproxy` command.
|
||||||
3. Send the server IP and port. For example: 1.2.3.4:443
|
3. Send your server's IP address and port. For example: `1.2.3.4:443`.
|
||||||
4. Open the config `nano /etc/telemt/telemt.toml`.
|
4. Open the configuration file: `nano /etc/telemt/telemt.toml`.
|
||||||
5. Copy and send the user secret from the [access.users] section to the bot.
|
5. Copy and send the user secret from the `[access.users]` section to the bot.
|
||||||
6. Copy the tag received from the bot. For example 1234567890abcdef1234567890abcdef.
|
6. Copy the tag provided by the bot. For example: `1234567890abcdef1234567890abcdef`.
|
||||||
> [!WARNING]
|
> [!WARNING]
|
||||||
> The link provided by the bot will not work. Do not copy or use it!
|
> The link provided by the bot will not work. Do not copy or use it!
|
||||||
7. Uncomment the ad_tag parameter and enter the tag received from the bot.
|
7. Uncomment the `ad_tag` parameter and enter the tag received from the bot.
|
||||||
8. Uncomment/add the parameter `use_middle_proxy = true`.
|
8. Uncomment or add the `use_middle_proxy = true` parameter.
|
||||||
|
|
||||||
Config example:
|
Configuration example:
|
||||||
```toml
|
```toml
|
||||||
[general]
|
[general]
|
||||||
ad_tag = "1234567890abcdef1234567890abcdef"
|
ad_tag = "1234567890abcdef1234567890abcdef"
|
||||||
use_middle_proxy = true
|
use_middle_proxy = true
|
||||||
```
|
```
|
||||||
9. Save the config. Ctrl+S -> Ctrl+X.
|
9. Save the changes (in nano: Ctrl+S -> Ctrl+X).
|
||||||
10. Restart telemt `systemctl restart telemt`.
|
10. Restart the telemt service: `systemctl restart telemt`.
|
||||||
11. In the bot, send the command /myproxies and select the added server.
|
11. Send the `/myproxies` command to the bot and select the added server.
|
||||||
12. Click the "Set promotion" button.
|
12. Click the "Set promotion" button.
|
||||||
13. Send a **public link** to the channel. Private channels cannot be added!
|
13. Send a **public link** to the channel. Private channels cannot be added!
|
||||||
14. Wait approximately 1 hour for the information to update on Telegram servers.
|
14. Wait for about 1 hour for the information to update on Telegram servers.
|
||||||
> [!WARNING]
|
> [!WARNING]
|
||||||
> You will not see the "proxy sponsor" if you are already subscribed to the channel.
|
> The sponsored channel will not be displayed to you if you are already subscribed to it.
|
||||||
|
|
||||||
**You can also set up different channels for different users.**
|
**You can also configure different sponsored channels for different users:**
|
||||||
```toml
|
```toml
|
||||||
[access.user_ad_tags]
|
[access.user_ad_tags]
|
||||||
hello = "ad_tag"
|
hello = "ad_tag"
|
||||||
hello2 = "ad_tag2"
|
hello2 = "ad_tag2"
|
||||||
```
|
```
|
||||||
|
|
||||||
## Why is middle proxy (ME) needed
|
## Why do you need a middle proxy (ME)
|
||||||
https://github.com/telemt/telemt/discussions/167
|
https://github.com/telemt/telemt/discussions/167
|
||||||
|
|
||||||
## How many people can use 1 link
|
|
||||||
|
|
||||||
By default, 1 link can be used by any number of people.
|
## How many people can use one link
|
||||||
You can limit the number of IPs using the proxy.
|
|
||||||
|
By default, an unlimited number of people can use a single link.
|
||||||
|
However, you can limit the number of unique IP addresses for each user:
|
||||||
```toml
|
```toml
|
||||||
[access.user_max_unique_ips]
|
[access.user_max_unique_ips]
|
||||||
hello = 1
|
hello = 1
|
||||||
```
|
```
|
||||||
This parameter limits how many unique IPs can use 1 link simultaneously. If one user disconnects, a second user can connect. Also, multiple users can sit behind the same IP.
|
This parameter sets the maximum number of unique IP addresses from which a single link can be used simultaneously. If the first user disconnects, a second one can connect. At the same time, multiple users can connect from a single IP address simultaneously (for example, devices on the same Wi-Fi network).
|
||||||
|
|
||||||
## How to create multiple different links
|
## How to create multiple different links
|
||||||
|
|
||||||
1. Generate the required number of secrets `openssl rand -hex 16`
|
1. Generate the required number of secrets using the command: `openssl rand -hex 16`.
|
||||||
2. Open the config `nano /etc/telemt.toml`
|
2. Open the configuration file: `nano /etc/telemt/telemt.toml`.
|
||||||
3. Add new users.
|
3. Add new users to the `[access.users]` section:
|
||||||
```toml
|
```toml
|
||||||
[access.users]
|
[access.users]
|
||||||
user1 = "00000000000000000000000000000001"
|
user1 = "00000000000000000000000000000001"
|
||||||
user2 = "00000000000000000000000000000002"
|
user2 = "00000000000000000000000000000002"
|
||||||
user3 = "00000000000000000000000000000003"
|
user3 = "00000000000000000000000000000003"
|
||||||
```
|
```
|
||||||
4. Save the config. Ctrl+S -> Ctrl+X. You don't need to restart telemt.
|
4. Save the configuration (Ctrl+S -> Ctrl+X). There is no need to restart the telemt service.
|
||||||
5. Get the links via
|
5. Get the ready-to-use links using the command:
|
||||||
```bash
|
```bash
|
||||||
curl -s http://127.0.0.1:9091/v1/users | jq
|
curl -s http://127.0.0.1:9091/v1/users | jq
|
||||||
```
|
```
|
||||||
|
|
||||||
## "Unknown TLS SNI" Error
|
## "Unknown TLS SNI" error
|
||||||
You probably updated tls_domain, but users are still connecting via old links with the previous domain.
|
Usually, this error occurs if you have changed the `tls_domain` parameter, but users continue to connect using old links with the previous domain.
|
||||||
|
|
||||||
|
If you need to allow connections with any domains (ignoring SNI mismatches), add the following parameters:
|
||||||
|
```toml
|
||||||
|
[censorship]
|
||||||
|
unknown_sni_action = "mask"
|
||||||
|
```
|
||||||
|
|
||||||
## How to view metrics
|
## How to view metrics
|
||||||
|
|
||||||
1. Open the config `nano /etc/telemt/telemt.toml`
|
1. Open the configuration file: `nano /etc/telemt/telemt.toml`.
|
||||||
2. Add the following parameters
|
2. Add the following parameters:
|
||||||
```toml
|
```toml
|
||||||
[server]
|
[server]
|
||||||
metrics_port = 9090
|
metrics_port = 9090
|
||||||
metrics_whitelist = ["127.0.0.1/32", "::1/128", "0.0.0.0/0"]
|
metrics_whitelist = ["127.0.0.1/32", "::1/128", "0.0.0.0/0"]
|
||||||
```
|
```
|
||||||
3. Save the config. Ctrl+S -> Ctrl+X.
|
3. Save the changes (Ctrl+S -> Ctrl+X).
|
||||||
4. Metrics are available at SERVER_IP:9090/metrics.
|
4. After that, metrics will be available at: `SERVER_IP:9090/metrics`.
|
||||||
> [!WARNING]
|
> [!WARNING]
|
||||||
> "0.0.0.0/0" in metrics_whitelist opens access from any IP. Replace with your own IP. For example "1.2.3.4"
|
> The value `"0.0.0.0/0"` in `metrics_whitelist` opens access to metrics from any IP address. It is recommended to replace it with your personal IP, for example: `"1.2.3.4/32"`.
|
||||||
|
|
||||||
## Additional parameters
|
## Additional parameters
|
||||||
|
|
||||||
### Domain in link instead of IP
|
### Domain in the link instead of IP
|
||||||
To specify a domain in the links, add to the `[general.links]` section of the config file.
|
To display a domain instead of an IP address in the connection links, add the following lines to the configuration file:
|
||||||
```toml
|
```toml
|
||||||
[general.links]
|
[general.links]
|
||||||
public_host = "proxy.example.com"
|
public_host = "proxy.example.com"
|
||||||
```
|
```
|
||||||
|
|
||||||
### Server connection limit
|
### Total server connection limit
|
||||||
Limits the total number of open connections to the server:
|
This parameter limits the total number of active connections to the server:
|
||||||
```toml
|
```toml
|
||||||
[server]
|
[server]
|
||||||
max_connections = 10000 # 0 - unlimited, 10000 - default
|
max_connections = 10000 # 0 - unlimited, 10000 - default
|
||||||
```
|
```
|
||||||
|
|
||||||
### Upstream Manager
|
### Upstream Manager
|
||||||
To specify an upstream, add to the `[[upstreams]]` section of the config.toml file:
|
To configure outbound connections (upstreams), add the corresponding parameters to the `[[upstreams]]` section of the configuration file:
|
||||||
#### Binding to IP
|
|
||||||
|
#### Binding to an outbound IP address
|
||||||
```toml
|
```toml
|
||||||
[[upstreams]]
|
[[upstreams]]
|
||||||
type = "direct"
|
type = "direct"
|
||||||
weight = 1
|
weight = 1
|
||||||
enabled = true
|
enabled = true
|
||||||
interface = "192.168.1.100" # Change to your outgoing IP
|
interface = "192.168.1.100" # Replace with your outbound IP
|
||||||
```
|
```
|
||||||
#### SOCKS4/5 as Upstream
|
|
||||||
- Without authentication:
|
#### Using SOCKS4/5 as an Upstream
|
||||||
|
- Without authorization:
|
||||||
```toml
|
```toml
|
||||||
[[upstreams]]
|
[[upstreams]]
|
||||||
type = "socks5" # Specify SOCKS4 or SOCKS5
|
type = "socks5" # Specify SOCKS4 or SOCKS5
|
||||||
@@ -116,7 +125,7 @@ weight = 1 # Set Weight for Scenarios
|
|||||||
enabled = true
|
enabled = true
|
||||||
```
|
```
|
||||||
|
|
||||||
- With authentication:
|
- With authorization:
|
||||||
```toml
|
```toml
|
||||||
[[upstreams]]
|
[[upstreams]]
|
||||||
type = "socks5" # Specify SOCKS4 or SOCKS5
|
type = "socks5" # Specify SOCKS4 or SOCKS5
|
||||||
@@ -127,8 +136,8 @@ weight = 1 # Set Weight for Scenarios
|
|||||||
enabled = true
|
enabled = true
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Shadowsocks as Upstream
|
#### Using Shadowsocks as an Upstream
|
||||||
Requires `use_middle_proxy = false`.
|
For this method to work, the `use_middle_proxy = false` parameter must be set.
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[general]
|
[general]
|
||||||
|
|||||||
+51
-43
@@ -1,32 +1,32 @@
|
|||||||
## Как настроить канал "спонсор прокси" и статистику через бота @MTProxybot
|
## Как настроить канал "спонсор прокси" и статистику через бота @MTProxybot
|
||||||
|
|
||||||
1. Зайти в бота @MTProxybot.
|
1. Зайдите в бота @MTProxybot.
|
||||||
2. Ввести команду `/newproxy`
|
2. Введите команду `/newproxy`.
|
||||||
3. Отправить IP и порт сервера. Например: 1.2.3.4:443
|
3. Отправьте IP-адрес и порт сервера. Например: `1.2.3.4:443`.
|
||||||
4. Открыть конфиг `nano /etc/telemt/telemt.toml`.
|
4. Откройте файл конфигурации: `nano /etc/telemt/telemt.toml`.
|
||||||
5. Скопировать и отправить боту секрет пользователя из раздела [access.users].
|
5. Скопируйте и отправьте боту секрет пользователя из раздела `[access.users]`.
|
||||||
6. Скопировать полученный tag у бота. Например 1234567890abcdef1234567890abcdef.
|
6. Скопируйте тег (tag), который выдаст бот. Например: `1234567890abcdef1234567890abcdef`.
|
||||||
> [!WARNING]
|
> [!WARNING]
|
||||||
> Ссылка, которую выдает бот, не будет работать. Не копируйте и не используйте её!
|
> Ссылка, которую выдает бот, работать не будет. Не копируйте и не используйте её!
|
||||||
7. Раскомментировать параметр ad_tag и вписать tag, полученный у бота.
|
7. Раскомментируйте параметр `ad_tag` и впишите тег, полученный от бота.
|
||||||
8. Раскомментировать/добавить параметр use_middle_proxy = true.
|
8. Раскомментируйте или добавьте параметр `use_middle_proxy = true`.
|
||||||
|
|
||||||
Пример конфига:
|
Пример конфигурации:
|
||||||
```toml
|
```toml
|
||||||
[general]
|
[general]
|
||||||
ad_tag = "1234567890abcdef1234567890abcdef"
|
ad_tag = "1234567890abcdef1234567890abcdef"
|
||||||
use_middle_proxy = true
|
use_middle_proxy = true
|
||||||
```
|
```
|
||||||
9. Сохранить конфиг. Ctrl+S -> Ctrl+X.
|
9. Сохраните изменения (в nano: Ctrl+S -> Ctrl+X).
|
||||||
10. Перезапустить telemt `systemctl restart telemt`.
|
10. Перезапустите службу telemt: `systemctl restart telemt`.
|
||||||
11. В боте отправить команду /myproxies и выбрать добавленный сервер.
|
11. В боте отправьте команду `/myproxies` и выберите добавленный сервер.
|
||||||
12. Нажать кнопку "Set promotion".
|
12. Нажмите кнопку «Set promotion».
|
||||||
13. Отправить **публичную ссылку** на канал. Приватный канал добавить нельзя!
|
13. Отправьте **публичную ссылку** на канал. Приватные каналы добавлять нельзя!
|
||||||
14. Подождать примерно 1 час, пока информация обновится на серверах Telegram.
|
14. Подождите примерно 1 час, пока информация обновится на серверах Telegram.
|
||||||
> [!WARNING]
|
> [!WARNING]
|
||||||
> У вас не будет отображаться "спонсор прокси" если вы уже подписаны на канал.
|
> Спонсорский канал не будет у вас отображаться, если вы уже на него подписаны.
|
||||||
|
|
||||||
**Также вы можете настроить разные каналы для разных пользователей.**
|
**Вы также можете настроить разные спонсорские каналы для разных пользователей:**
|
||||||
```toml
|
```toml
|
||||||
[access.user_ad_tags]
|
[access.user_ad_tags]
|
||||||
hello = "ad_tag"
|
hello = "ad_tag"
|
||||||
@@ -37,77 +37,85 @@ hello2 = "ad_tag2"
|
|||||||
https://github.com/telemt/telemt/discussions/167
|
https://github.com/telemt/telemt/discussions/167
|
||||||
|
|
||||||
|
|
||||||
## Сколько человек может пользоваться 1 ссылкой
|
## Сколько человек может пользоваться одной ссылкой
|
||||||
|
|
||||||
По умолчанию 1 ссылкой может пользоваться сколько угодно человек.
|
По умолчанию одной ссылкой может пользоваться неограниченное число людей.
|
||||||
Вы можете ограничить число IP, использующих прокси.
|
Однако вы можете ограничить количество уникальных IP-адресов для каждого пользователя:
|
||||||
```toml
|
```toml
|
||||||
[access.user_max_unique_ips]
|
[access.user_max_unique_ips]
|
||||||
hello = 1
|
hello = 1
|
||||||
```
|
```
|
||||||
Этот параметр ограничивает, сколько уникальных IP может использовать 1 ссылку одновременно. Если один пользователь отключится, второй сможет подключиться. Также с одного IP может сидеть несколько пользователей.
|
Этот параметр задает максимальное количество уникальных IP-адресов, с которых можно одновременно использовать одну ссылку. Если первый пользователь отключится, второй сможет подключиться. При этом с одного IP-адреса могут подключаться несколько пользователей одновременно (например, устройства в одной Wi-Fi сети).
|
||||||
|
|
||||||
## Как сделать несколько разных ссылок
|
## Как создать несколько разных ссылок
|
||||||
|
|
||||||
1. Сгенерируйте нужное число секретов `openssl rand -hex 16`
|
1. Сгенерируйте необходимое количество секретов с помощью команды: `openssl rand -hex 16`.
|
||||||
2. Открыть конфиг `nano /etc/telemt.toml`
|
2. Откройте файл конфигурации: `nano /etc/telemt/telemt.toml`.
|
||||||
3. Добавить новых пользователей.
|
3. Добавьте новых пользователей в секцию `[access.users]`:
|
||||||
```toml
|
```toml
|
||||||
[access.users]
|
[access.users]
|
||||||
user1 = "00000000000000000000000000000001"
|
user1 = "00000000000000000000000000000001"
|
||||||
user2 = "00000000000000000000000000000002"
|
user2 = "00000000000000000000000000000002"
|
||||||
user3 = "00000000000000000000000000000003"
|
user3 = "00000000000000000000000000000003"
|
||||||
```
|
```
|
||||||
4. Сохранить конфиг. Ctrl+S -> Ctrl+X. Перезапускать telemt не нужно.
|
4. Сохраните конфигурацию (Ctrl+S -> Ctrl+X). Перезапускать службу telemt не нужно.
|
||||||
5. Получить ссылки через
|
5. Получите готовые ссылки с помощью команды:
|
||||||
```bash
|
```bash
|
||||||
curl -s http://127.0.0.1:9091/v1/users | jq
|
curl -s http://127.0.0.1:9091/v1/users | jq
|
||||||
```
|
```
|
||||||
|
|
||||||
## Ошибка "Unknown TLS SNI"
|
## Ошибка "Unknown TLS SNI"
|
||||||
Возможно, вы обновили tls_domain, но пользователи всё ещё пытаются подключаться по старым ссылкам с прежним доменом.
|
Обычно эта ошибка возникает, если вы изменили параметр `tls_domain`, но пользователи продолжают подключаться по старым ссылкам с прежним доменом.
|
||||||
|
|
||||||
|
Если необходимо разрешить подключение с любыми доменами (игнорируя несовпадения SNI), добавьте следующие параметры:
|
||||||
|
```toml
|
||||||
|
[censorship]
|
||||||
|
unknown_sni_action = "mask"
|
||||||
|
```
|
||||||
|
|
||||||
## Как посмотреть метрики
|
## Как посмотреть метрики
|
||||||
|
|
||||||
1. Открыть конфиг `nano /etc/telemt/telemt.toml`
|
1. Откройте файл конфигурации: `nano /etc/telemt/telemt.toml`.
|
||||||
2. Добавить следующие параметры
|
2. Добавьте следующие параметры:
|
||||||
```toml
|
```toml
|
||||||
[server]
|
[server]
|
||||||
metrics_port = 9090
|
metrics_port = 9090
|
||||||
metrics_whitelist = ["127.0.0.1/32", "::1/128", "0.0.0.0/0"]
|
metrics_whitelist = ["127.0.0.1/32", "::1/128", "0.0.0.0/0"]
|
||||||
```
|
```
|
||||||
3. Сохранить конфиг. Ctrl+S -> Ctrl+X.
|
3. Сохраните изменения (Ctrl+S -> Ctrl+X).
|
||||||
4. Метрики доступны по адресу SERVER_IP:9090/metrics.
|
4. После этого метрики будут доступны по адресу: `SERVER_IP:9090/metrics`.
|
||||||
> [!WARNING]
|
> [!WARNING]
|
||||||
> "0.0.0.0/0" в metrics_whitelist открывает доступ с любого IP. Замените на свой ip. Например "1.2.3.4"
|
> Значение `"0.0.0.0/0"` в `metrics_whitelist` открывает доступ к метрикам с любого IP-адреса. Рекомендуется заменить его на ваш личный IP, например: `"1.2.3.4/32"`.
|
||||||
|
|
||||||
## Дополнительные параметры
|
## Дополнительные параметры
|
||||||
|
|
||||||
### Домен в ссылке вместо IP
|
### Домен в ссылке вместо IP
|
||||||
Чтобы указать домен в ссылках, добавьте в секцию `[general.links]` файла config.
|
Чтобы в ссылках для подключения отображался домен вместо IP-адреса, добавьте следующие строки в файл конфигурации:
|
||||||
```toml
|
```toml
|
||||||
[general.links]
|
[general.links]
|
||||||
public_host = "proxy.example.com"
|
public_host = "proxy.example.com"
|
||||||
```
|
```
|
||||||
|
|
||||||
### Общий лимит подключений к серверу
|
### Общий лимит подключений к серверу
|
||||||
Ограничивает общее число открытых подключений к серверу:
|
Этот параметр ограничивает общее количество активных подключений к серверу:
|
||||||
```toml
|
```toml
|
||||||
[server]
|
[server]
|
||||||
max_connections = 10000 # 0 - unlimited, 10000 - default
|
max_connections = 10000 # 0 - без ограничений, 10000 - по умолчанию
|
||||||
```
|
```
|
||||||
|
|
||||||
### Upstream Manager
|
### Upstream Manager
|
||||||
Чтобы указать апстрим, добавьте в секцию `[[upstreams]]` файла config.toml:
|
Для настройки исходящих подключений (апстримов) добавьте соответствующие параметры в секцию `[[upstreams]]` файла конфигурации:
|
||||||
#### Привязка к IP
|
|
||||||
|
#### Привязка к исходящему IP-адресу
|
||||||
```toml
|
```toml
|
||||||
[[upstreams]]
|
[[upstreams]]
|
||||||
type = "direct"
|
type = "direct"
|
||||||
weight = 1
|
weight = 1
|
||||||
enabled = true
|
enabled = true
|
||||||
interface = "192.168.1.100" # Change to your outgoing IP
|
interface = "192.168.1.100" # Замените на ваш исходящий IP
|
||||||
```
|
```
|
||||||
#### SOCKS4/5 как Upstream
|
|
||||||
|
#### Использование SOCKS4/5 в качестве Upstream
|
||||||
- Без авторизации:
|
- Без авторизации:
|
||||||
```toml
|
```toml
|
||||||
[[upstreams]]
|
[[upstreams]]
|
||||||
@@ -128,8 +136,8 @@ weight = 1 # Set Weight for Scenarios
|
|||||||
enabled = true
|
enabled = true
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Shadowsocks как Upstream
|
#### Использование Shadowsocks в качестве Upstream
|
||||||
Требует `use_middle_proxy = false`.
|
Для работы этого метода требуется установить параметр `use_middle_proxy = false`.
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[general]
|
[general]
|
||||||
|
|||||||
+51
-12
@@ -37,12 +37,12 @@ mod runtime_watch;
|
|||||||
mod runtime_zero;
|
mod runtime_zero;
|
||||||
mod users;
|
mod users;
|
||||||
|
|
||||||
use config_store::{current_revision, parse_if_match};
|
use config_store::{current_revision, load_config_from_disk, parse_if_match};
|
||||||
use events::ApiEventStore;
|
use events::ApiEventStore;
|
||||||
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
||||||
use model::{
|
use model::{
|
||||||
ApiFailure, CreateUserRequest, HealthData, PatchUserRequest, RotateSecretRequest, SummaryData,
|
ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, PatchUserRequest,
|
||||||
UserActiveIps,
|
RotateSecretRequest, SummaryData, UserActiveIps,
|
||||||
};
|
};
|
||||||
use runtime_edge::{
|
use runtime_edge::{
|
||||||
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
EdgeConnectionsCacheEntry, build_runtime_connections_summary_data,
|
||||||
@@ -380,13 +380,16 @@ async fn handle(
|
|||||||
}
|
}
|
||||||
("GET", "/v1/stats/users") | ("GET", "/v1/users") => {
|
("GET", "/v1/stats/users") | ("GET", "/v1/users") => {
|
||||||
let revision = current_revision(&shared.config_path).await?;
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let disk_cfg = load_config_from_disk(&shared.config_path).await?;
|
||||||
|
let runtime_cfg = config_rx.borrow().clone();
|
||||||
let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips();
|
let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips();
|
||||||
let users = users_from_config(
|
let users = users_from_config(
|
||||||
&cfg,
|
&disk_cfg,
|
||||||
&shared.stats,
|
&shared.stats,
|
||||||
&shared.ip_tracker,
|
&shared.ip_tracker,
|
||||||
detected_ip_v4,
|
detected_ip_v4,
|
||||||
detected_ip_v6,
|
detected_ip_v6,
|
||||||
|
Some(runtime_cfg.as_ref()),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
Ok(success_response(StatusCode::OK, users, revision))
|
Ok(success_response(StatusCode::OK, users, revision))
|
||||||
@@ -405,7 +408,7 @@ async fn handle(
|
|||||||
let expected_revision = parse_if_match(req.headers());
|
let expected_revision = parse_if_match(req.headers());
|
||||||
let body = read_json::<CreateUserRequest>(req.into_body(), body_limit).await?;
|
let body = read_json::<CreateUserRequest>(req.into_body(), body_limit).await?;
|
||||||
let result = create_user(body, expected_revision, &shared).await;
|
let result = create_user(body, expected_revision, &shared).await;
|
||||||
let (data, revision) = match result {
|
let (mut data, revision) = match result {
|
||||||
Ok(ok) => ok,
|
Ok(ok) => ok,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
shared
|
shared
|
||||||
@@ -414,11 +417,18 @@ async fn handle(
|
|||||||
return Err(error);
|
return Err(error);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let runtime_cfg = config_rx.borrow().clone();
|
||||||
|
data.user.in_runtime = runtime_cfg.access.users.contains_key(&data.user.username);
|
||||||
shared.runtime_events.record(
|
shared.runtime_events.record(
|
||||||
"api.user.create.ok",
|
"api.user.create.ok",
|
||||||
format!("username={}", data.user.username),
|
format!("username={}", data.user.username),
|
||||||
);
|
);
|
||||||
Ok(success_response(StatusCode::CREATED, data, revision))
|
let status = if data.user.in_runtime {
|
||||||
|
StatusCode::CREATED
|
||||||
|
} else {
|
||||||
|
StatusCode::ACCEPTED
|
||||||
|
};
|
||||||
|
Ok(success_response(status, data, revision))
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
if let Some(user) = path.strip_prefix("/v1/users/")
|
if let Some(user) = path.strip_prefix("/v1/users/")
|
||||||
@@ -427,13 +437,16 @@ async fn handle(
|
|||||||
{
|
{
|
||||||
if method == Method::GET {
|
if method == Method::GET {
|
||||||
let revision = current_revision(&shared.config_path).await?;
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let disk_cfg = load_config_from_disk(&shared.config_path).await?;
|
||||||
|
let runtime_cfg = config_rx.borrow().clone();
|
||||||
let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips();
|
let (detected_ip_v4, detected_ip_v6) = shared.detected_link_ips();
|
||||||
let users = users_from_config(
|
let users = users_from_config(
|
||||||
&cfg,
|
&disk_cfg,
|
||||||
&shared.stats,
|
&shared.stats,
|
||||||
&shared.ip_tracker,
|
&shared.ip_tracker,
|
||||||
detected_ip_v4,
|
detected_ip_v4,
|
||||||
detected_ip_v6,
|
detected_ip_v6,
|
||||||
|
Some(runtime_cfg.as_ref()),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
if let Some(user_info) =
|
if let Some(user_info) =
|
||||||
@@ -461,7 +474,7 @@ async fn handle(
|
|||||||
let body =
|
let body =
|
||||||
read_json::<PatchUserRequest>(req.into_body(), body_limit).await?;
|
read_json::<PatchUserRequest>(req.into_body(), body_limit).await?;
|
||||||
let result = patch_user(user, body, expected_revision, &shared).await;
|
let result = patch_user(user, body, expected_revision, &shared).await;
|
||||||
let (data, revision) = match result {
|
let (mut data, revision) = match result {
|
||||||
Ok(ok) => ok,
|
Ok(ok) => ok,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
shared.runtime_events.record(
|
shared.runtime_events.record(
|
||||||
@@ -471,10 +484,17 @@ async fn handle(
|
|||||||
return Err(error);
|
return Err(error);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let runtime_cfg = config_rx.borrow().clone();
|
||||||
|
data.in_runtime = runtime_cfg.access.users.contains_key(&data.username);
|
||||||
shared
|
shared
|
||||||
.runtime_events
|
.runtime_events
|
||||||
.record("api.user.patch.ok", format!("username={}", data.username));
|
.record("api.user.patch.ok", format!("username={}", data.username));
|
||||||
return Ok(success_response(StatusCode::OK, data, revision));
|
let status = if data.in_runtime {
|
||||||
|
StatusCode::OK
|
||||||
|
} else {
|
||||||
|
StatusCode::ACCEPTED
|
||||||
|
};
|
||||||
|
return Ok(success_response(status, data, revision));
|
||||||
}
|
}
|
||||||
if method == Method::DELETE {
|
if method == Method::DELETE {
|
||||||
if api_cfg.read_only {
|
if api_cfg.read_only {
|
||||||
@@ -502,7 +522,18 @@ async fn handle(
|
|||||||
shared
|
shared
|
||||||
.runtime_events
|
.runtime_events
|
||||||
.record("api.user.delete.ok", format!("username={}", deleted_user));
|
.record("api.user.delete.ok", format!("username={}", deleted_user));
|
||||||
return Ok(success_response(StatusCode::OK, deleted_user, revision));
|
let runtime_cfg = config_rx.borrow().clone();
|
||||||
|
let in_runtime = runtime_cfg.access.users.contains_key(&deleted_user);
|
||||||
|
let response = DeleteUserResponse {
|
||||||
|
username: deleted_user,
|
||||||
|
in_runtime,
|
||||||
|
};
|
||||||
|
let status = if response.in_runtime {
|
||||||
|
StatusCode::ACCEPTED
|
||||||
|
} else {
|
||||||
|
StatusCode::OK
|
||||||
|
};
|
||||||
|
return Ok(success_response(status, response, revision));
|
||||||
}
|
}
|
||||||
if method == Method::POST
|
if method == Method::POST
|
||||||
&& let Some(base_user) = user.strip_suffix("/rotate-secret")
|
&& let Some(base_user) = user.strip_suffix("/rotate-secret")
|
||||||
@@ -530,7 +561,7 @@ async fn handle(
|
|||||||
&shared,
|
&shared,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let (data, revision) = match result {
|
let (mut data, revision) = match result {
|
||||||
Ok(ok) => ok,
|
Ok(ok) => ok,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
shared.runtime_events.record(
|
shared.runtime_events.record(
|
||||||
@@ -540,11 +571,19 @@ async fn handle(
|
|||||||
return Err(error);
|
return Err(error);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let runtime_cfg = config_rx.borrow().clone();
|
||||||
|
data.user.in_runtime =
|
||||||
|
runtime_cfg.access.users.contains_key(&data.user.username);
|
||||||
shared.runtime_events.record(
|
shared.runtime_events.record(
|
||||||
"api.user.rotate_secret.ok",
|
"api.user.rotate_secret.ok",
|
||||||
format!("username={}", base_user),
|
format!("username={}", base_user),
|
||||||
);
|
);
|
||||||
return Ok(success_response(StatusCode::OK, data, revision));
|
let status = if data.user.in_runtime {
|
||||||
|
StatusCode::OK
|
||||||
|
} else {
|
||||||
|
StatusCode::ACCEPTED
|
||||||
|
};
|
||||||
|
return Ok(success_response(status, data, revision));
|
||||||
}
|
}
|
||||||
if method == Method::POST {
|
if method == Method::POST {
|
||||||
return Ok(error_response(
|
return Ok(error_response(
|
||||||
|
|||||||
@@ -428,6 +428,7 @@ pub(super) struct UserLinks {
|
|||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
pub(super) struct UserInfo {
|
pub(super) struct UserInfo {
|
||||||
pub(super) username: String,
|
pub(super) username: String,
|
||||||
|
pub(super) in_runtime: bool,
|
||||||
pub(super) user_ad_tag: Option<String>,
|
pub(super) user_ad_tag: Option<String>,
|
||||||
pub(super) max_tcp_conns: Option<usize>,
|
pub(super) max_tcp_conns: Option<usize>,
|
||||||
pub(super) expiration_rfc3339: Option<String>,
|
pub(super) expiration_rfc3339: Option<String>,
|
||||||
@@ -454,6 +455,12 @@ pub(super) struct CreateUserResponse {
|
|||||||
pub(super) secret: String,
|
pub(super) secret: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
pub(super) struct DeleteUserResponse {
|
||||||
|
pub(super) username: String,
|
||||||
|
pub(super) in_runtime: bool,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub(super) struct CreateUserRequest {
|
pub(super) struct CreateUserRequest {
|
||||||
pub(super) username: String,
|
pub(super) username: String,
|
||||||
|
|||||||
+52
-4
@@ -136,6 +136,7 @@ pub(super) async fn create_user(
|
|||||||
&shared.ip_tracker,
|
&shared.ip_tracker,
|
||||||
detected_ip_v4,
|
detected_ip_v4,
|
||||||
detected_ip_v6,
|
detected_ip_v6,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let user = users
|
let user = users
|
||||||
@@ -143,6 +144,7 @@ pub(super) async fn create_user(
|
|||||||
.find(|entry| entry.username == body.username)
|
.find(|entry| entry.username == body.username)
|
||||||
.unwrap_or(UserInfo {
|
.unwrap_or(UserInfo {
|
||||||
username: body.username.clone(),
|
username: body.username.clone(),
|
||||||
|
in_runtime: false,
|
||||||
user_ad_tag: None,
|
user_ad_tag: None,
|
||||||
max_tcp_conns: cfg
|
max_tcp_conns: cfg
|
||||||
.access
|
.access
|
||||||
@@ -243,6 +245,7 @@ pub(super) async fn patch_user(
|
|||||||
&shared.ip_tracker,
|
&shared.ip_tracker,
|
||||||
detected_ip_v4,
|
detected_ip_v4,
|
||||||
detected_ip_v6,
|
detected_ip_v6,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let user_info = users
|
let user_info = users
|
||||||
@@ -300,6 +303,7 @@ pub(super) async fn rotate_secret(
|
|||||||
&shared.ip_tracker,
|
&shared.ip_tracker,
|
||||||
detected_ip_v4,
|
detected_ip_v4,
|
||||||
detected_ip_v6,
|
detected_ip_v6,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let user_info = users
|
let user_info = users
|
||||||
@@ -372,6 +376,7 @@ pub(super) async fn users_from_config(
|
|||||||
ip_tracker: &UserIpTracker,
|
ip_tracker: &UserIpTracker,
|
||||||
startup_detected_ip_v4: Option<IpAddr>,
|
startup_detected_ip_v4: Option<IpAddr>,
|
||||||
startup_detected_ip_v6: Option<IpAddr>,
|
startup_detected_ip_v6: Option<IpAddr>,
|
||||||
|
runtime_cfg: Option<&ProxyConfig>,
|
||||||
) -> Vec<UserInfo> {
|
) -> Vec<UserInfo> {
|
||||||
let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>();
|
let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>();
|
||||||
names.sort();
|
names.sort();
|
||||||
@@ -401,6 +406,9 @@ pub(super) async fn users_from_config(
|
|||||||
tls: Vec::new(),
|
tls: Vec::new(),
|
||||||
});
|
});
|
||||||
users.push(UserInfo {
|
users.push(UserInfo {
|
||||||
|
in_runtime: runtime_cfg
|
||||||
|
.map(|runtime| runtime.access.users.contains_key(&username))
|
||||||
|
.unwrap_or(false),
|
||||||
user_ad_tag: cfg.access.user_ad_tags.get(&username).cloned(),
|
user_ad_tag: cfg.access.user_ad_tags.get(&username).cloned(),
|
||||||
max_tcp_conns: cfg
|
max_tcp_conns: cfg
|
||||||
.access
|
.access
|
||||||
@@ -605,35 +613,75 @@ mod tests {
|
|||||||
let stats = Stats::new();
|
let stats = Stats::new();
|
||||||
let tracker = UserIpTracker::new();
|
let tracker = UserIpTracker::new();
|
||||||
|
|
||||||
let users = users_from_config(&cfg, &stats, &tracker, None, None).await;
|
let users = users_from_config(&cfg, &stats, &tracker, None, None, None).await;
|
||||||
let alice = users
|
let alice = users
|
||||||
.iter()
|
.iter()
|
||||||
.find(|entry| entry.username == "alice")
|
.find(|entry| entry.username == "alice")
|
||||||
.expect("alice must be present");
|
.expect("alice must be present");
|
||||||
|
assert!(!alice.in_runtime);
|
||||||
assert_eq!(alice.max_tcp_conns, Some(7));
|
assert_eq!(alice.max_tcp_conns, Some(7));
|
||||||
|
|
||||||
cfg.access.user_max_tcp_conns.insert("alice".to_string(), 5);
|
cfg.access.user_max_tcp_conns.insert("alice".to_string(), 5);
|
||||||
let users = users_from_config(&cfg, &stats, &tracker, None, None).await;
|
let users = users_from_config(&cfg, &stats, &tracker, None, None, None).await;
|
||||||
let alice = users
|
let alice = users
|
||||||
.iter()
|
.iter()
|
||||||
.find(|entry| entry.username == "alice")
|
.find(|entry| entry.username == "alice")
|
||||||
.expect("alice must be present");
|
.expect("alice must be present");
|
||||||
|
assert!(!alice.in_runtime);
|
||||||
assert_eq!(alice.max_tcp_conns, Some(5));
|
assert_eq!(alice.max_tcp_conns, Some(5));
|
||||||
|
|
||||||
cfg.access.user_max_tcp_conns.insert("alice".to_string(), 0);
|
cfg.access.user_max_tcp_conns.insert("alice".to_string(), 0);
|
||||||
let users = users_from_config(&cfg, &stats, &tracker, None, None).await;
|
let users = users_from_config(&cfg, &stats, &tracker, None, None, None).await;
|
||||||
let alice = users
|
let alice = users
|
||||||
.iter()
|
.iter()
|
||||||
.find(|entry| entry.username == "alice")
|
.find(|entry| entry.username == "alice")
|
||||||
.expect("alice must be present");
|
.expect("alice must be present");
|
||||||
|
assert!(!alice.in_runtime);
|
||||||
assert_eq!(alice.max_tcp_conns, Some(7));
|
assert_eq!(alice.max_tcp_conns, Some(7));
|
||||||
|
|
||||||
cfg.access.user_max_tcp_conns_global_each = 0;
|
cfg.access.user_max_tcp_conns_global_each = 0;
|
||||||
let users = users_from_config(&cfg, &stats, &tracker, None, None).await;
|
let users = users_from_config(&cfg, &stats, &tracker, None, None, None).await;
|
||||||
let alice = users
|
let alice = users
|
||||||
.iter()
|
.iter()
|
||||||
.find(|entry| entry.username == "alice")
|
.find(|entry| entry.username == "alice")
|
||||||
.expect("alice must be present");
|
.expect("alice must be present");
|
||||||
|
assert!(!alice.in_runtime);
|
||||||
assert_eq!(alice.max_tcp_conns, None);
|
assert_eq!(alice.max_tcp_conns, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn users_from_config_marks_runtime_membership_when_snapshot_is_provided() {
|
||||||
|
let mut disk_cfg = ProxyConfig::default();
|
||||||
|
disk_cfg.access.users.insert(
|
||||||
|
"alice".to_string(),
|
||||||
|
"0123456789abcdef0123456789abcdef".to_string(),
|
||||||
|
);
|
||||||
|
disk_cfg.access.users.insert(
|
||||||
|
"bob".to_string(),
|
||||||
|
"fedcba9876543210fedcba9876543210".to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut runtime_cfg = ProxyConfig::default();
|
||||||
|
runtime_cfg.access.users.insert(
|
||||||
|
"alice".to_string(),
|
||||||
|
"0123456789abcdef0123456789abcdef".to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let stats = Stats::new();
|
||||||
|
let tracker = UserIpTracker::new();
|
||||||
|
let users =
|
||||||
|
users_from_config(&disk_cfg, &stats, &tracker, None, None, Some(&runtime_cfg)).await;
|
||||||
|
|
||||||
|
let alice = users
|
||||||
|
.iter()
|
||||||
|
.find(|entry| entry.username == "alice")
|
||||||
|
.expect("alice must be present");
|
||||||
|
let bob = users
|
||||||
|
.iter()
|
||||||
|
.find(|entry| entry.username == "bob")
|
||||||
|
.expect("bob must be present");
|
||||||
|
|
||||||
|
assert!(alice.in_runtime);
|
||||||
|
assert!(!bob.in_runtime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -595,6 +595,14 @@ pub(crate) fn default_mask_relay_max_bytes() -> usize {
|
|||||||
32 * 1024
|
32 * 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_mask_relay_timeout_secs() -> u64 {
|
||||||
|
60
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_mask_relay_idle_timeout_secs() -> u64 {
|
||||||
|
5
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 {
|
pub(crate) fn default_mask_classifier_prefetch_timeout_ms() -> u64 {
|
||||||
5
|
5
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1568,6 +1568,17 @@ pub struct AntiCensorshipConfig {
|
|||||||
#[serde(default = "default_mask_relay_max_bytes")]
|
#[serde(default = "default_mask_relay_max_bytes")]
|
||||||
pub mask_relay_max_bytes: usize,
|
pub mask_relay_max_bytes: usize,
|
||||||
|
|
||||||
|
/// Wall-clock cap for the full masking relay on non-MTProto fallback paths.
|
||||||
|
/// Raise when the mask target is a long-lived service (e.g. WebSocket).
|
||||||
|
#[serde(default = "default_mask_relay_timeout_secs")]
|
||||||
|
pub mask_relay_timeout_secs: u64,
|
||||||
|
|
||||||
|
/// Per-read idle timeout on masking relay and drain paths.
|
||||||
|
/// Limits resource consumption by slow-loris attacks and port scanners.
|
||||||
|
/// A read call stalling beyond this is treated as an abandoned connection.
|
||||||
|
#[serde(default = "default_mask_relay_idle_timeout_secs")]
|
||||||
|
pub mask_relay_idle_timeout_secs: u64,
|
||||||
|
|
||||||
/// Prefetch timeout (ms) for extending fragmented masking classifier window.
|
/// Prefetch timeout (ms) for extending fragmented masking classifier window.
|
||||||
#[serde(default = "default_mask_classifier_prefetch_timeout_ms")]
|
#[serde(default = "default_mask_classifier_prefetch_timeout_ms")]
|
||||||
pub mask_classifier_prefetch_timeout_ms: u64,
|
pub mask_classifier_prefetch_timeout_ms: u64,
|
||||||
@@ -1613,6 +1624,8 @@ impl Default for AntiCensorshipConfig {
|
|||||||
mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
|
mask_shape_above_cap_blur: default_mask_shape_above_cap_blur(),
|
||||||
mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
|
mask_shape_above_cap_blur_max_bytes: default_mask_shape_above_cap_blur_max_bytes(),
|
||||||
mask_relay_max_bytes: default_mask_relay_max_bytes(),
|
mask_relay_max_bytes: default_mask_relay_max_bytes(),
|
||||||
|
mask_relay_timeout_secs: default_mask_relay_timeout_secs(),
|
||||||
|
mask_relay_idle_timeout_secs: default_mask_relay_idle_timeout_secs(),
|
||||||
mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(),
|
mask_classifier_prefetch_timeout_ms: default_mask_classifier_prefetch_timeout_ms(),
|
||||||
mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
|
mask_timing_normalization_enabled: default_mask_timing_normalization_enabled(),
|
||||||
mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
|
mask_timing_normalization_floor_ms: default_mask_timing_normalization_floor_ms(),
|
||||||
|
|||||||
@@ -26,6 +26,15 @@ pub struct UserIpTracker {
|
|||||||
cleanup_drain_lock: Arc<AsyncMutex<()>>,
|
cleanup_drain_lock: Arc<AsyncMutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct UserIpTrackerMemoryStats {
|
||||||
|
pub active_users: usize,
|
||||||
|
pub recent_users: usize,
|
||||||
|
pub active_entries: usize,
|
||||||
|
pub recent_entries: usize,
|
||||||
|
pub cleanup_queue_len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
impl UserIpTracker {
|
impl UserIpTracker {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -141,6 +150,13 @@ impl UserIpTracker {
|
|||||||
|
|
||||||
let mut active_ips = self.active_ips.write().await;
|
let mut active_ips = self.active_ips.write().await;
|
||||||
let mut recent_ips = self.recent_ips.write().await;
|
let mut recent_ips = self.recent_ips.write().await;
|
||||||
|
let window = *self.limit_window.read().await;
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
for user_recent in recent_ips.values_mut() {
|
||||||
|
Self::prune_recent(user_recent, now, window);
|
||||||
|
}
|
||||||
|
|
||||||
let mut users =
|
let mut users =
|
||||||
Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
|
Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
|
||||||
users.extend(active_ips.keys().cloned());
|
users.extend(active_ips.keys().cloned());
|
||||||
@@ -166,6 +182,26 @@ impl UserIpTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn memory_stats(&self) -> UserIpTrackerMemoryStats {
|
||||||
|
let cleanup_queue_len = self
|
||||||
|
.cleanup_queue
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|poisoned| poisoned.into_inner())
|
||||||
|
.len();
|
||||||
|
let active_ips = self.active_ips.read().await;
|
||||||
|
let recent_ips = self.recent_ips.read().await;
|
||||||
|
let active_entries = active_ips.values().map(HashMap::len).sum();
|
||||||
|
let recent_entries = recent_ips.values().map(HashMap::len).sum();
|
||||||
|
|
||||||
|
UserIpTrackerMemoryStats {
|
||||||
|
active_users: active_ips.len(),
|
||||||
|
recent_users: recent_ips.len(),
|
||||||
|
active_entries,
|
||||||
|
recent_entries,
|
||||||
|
cleanup_queue_len,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn set_limit_policy(&self, mode: UserMaxUniqueIpsMode, window_secs: u64) {
|
pub async fn set_limit_policy(&self, mode: UserMaxUniqueIpsMode, window_secs: u64) {
|
||||||
{
|
{
|
||||||
let mut current_mode = self.limit_mode.write().await;
|
let mut current_mode = self.limit_mode.write().await;
|
||||||
@@ -451,6 +487,7 @@ impl Default for UserIpTracker {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
fn test_ipv4(oct1: u8, oct2: u8, oct3: u8, oct4: u8) -> IpAddr {
|
fn test_ipv4(oct1: u8, oct2: u8, oct3: u8, oct4: u8) -> IpAddr {
|
||||||
IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
|
IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
|
||||||
@@ -764,4 +801,54 @@ mod tests {
|
|||||||
tokio::time::sleep(Duration::from_millis(1100)).await;
|
tokio::time::sleep(Duration::from_millis(1100)).await;
|
||||||
assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
|
assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_memory_stats_reports_queue_and_entry_counts() {
|
||||||
|
let tracker = UserIpTracker::new();
|
||||||
|
tracker.set_user_limit("test_user", 4).await;
|
||||||
|
let ip1 = test_ipv4(10, 2, 0, 1);
|
||||||
|
let ip2 = test_ipv4(10, 2, 0, 2);
|
||||||
|
|
||||||
|
tracker.check_and_add("test_user", ip1).await.unwrap();
|
||||||
|
tracker.check_and_add("test_user", ip2).await.unwrap();
|
||||||
|
tracker.enqueue_cleanup("test_user".to_string(), ip1);
|
||||||
|
|
||||||
|
let snapshot = tracker.memory_stats().await;
|
||||||
|
assert_eq!(snapshot.active_users, 1);
|
||||||
|
assert_eq!(snapshot.recent_users, 1);
|
||||||
|
assert_eq!(snapshot.active_entries, 2);
|
||||||
|
assert_eq!(snapshot.recent_entries, 2);
|
||||||
|
assert_eq!(snapshot.cleanup_queue_len, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_prunes_stale_recent_entries() {
|
||||||
|
let tracker = UserIpTracker::new();
|
||||||
|
tracker
|
||||||
|
.set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 1)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let stale_user = "stale-user".to_string();
|
||||||
|
let stale_ip = test_ipv4(10, 3, 0, 1);
|
||||||
|
{
|
||||||
|
let mut recent_ips = tracker.recent_ips.write().await;
|
||||||
|
recent_ips
|
||||||
|
.entry(stale_user.clone())
|
||||||
|
.or_insert_with(HashMap::new)
|
||||||
|
.insert(stale_ip, Instant::now() - Duration::from_secs(5));
|
||||||
|
}
|
||||||
|
|
||||||
|
tracker.last_compact_epoch_secs.store(0, Ordering::Relaxed);
|
||||||
|
tracker
|
||||||
|
.check_and_add("trigger-user", test_ipv4(10, 3, 0, 2))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let recent_ips = tracker.recent_ips.read().await;
|
||||||
|
let stale_exists = recent_ips
|
||||||
|
.get(&stale_user)
|
||||||
|
.map(|ips| ips.contains_key(&stale_ip))
|
||||||
|
.unwrap_or(false);
|
||||||
|
assert!(!stale_exists);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+102
@@ -293,6 +293,27 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_buffer_pool_buffers_total Snapshot of pooled and allocated buffers"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_buffer_pool_buffers_total gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_buffer_pool_buffers_total{{kind=\"pooled\"}} {}",
|
||||||
|
stats.get_buffer_pool_pooled_gauge()
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_buffer_pool_buffers_total{{kind=\"allocated\"}} {}",
|
||||||
|
stats.get_buffer_pool_allocated_gauge()
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_buffer_pool_buffers_total{{kind=\"in_use\"}} {}",
|
||||||
|
stats.get_buffer_pool_in_use_gauge()
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_connections_total Total accepted connections"
|
"# HELP telemt_connections_total Total accepted connections"
|
||||||
@@ -941,6 +962,39 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_c2me_enqueue_events_total ME client->ME enqueue outcomes"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_c2me_enqueue_events_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_c2me_enqueue_events_total{{event=\"full\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_c2me_send_full_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_c2me_enqueue_events_total{{event=\"high_water\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_c2me_send_high_water_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_c2me_enqueue_events_total{{event=\"timeout\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_c2me_send_timeout_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
|
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
|
||||||
@@ -2490,6 +2544,48 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||||||
if user_enabled { 0 } else { 1 }
|
if user_enabled { 0 } else { 1 }
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let ip_memory = ip_tracker.memory_stats().await;
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_ip_tracker_users Number of users tracked by IP limiter state"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_ip_tracker_users gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_tracker_users{{scope=\"active\"}} {}",
|
||||||
|
ip_memory.active_users
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_tracker_users{{scope=\"recent\"}} {}",
|
||||||
|
ip_memory.recent_users
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_ip_tracker_entries Number of IP entries tracked by limiter state"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_ip_tracker_entries gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_tracker_entries{{scope=\"active\"}} {}",
|
||||||
|
ip_memory.active_entries
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_tracker_entries{{scope=\"recent\"}} {}",
|
||||||
|
ip_memory.recent_entries
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_ip_tracker_cleanup_queue_len Deferred disconnect cleanup queue length"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_queue_len gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_ip_tracker_cleanup_queue_len {}",
|
||||||
|
ip_memory.cleanup_queue_len
|
||||||
|
);
|
||||||
|
|
||||||
if user_enabled {
|
if user_enabled {
|
||||||
for entry in stats.iter_user_stats() {
|
for entry in stats.iter_user_stats() {
|
||||||
let user = entry.key();
|
let user = entry.key();
|
||||||
@@ -2728,6 +2824,9 @@ mod tests {
|
|||||||
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
|
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
|
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
|
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
|
||||||
|
assert!(output.contains("telemt_ip_tracker_users{scope=\"active\"} 1"));
|
||||||
|
assert!(output.contains("telemt_ip_tracker_entries{scope=\"active\"} 1"));
|
||||||
|
assert!(output.contains("telemt_ip_tracker_cleanup_queue_len 0"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -2799,6 +2898,9 @@ mod tests {
|
|||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
||||||
|
assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
|
||||||
|
assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
|
||||||
|
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -276,6 +276,7 @@ where
|
|||||||
stats.increment_user_connects(user);
|
stats.increment_user_connects(user);
|
||||||
let _direct_connection_lease = stats.acquire_direct_connection_lease();
|
let _direct_connection_lease = stats.acquire_direct_connection_lease();
|
||||||
|
|
||||||
|
let buffer_pool_trim = Arc::clone(&buffer_pool);
|
||||||
let relay_result = relay_bidirectional(
|
let relay_result = relay_bidirectional(
|
||||||
client_reader,
|
client_reader,
|
||||||
client_writer,
|
client_writer,
|
||||||
@@ -321,6 +322,13 @@ where
|
|||||||
Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"),
|
Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buffer_pool_trim.trim_to(buffer_pool_trim.max_buffers().min(64));
|
||||||
|
let pool_snapshot = buffer_pool_trim.stats();
|
||||||
|
stats.set_buffer_pool_gauges(
|
||||||
|
pool_snapshot.pooled,
|
||||||
|
pool_snapshot.allocated,
|
||||||
|
pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
|
||||||
|
);
|
||||||
relay_result
|
relay_result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+58
-21
@@ -28,14 +28,10 @@ use tracing::debug;
|
|||||||
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
const MASK_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
|
const MASK_TIMEOUT: Duration = Duration::from_millis(50);
|
||||||
/// Maximum duration for the entire masking relay.
|
/// Maximum duration for the entire masking relay under test (replaced by config at runtime).
|
||||||
/// Limits resource consumption from slow-loris attacks and port scanners.
|
|
||||||
#[cfg(not(test))]
|
|
||||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
const MASK_RELAY_TIMEOUT: Duration = Duration::from_millis(200);
|
||||||
#[cfg(not(test))]
|
/// Per-read idle timeout for masking relay and drain paths under test (replaced by config at runtime).
|
||||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
const MASK_RELAY_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
const MASK_BUFFER_SIZE: usize = 8192;
|
const MASK_BUFFER_SIZE: usize = 8192;
|
||||||
@@ -55,6 +51,7 @@ async fn copy_with_idle_timeout<R, W>(
|
|||||||
writer: &mut W,
|
writer: &mut W,
|
||||||
byte_cap: usize,
|
byte_cap: usize,
|
||||||
shutdown_on_eof: bool,
|
shutdown_on_eof: bool,
|
||||||
|
idle_timeout: Duration,
|
||||||
) -> CopyOutcome
|
) -> CopyOutcome
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin,
|
R: AsyncRead + Unpin,
|
||||||
@@ -78,7 +75,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
||||||
let read_res = timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await;
|
let read_res = timeout(idle_timeout, reader.read(&mut buf[..read_len])).await;
|
||||||
let n = match read_res {
|
let n = match read_res {
|
||||||
Ok(Ok(n)) => n,
|
Ok(Ok(n)) => n,
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
@@ -86,13 +83,13 @@ where
|
|||||||
if n == 0 {
|
if n == 0 {
|
||||||
ended_by_eof = true;
|
ended_by_eof = true;
|
||||||
if shutdown_on_eof {
|
if shutdown_on_eof {
|
||||||
let _ = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.shutdown()).await;
|
let _ = timeout(idle_timeout, writer.shutdown()).await;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
total = total.saturating_add(n);
|
total = total.saturating_add(n);
|
||||||
|
|
||||||
let write_res = timeout(MASK_RELAY_IDLE_TIMEOUT, writer.write_all(&buf[..n])).await;
|
let write_res = timeout(idle_timeout, writer.write_all(&buf[..n])).await;
|
||||||
match write_res {
|
match write_res {
|
||||||
Ok(Ok(())) => {}
|
Ok(Ok(())) => {}
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
@@ -230,11 +227,15 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn consume_client_data_with_timeout_and_cap<R>(reader: R, byte_cap: usize)
|
async fn consume_client_data_with_timeout_and_cap<R>(
|
||||||
where
|
reader: R,
|
||||||
|
byte_cap: usize,
|
||||||
|
relay_timeout: Duration,
|
||||||
|
idle_timeout: Duration,
|
||||||
|
) where
|
||||||
R: AsyncRead + Unpin,
|
R: AsyncRead + Unpin,
|
||||||
{
|
{
|
||||||
if timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, byte_cap))
|
if timeout(relay_timeout, consume_client_data(reader, byte_cap, idle_timeout))
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
@@ -598,10 +599,18 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
beobachten.record(client_type, peer.ip(), ttl);
|
beobachten.record(client_type, peer.ip(), ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let relay_timeout = Duration::from_secs(config.censorship.mask_relay_timeout_secs);
|
||||||
|
let idle_timeout = Duration::from_secs(config.censorship.mask_relay_idle_timeout_secs);
|
||||||
|
|
||||||
if !config.censorship.mask {
|
if !config.censorship.mask {
|
||||||
// Masking disabled, just consume data
|
// Masking disabled, just consume data
|
||||||
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
|
consume_client_data_with_timeout_and_cap(
|
||||||
.await;
|
reader,
|
||||||
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -633,7 +642,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if timeout(
|
if timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
relay_timeout,
|
||||||
relay_to_mask(
|
relay_to_mask(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -647,6 +656,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
||||||
config.censorship.mask_shape_hardening_aggressive_mode,
|
config.censorship.mask_shape_hardening_aggressive_mode,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
idle_timeout,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -662,6 +672,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -671,6 +683,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -701,8 +715,13 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
local = %local_addr,
|
local = %local_addr,
|
||||||
"Mask target resolves to local listener; refusing self-referential masking fallback"
|
"Mask target resolves to local listener; refusing self-referential masking fallback"
|
||||||
);
|
);
|
||||||
consume_client_data_with_timeout_and_cap(reader, config.censorship.mask_relay_max_bytes)
|
consume_client_data_with_timeout_and_cap(
|
||||||
.await;
|
reader,
|
||||||
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -736,7 +755,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if timeout(
|
if timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
relay_timeout,
|
||||||
relay_to_mask(
|
relay_to_mask(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -750,6 +769,7 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
config.censorship.mask_shape_above_cap_blur_max_bytes,
|
||||||
config.censorship.mask_shape_hardening_aggressive_mode,
|
config.censorship.mask_shape_hardening_aggressive_mode,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
idle_timeout,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -765,6 +785,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -774,6 +796,8 @@ pub async fn handle_bad_client<R, W>(
|
|||||||
consume_client_data_with_timeout_and_cap(
|
consume_client_data_with_timeout_and_cap(
|
||||||
reader,
|
reader,
|
||||||
config.censorship.mask_relay_max_bytes,
|
config.censorship.mask_relay_max_bytes,
|
||||||
|
relay_timeout,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
wait_mask_outcome_budget(outcome_started, config).await;
|
wait_mask_outcome_budget(outcome_started, config).await;
|
||||||
@@ -795,6 +819,7 @@ async fn relay_to_mask<R, W, MR, MW>(
|
|||||||
shape_above_cap_blur_max_bytes: usize,
|
shape_above_cap_blur_max_bytes: usize,
|
||||||
shape_hardening_aggressive_mode: bool,
|
shape_hardening_aggressive_mode: bool,
|
||||||
mask_relay_max_bytes: usize,
|
mask_relay_max_bytes: usize,
|
||||||
|
idle_timeout: Duration,
|
||||||
) where
|
) where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
@@ -816,11 +841,19 @@ async fn relay_to_mask<R, W, MR, MW>(
|
|||||||
&mut mask_write,
|
&mut mask_write,
|
||||||
mask_relay_max_bytes,
|
mask_relay_max_bytes,
|
||||||
!shape_hardening_enabled,
|
!shape_hardening_enabled,
|
||||||
|
idle_timeout,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
copy_with_idle_timeout(&mut mask_read, &mut writer, mask_relay_max_bytes, true).await
|
copy_with_idle_timeout(
|
||||||
|
&mut mask_read,
|
||||||
|
&mut writer,
|
||||||
|
mask_relay_max_bytes,
|
||||||
|
true,
|
||||||
|
idle_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -848,7 +881,11 @@ async fn relay_to_mask<R, W, MR, MW>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Just consume all data from client without responding.
|
/// Just consume all data from client without responding.
|
||||||
async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usize) {
|
async fn consume_client_data<R: AsyncRead + Unpin>(
|
||||||
|
mut reader: R,
|
||||||
|
byte_cap: usize,
|
||||||
|
idle_timeout: Duration,
|
||||||
|
) {
|
||||||
if byte_cap == 0 {
|
if byte_cap == 0 {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -864,7 +901,7 @@ async fn consume_client_data<R: AsyncRead + Unpin>(mut reader: R, byte_cap: usiz
|
|||||||
}
|
}
|
||||||
|
|
||||||
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
let read_len = remaining_budget.min(MASK_BUFFER_SIZE);
|
||||||
let n = match timeout(MASK_RELAY_IDLE_TIMEOUT, reader.read(&mut buf[..read_len])).await {
|
let n = match timeout(idle_timeout, reader.read(&mut buf[..read_len])).await {
|
||||||
Ok(Ok(n)) => n,
|
Ok(Ok(n)) => n,
|
||||||
Ok(Err(_)) | Err(_) => break,
|
Ok(Err(_)) | Err(_) => break,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -645,11 +645,14 @@ async fn enqueue_c2me_command(
|
|||||||
tx: &mpsc::Sender<C2MeCommand>,
|
tx: &mpsc::Sender<C2MeCommand>,
|
||||||
cmd: C2MeCommand,
|
cmd: C2MeCommand,
|
||||||
send_timeout: Option<Duration>,
|
send_timeout: Option<Duration>,
|
||||||
|
stats: &Stats,
|
||||||
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
||||||
match tx.try_send(cmd) {
|
match tx.try_send(cmd) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(mpsc::error::TrySendError::Closed(cmd)) => Err(mpsc::error::SendError(cmd)),
|
Err(mpsc::error::TrySendError::Closed(cmd)) => Err(mpsc::error::SendError(cmd)),
|
||||||
Err(mpsc::error::TrySendError::Full(cmd)) => {
|
Err(mpsc::error::TrySendError::Full(cmd)) => {
|
||||||
|
stats.increment_me_c2me_send_full_total();
|
||||||
|
stats.increment_me_c2me_send_high_water_total();
|
||||||
note_relay_pressure_event();
|
note_relay_pressure_event();
|
||||||
// Cooperative yield reduces burst catch-up when the per-conn queue is near saturation.
|
// Cooperative yield reduces burst catch-up when the per-conn queue is near saturation.
|
||||||
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
||||||
@@ -658,7 +661,10 @@ async fn enqueue_c2me_command(
|
|||||||
let reserve_result = match send_timeout {
|
let reserve_result = match send_timeout {
|
||||||
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await {
|
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await {
|
||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
Err(_) => return Err(mpsc::error::SendError(cmd)),
|
Err(_) => {
|
||||||
|
stats.increment_me_c2me_send_timeout_total();
|
||||||
|
return Err(mpsc::error::SendError(cmd));
|
||||||
|
}
|
||||||
},
|
},
|
||||||
None => tx.reserve().await,
|
None => tx.reserve().await,
|
||||||
};
|
};
|
||||||
@@ -667,7 +673,10 @@ async fn enqueue_c2me_command(
|
|||||||
permit.send(cmd);
|
permit.send(cmd);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(_) => Err(mpsc::error::SendError(cmd)),
|
Err(_) => {
|
||||||
|
stats.increment_me_c2me_send_timeout_total();
|
||||||
|
Err(mpsc::error::SendError(cmd))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -841,11 +850,23 @@ where
|
|||||||
let me_writer = tokio::spawn(async move {
|
let me_writer = tokio::spawn(async move {
|
||||||
let mut writer = crypto_writer;
|
let mut writer = crypto_writer;
|
||||||
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
||||||
|
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
|
||||||
|
|
||||||
|
fn shrink_session_vec(buf: &mut Vec<u8>, threshold: usize) {
|
||||||
|
if buf.capacity() > threshold {
|
||||||
|
buf.clear();
|
||||||
|
buf.shrink_to(threshold);
|
||||||
|
} else {
|
||||||
|
buf.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
msg = me_rx_task.recv() => {
|
msg = me_rx_task.recv() => {
|
||||||
let Some(first) = msg else {
|
let Some(first) = msg else {
|
||||||
debug!(conn_id, "ME channel closed");
|
debug!(conn_id, "ME channel closed");
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Err(ProxyError::Proxy("ME connection lost".into()));
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -901,6 +922,7 @@ where
|
|||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -962,6 +984,7 @@ where
|
|||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1027,6 +1050,7 @@ where
|
|||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1091,6 +1115,7 @@ where
|
|||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1098,6 +1123,7 @@ where
|
|||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "ME channel closed");
|
debug!(conn_id, "ME channel closed");
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Err(ProxyError::Proxy("ME connection lost".into()));
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -1147,6 +1173,7 @@ where
|
|||||||
}
|
}
|
||||||
_ = &mut stop_rx => {
|
_ = &mut stop_rx => {
|
||||||
debug!(conn_id, "ME writer stop signal");
|
debug!(conn_id, "ME writer stop signal");
|
||||||
|
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1172,7 +1199,13 @@ where
|
|||||||
user = %user,
|
user = %user,
|
||||||
"Middle-relay pressure eviction for idle-candidate session"
|
"Middle-relay pressure eviction for idle-candidate session"
|
||||||
);
|
);
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
let _ = enqueue_c2me_command(
|
||||||
|
&c2me_tx,
|
||||||
|
C2MeCommand::Close,
|
||||||
|
c2me_send_timeout,
|
||||||
|
stats.as_ref(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
main_result = Err(ProxyError::Proxy(
|
main_result = Err(ProxyError::Proxy(
|
||||||
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
|
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
|
||||||
));
|
));
|
||||||
@@ -1191,7 +1224,13 @@ where
|
|||||||
"Cutover affected middle session, closing client connection"
|
"Cutover affected middle session, closing client connection"
|
||||||
);
|
);
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
let _ = enqueue_c2me_command(
|
||||||
|
&c2me_tx,
|
||||||
|
C2MeCommand::Close,
|
||||||
|
c2me_send_timeout,
|
||||||
|
stats.as_ref(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1253,6 +1292,7 @@ where
|
|||||||
&c2me_tx,
|
&c2me_tx,
|
||||||
C2MeCommand::Data { payload, flags },
|
C2MeCommand::Data { payload, flags },
|
||||||
c2me_send_timeout,
|
c2me_send_timeout,
|
||||||
|
stats.as_ref(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
@@ -1264,9 +1304,13 @@ where
|
|||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "Client EOF");
|
debug!(conn_id, "Client EOF");
|
||||||
client_closed = true;
|
client_closed = true;
|
||||||
let _ =
|
let _ = enqueue_c2me_command(
|
||||||
enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout)
|
&c2me_tx,
|
||||||
.await;
|
C2MeCommand::Close,
|
||||||
|
c2me_send_timeout,
|
||||||
|
stats.as_ref(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -1317,6 +1361,13 @@ where
|
|||||||
);
|
);
|
||||||
clear_relay_idle_candidate(conn_id);
|
clear_relay_idle_candidate(conn_id);
|
||||||
me_pool.registry().unregister(conn_id).await;
|
me_pool.registry().unregister(conn_id).await;
|
||||||
|
buffer_pool.trim_to(buffer_pool.max_buffers().min(64));
|
||||||
|
let pool_snapshot = buffer_pool.stats();
|
||||||
|
stats.set_buffer_pool_gauges(
|
||||||
|
pool_snapshot.pooled,
|
||||||
|
pool_snapshot.allocated,
|
||||||
|
pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
|
||||||
|
);
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -562,9 +562,10 @@ async fn timing_classifier_light_fuzz_pairwise_bucketed_accuracy_stays_bounded_u
|
|||||||
if low_info_pair_count > 0 {
|
if low_info_pair_count > 0 {
|
||||||
let low_info_baseline_avg = low_info_baseline_sum / low_info_pair_count as f64;
|
let low_info_baseline_avg = low_info_baseline_sum / low_info_pair_count as f64;
|
||||||
let low_info_hardened_avg = low_info_hardened_sum / low_info_pair_count as f64;
|
let low_info_hardened_avg = low_info_hardened_sum / low_info_pair_count as f64;
|
||||||
|
let low_info_avg_jitter_budget = 0.40 + acc_quant_step;
|
||||||
assert!(
|
assert!(
|
||||||
low_info_hardened_avg <= low_info_baseline_avg + 0.40,
|
low_info_hardened_avg <= low_info_baseline_avg + low_info_avg_jitter_budget,
|
||||||
"normalization low-info average drift exceeded jitter budget: baseline_avg={low_info_baseline_avg:.3} hardened_avg={low_info_hardened_avg:.3}"
|
"normalization low-info average drift exceeded jitter budget: baseline_avg={low_info_baseline_avg:.3} hardened_avg={low_info_hardened_avg:.3} tolerated={low_info_avg_jitter_budget:.3}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ async fn consume_client_data_stops_after_byte_cap_without_eof() {
|
|||||||
};
|
};
|
||||||
let cap = 10_000usize;
|
let cap = 10_000usize;
|
||||||
|
|
||||||
consume_client_data(reader, cap).await;
|
consume_client_data(reader, cap, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
let total = produced.load(Ordering::Relaxed);
|
let total = produced.load(Ordering::Relaxed);
|
||||||
assert!(
|
assert!(
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ async fn stalling_client_terminates_at_idle_not_relay_timeout() {
|
|||||||
|
|
||||||
let result = tokio::time::timeout(
|
let result = tokio::time::timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
MASK_RELAY_TIMEOUT,
|
||||||
consume_client_data(reader, MASK_BUFFER_SIZE * 4),
|
consume_client_data(reader, MASK_BUFFER_SIZE * 4, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -57,7 +57,7 @@ async fn fast_reader_drains_to_eof() {
|
|||||||
let data = vec![0xAAu8; 32 * 1024];
|
let data = vec![0xAAu8; 32 * 1024];
|
||||||
let reader = std::io::Cursor::new(data);
|
let reader = std::io::Cursor::new(data);
|
||||||
|
|
||||||
tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX))
|
tokio::time::timeout(MASK_RELAY_TIMEOUT, consume_client_data(reader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT))
|
||||||
.await
|
.await
|
||||||
.expect("consume_client_data did not complete for fast EOF reader");
|
.expect("consume_client_data did not complete for fast EOF reader");
|
||||||
}
|
}
|
||||||
@@ -81,7 +81,7 @@ async fn io_error_terminates_cleanly() {
|
|||||||
|
|
||||||
tokio::time::timeout(
|
tokio::time::timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
MASK_RELAY_TIMEOUT,
|
||||||
consume_client_data(ErrReader, usize::MAX),
|
consume_client_data(ErrReader, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect("consume_client_data did not return on I/O error");
|
.expect("consume_client_data did not return on I/O error");
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
|
|||||||
set.spawn(async {
|
set.spawn(async {
|
||||||
tokio::time::timeout(
|
tokio::time::timeout(
|
||||||
MASK_RELAY_TIMEOUT,
|
MASK_RELAY_TIMEOUT,
|
||||||
consume_client_data(OneByteThenStall { sent: false }, usize::MAX),
|
consume_client_data(OneByteThenStall { sent: false }, usize::MAX, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect("consume_client_data exceeded relay timeout under stall load");
|
.expect("consume_client_data exceeded relay timeout under stall load");
|
||||||
@@ -56,7 +56,7 @@ async fn consume_stall_stress_finishes_within_idle_budget() {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn consume_zero_cap_returns_immediately() {
|
async fn consume_zero_cap_returns_immediately() {
|
||||||
let started = Instant::now();
|
let started = Instant::now();
|
||||||
consume_client_data(tokio::io::empty(), 0).await;
|
consume_client_data(tokio::io::empty(), 0, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
assert!(
|
assert!(
|
||||||
started.elapsed() < MASK_RELAY_IDLE_TIMEOUT,
|
started.elapsed() < MASK_RELAY_IDLE_TIMEOUT,
|
||||||
"zero byte cap must return immediately"
|
"zero byte cap must return immediately"
|
||||||
|
|||||||
@@ -127,7 +127,8 @@ async fn positive_copy_with_production_cap_stops_exactly_at_budget() {
|
|||||||
let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls);
|
let mut reader = FinitePatternReader::new(PROD_CAP_BYTES + (256 * 1024), 4096, read_calls);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
|
|
||||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
|
let outcome =
|
||||||
|
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
outcome.total, PROD_CAP_BYTES,
|
outcome.total, PROD_CAP_BYTES,
|
||||||
@@ -145,7 +146,7 @@ async fn negative_consume_with_zero_cap_performs_no_reads() {
|
|||||||
let read_calls = Arc::new(AtomicUsize::new(0));
|
let read_calls = Arc::new(AtomicUsize::new(0));
|
||||||
let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls));
|
let reader = FinitePatternReader::new(1024, 64, Arc::clone(&read_calls));
|
||||||
|
|
||||||
consume_client_data_with_timeout_and_cap(reader, 0).await;
|
consume_client_data_with_timeout_and_cap(reader, 0, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
read_calls.load(Ordering::Relaxed),
|
read_calls.load(Ordering::Relaxed),
|
||||||
@@ -161,7 +162,8 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
|
|||||||
let mut reader = FinitePatternReader::new(payload, 3072, read_calls);
|
let mut reader = FinitePatternReader::new(payload, 3072, read_calls);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
|
|
||||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await;
|
let outcome =
|
||||||
|
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
assert_eq!(outcome.total, payload);
|
assert_eq!(outcome.total, payload);
|
||||||
assert_eq!(writer.written, payload);
|
assert_eq!(writer.written, payload);
|
||||||
@@ -175,7 +177,7 @@ async fn edge_copy_below_cap_reports_eof_without_overread() {
|
|||||||
async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() {
|
async fn adversarial_blackhat_never_ready_reader_is_bounded_by_timeout_guards() {
|
||||||
let started = Instant::now();
|
let started = Instant::now();
|
||||||
|
|
||||||
consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES).await;
|
consume_client_data_with_timeout_and_cap(NeverReadyReader, PROD_CAP_BYTES, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
started.elapsed() < Duration::from_millis(350),
|
started.elapsed() < Duration::from_millis(350),
|
||||||
@@ -190,7 +192,7 @@ async fn integration_consume_path_honors_production_cap_for_large_payload() {
|
|||||||
|
|
||||||
let bounded = timeout(
|
let bounded = timeout(
|
||||||
Duration::from_millis(350),
|
Duration::from_millis(350),
|
||||||
consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES),
|
consume_client_data_with_timeout_and_cap(reader, PROD_CAP_BYTES, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -206,7 +208,7 @@ async fn adversarial_consume_path_never_reads_beyond_declared_byte_cap() {
|
|||||||
let total_read = Arc::new(AtomicUsize::new(0));
|
let total_read = Arc::new(AtomicUsize::new(0));
|
||||||
let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read));
|
let reader = BudgetProbeReader::new(256 * 1024, Arc::clone(&total_read));
|
||||||
|
|
||||||
consume_client_data_with_timeout_and_cap(reader, byte_cap).await;
|
consume_client_data_with_timeout_and_cap(reader, byte_cap, MASK_RELAY_TIMEOUT, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
total_read.load(Ordering::Relaxed) <= byte_cap,
|
total_read.load(Ordering::Relaxed) <= byte_cap,
|
||||||
@@ -231,7 +233,7 @@ async fn light_fuzz_cap_and_payload_matrix_preserves_min_budget_invariant() {
|
|||||||
let mut reader = FinitePatternReader::new(payload, chunk, read_calls);
|
let mut reader = FinitePatternReader::new(payload, chunk, read_calls);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
|
|
||||||
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true).await;
|
let outcome = copy_with_idle_timeout(&mut reader, &mut writer, cap, true, MASK_RELAY_IDLE_TIMEOUT).await;
|
||||||
let expected = payload.min(cap);
|
let expected = payload.min(cap);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@@ -261,7 +263,7 @@ async fn stress_parallel_copy_tasks_with_production_cap_complete_without_leaks()
|
|||||||
read_calls,
|
read_calls,
|
||||||
);
|
);
|
||||||
let mut writer = CountingWriter::default();
|
let mut writer = CountingWriter::default();
|
||||||
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true).await
|
copy_with_idle_timeout(&mut reader, &mut writer, PROD_CAP_BYTES, true, MASK_RELAY_IDLE_TIMEOUT).await
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ async fn relay_to_mask_enforces_masking_session_byte_cap() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
32 * 1024,
|
32 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
@@ -81,6 +82,7 @@ async fn relay_to_mask_propagates_client_half_close_without_waiting_for_other_di
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
32 * 1024,
|
32 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1377,6 +1377,7 @@ async fn relay_to_mask_keeps_backend_to_client_flow_when_client_to_backend_stall
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
@@ -1508,6 +1509,7 @@ async fn relay_to_mask_timeout_cancels_and_drops_all_io_endpoints() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -228,6 +228,7 @@ async fn relay_path_idle_timeout_eviction_remains_effective() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ async fn run_relay_case(
|
|||||||
above_cap_blur_max_bytes,
|
above_cap_blur_max_bytes,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ async fn relay_to_mask_applies_cap_clamped_padding_for_non_power_of_two_cap() {
|
|||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
5 * 1024 * 1024,
|
5 * 1024 * 1024,
|
||||||
|
MASK_RELAY_IDLE_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::stats::Stats;
|
||||||
use crate::stream::BufferPool;
|
use crate::stream::BufferPool;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -119,6 +120,7 @@ async fn c2me_channel_full_path_yields_then_sends() {
|
|||||||
.expect("priming queue with one frame must succeed");
|
.expect("priming queue with one frame must succeed");
|
||||||
|
|
||||||
let tx2 = tx.clone();
|
let tx2 = tx.clone();
|
||||||
|
let stats = Stats::default();
|
||||||
let producer = tokio::spawn(async move {
|
let producer = tokio::spawn(async move {
|
||||||
enqueue_c2me_command(
|
enqueue_c2me_command(
|
||||||
&tx2,
|
&tx2,
|
||||||
@@ -127,6 +129,7 @@ async fn c2me_channel_full_path_yields_then_sends() {
|
|||||||
flags: 2,
|
flags: 2,
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
|
&stats,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|||||||
+89
-1
@@ -200,6 +200,14 @@ pub struct Stats {
|
|||||||
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
|
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
|
||||||
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
|
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
|
||||||
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
|
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
|
||||||
|
// Buffer pool gauges
|
||||||
|
buffer_pool_pooled_gauge: AtomicU64,
|
||||||
|
buffer_pool_allocated_gauge: AtomicU64,
|
||||||
|
buffer_pool_in_use_gauge: AtomicU64,
|
||||||
|
// C2ME enqueue observability
|
||||||
|
me_c2me_send_full_total: AtomicU64,
|
||||||
|
me_c2me_send_high_water_total: AtomicU64,
|
||||||
|
me_c2me_send_timeout_total: AtomicU64,
|
||||||
me_d2c_batch_timeout_armed_total: AtomicU64,
|
me_d2c_batch_timeout_armed_total: AtomicU64,
|
||||||
me_d2c_batch_timeout_fired_total: AtomicU64,
|
me_d2c_batch_timeout_fired_total: AtomicU64,
|
||||||
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
||||||
@@ -1414,6 +1422,37 @@ impl Stats {
|
|||||||
.store(value, Ordering::Relaxed);
|
.store(value, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_buffer_pool_gauges(&self, pooled: usize, allocated: usize, in_use: usize) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.buffer_pool_pooled_gauge
|
||||||
|
.store(pooled as u64, Ordering::Relaxed);
|
||||||
|
self.buffer_pool_allocated_gauge
|
||||||
|
.store(allocated as u64, Ordering::Relaxed);
|
||||||
|
self.buffer_pool_in_use_gauge
|
||||||
|
.store(in_use as u64, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment_me_c2me_send_full_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_c2me_send_full_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment_me_c2me_send_high_water_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_c2me_send_high_water_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment_me_c2me_send_timeout_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_c2me_send_timeout_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn increment_me_floor_cap_block_total(&self) {
|
pub fn increment_me_floor_cap_block_total(&self) {
|
||||||
if self.telemetry_me_allows_normal() {
|
if self.telemetry_me_allows_normal() {
|
||||||
self.me_floor_cap_block_total
|
self.me_floor_cap_block_total
|
||||||
@@ -1780,6 +1819,30 @@ impl Stats {
|
|||||||
self.me_d2c_flush_duration_us_bucket_gt_20000
|
self.me_d2c_flush_duration_us_bucket_gt_20000
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_buffer_pool_pooled_gauge(&self) -> u64 {
|
||||||
|
self.buffer_pool_pooled_gauge.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_buffer_pool_allocated_gauge(&self) -> u64 {
|
||||||
|
self.buffer_pool_allocated_gauge.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_buffer_pool_in_use_gauge(&self) -> u64 {
|
||||||
|
self.buffer_pool_in_use_gauge.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_me_c2me_send_full_total(&self) -> u64 {
|
||||||
|
self.me_c2me_send_full_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_me_c2me_send_high_water_total(&self) -> u64 {
|
||||||
|
self.me_c2me_send_high_water_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_me_c2me_send_timeout_total(&self) -> u64 {
|
||||||
|
self.me_c2me_send_timeout_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
|
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
|
||||||
self.me_d2c_batch_timeout_armed_total
|
self.me_d2c_batch_timeout_armed_total
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
@@ -2171,6 +2234,8 @@ impl ReplayShard {
|
|||||||
|
|
||||||
fn cleanup(&mut self, now: Instant, window: Duration) {
|
fn cleanup(&mut self, now: Instant, window: Duration) {
|
||||||
if window.is_zero() {
|
if window.is_zero() {
|
||||||
|
self.cache.clear();
|
||||||
|
self.queue.clear();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let cutoff = now.checked_sub(window).unwrap_or(now);
|
let cutoff = now.checked_sub(window).unwrap_or(now);
|
||||||
@@ -2192,13 +2257,22 @@ impl ReplayShard {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn check(&mut self, key: &[u8], now: Instant, window: Duration) -> bool {
|
fn check(&mut self, key: &[u8], now: Instant, window: Duration) -> bool {
|
||||||
|
if window.is_zero() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
self.cleanup(now, window);
|
self.cleanup(now, window);
|
||||||
// key is &[u8], resolves Q=[u8] via Box<[u8]>: Borrow<[u8]>
|
// key is &[u8], resolves Q=[u8] via Box<[u8]>: Borrow<[u8]>
|
||||||
self.cache.get(key).is_some()
|
self.cache.get(key).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add(&mut self, key: &[u8], now: Instant, window: Duration) {
|
fn add(&mut self, key: &[u8], now: Instant, window: Duration) {
|
||||||
|
if window.is_zero() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
self.cleanup(now, window);
|
self.cleanup(now, window);
|
||||||
|
if self.cache.peek(key).is_some() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let seq = self.next_seq();
|
let seq = self.next_seq();
|
||||||
let boxed_key: Box<[u8]> = key.into();
|
let boxed_key: Box<[u8]> = key.into();
|
||||||
@@ -2341,7 +2415,7 @@ impl ReplayChecker {
|
|||||||
let interval = if self.window.as_secs() > 60 {
|
let interval = if self.window.as_secs() > 60 {
|
||||||
Duration::from_secs(30)
|
Duration::from_secs(30)
|
||||||
} else {
|
} else {
|
||||||
Duration::from_secs(self.window.as_secs().max(1) / 2)
|
Duration::from_secs((self.window.as_secs().max(1) / 2).max(1))
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@@ -2553,6 +2627,20 @@ mod tests {
|
|||||||
assert!(!checker.check_handshake(b"expire"));
|
assert!(!checker.check_handshake(b"expire"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_replay_checker_zero_window_does_not_retain_entries() {
|
||||||
|
let checker = ReplayChecker::new(100, Duration::ZERO);
|
||||||
|
|
||||||
|
for _ in 0..1_000 {
|
||||||
|
assert!(!checker.check_handshake(b"no-retain"));
|
||||||
|
checker.add_handshake(b"no-retain");
|
||||||
|
}
|
||||||
|
|
||||||
|
let stats = checker.stats();
|
||||||
|
assert_eq!(stats.total_entries, 0);
|
||||||
|
assert_eq!(stats.total_queue_len, 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_replay_checker_stats() {
|
fn test_replay_checker_stats() {
|
||||||
let checker = ReplayChecker::new(100, Duration::from_secs(60));
|
let checker = ReplayChecker::new(100, Duration::from_secs(60));
|
||||||
|
|||||||
@@ -35,6 +35,10 @@ pub struct BufferPool {
|
|||||||
misses: AtomicUsize,
|
misses: AtomicUsize,
|
||||||
/// Number of successful reuses
|
/// Number of successful reuses
|
||||||
hits: AtomicUsize,
|
hits: AtomicUsize,
|
||||||
|
/// Number of non-standard buffers replaced with a fresh default-sized buffer
|
||||||
|
replaced_nonstandard: AtomicUsize,
|
||||||
|
/// Number of buffers dropped because the pool queue was full
|
||||||
|
dropped_pool_full: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferPool {
|
impl BufferPool {
|
||||||
@@ -52,6 +56,8 @@ impl BufferPool {
|
|||||||
allocated: AtomicUsize::new(0),
|
allocated: AtomicUsize::new(0),
|
||||||
misses: AtomicUsize::new(0),
|
misses: AtomicUsize::new(0),
|
||||||
hits: AtomicUsize::new(0),
|
hits: AtomicUsize::new(0),
|
||||||
|
replaced_nonstandard: AtomicUsize::new(0),
|
||||||
|
dropped_pool_full: AtomicUsize::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,17 +97,36 @@ impl BufferPool {
|
|||||||
|
|
||||||
/// Return a buffer to the pool
|
/// Return a buffer to the pool
|
||||||
fn return_buffer(&self, mut buffer: BytesMut) {
|
fn return_buffer(&self, mut buffer: BytesMut) {
|
||||||
// Clear the buffer but keep capacity
|
const MAX_RETAINED_BUFFER_FACTOR: usize = 2;
|
||||||
buffer.clear();
|
|
||||||
|
|
||||||
// Only return if we haven't exceeded max and buffer is right size
|
// Clear the buffer but keep capacity.
|
||||||
if buffer.capacity() >= self.buffer_size {
|
buffer.clear();
|
||||||
// Try to push to pool, if full just drop
|
let max_retained_capacity = self
|
||||||
let _ = self.buffers.push(buffer);
|
.buffer_size
|
||||||
|
.saturating_mul(MAX_RETAINED_BUFFER_FACTOR)
|
||||||
|
.max(self.buffer_size);
|
||||||
|
|
||||||
|
// Keep only near-default capacities in the pool. Oversized buffers keep
|
||||||
|
// RSS elevated for hours under churn; replace them with default-sized
|
||||||
|
// buffers before re-pooling.
|
||||||
|
if buffer.capacity() < self.buffer_size || buffer.capacity() > max_retained_capacity {
|
||||||
|
self.replaced_nonstandard.fetch_add(1, Ordering::Relaxed);
|
||||||
|
buffer = BytesMut::with_capacity(self.buffer_size);
|
||||||
}
|
}
|
||||||
// If buffer was dropped (pool full), decrement allocated
|
|
||||||
// Actually we don't decrement here because the buffer might have been
|
// Try to return into the queue; if full, drop and update accounting.
|
||||||
// grown beyond our size - we just let it go
|
if self.buffers.push(buffer).is_err() {
|
||||||
|
self.dropped_pool_full.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.decrement_allocated();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decrement_allocated(&self) {
|
||||||
|
let _ = self
|
||||||
|
.allocated
|
||||||
|
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
|
||||||
|
Some(current.saturating_sub(1))
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get pool statistics
|
/// Get pool statistics
|
||||||
@@ -113,6 +138,8 @@ impl BufferPool {
|
|||||||
buffer_size: self.buffer_size,
|
buffer_size: self.buffer_size,
|
||||||
hits: self.hits.load(Ordering::Relaxed),
|
hits: self.hits.load(Ordering::Relaxed),
|
||||||
misses: self.misses.load(Ordering::Relaxed),
|
misses: self.misses.load(Ordering::Relaxed),
|
||||||
|
replaced_nonstandard: self.replaced_nonstandard.load(Ordering::Relaxed),
|
||||||
|
dropped_pool_full: self.dropped_pool_full.load(Ordering::Relaxed),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,6 +148,41 @@ impl BufferPool {
|
|||||||
self.buffer_size
|
self.buffer_size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maximum number of buffers the pool will retain.
|
||||||
|
pub fn max_buffers(&self) -> usize {
|
||||||
|
self.max_buffers
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Current number of pooled buffers.
|
||||||
|
pub fn pooled(&self) -> usize {
|
||||||
|
self.buffers.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Total buffers allocated (pooled + checked out).
|
||||||
|
pub fn allocated(&self) -> usize {
|
||||||
|
self.allocated.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Best-effort number of buffers currently checked out.
|
||||||
|
pub fn in_use(&self) -> usize {
|
||||||
|
self.allocated().saturating_sub(self.pooled())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trim pooled buffers down to a target count.
|
||||||
|
pub fn trim_to(&self, target_pooled: usize) {
|
||||||
|
let target = target_pooled.min(self.max_buffers);
|
||||||
|
loop {
|
||||||
|
if self.buffers.len() <= target {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if self.buffers.pop().is_some() {
|
||||||
|
self.decrement_allocated();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Preallocate buffers to fill the pool
|
/// Preallocate buffers to fill the pool
|
||||||
pub fn preallocate(&self, count: usize) {
|
pub fn preallocate(&self, count: usize) {
|
||||||
let to_alloc = count.min(self.max_buffers);
|
let to_alloc = count.min(self.max_buffers);
|
||||||
@@ -160,6 +222,10 @@ pub struct PoolStats {
|
|||||||
pub hits: usize,
|
pub hits: usize,
|
||||||
/// Number of cache misses (new allocation)
|
/// Number of cache misses (new allocation)
|
||||||
pub misses: usize,
|
pub misses: usize,
|
||||||
|
/// Number of non-standard buffers replaced during return
|
||||||
|
pub replaced_nonstandard: usize,
|
||||||
|
/// Number of buffers dropped because the pool queue was full
|
||||||
|
pub dropped_pool_full: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PoolStats {
|
impl PoolStats {
|
||||||
@@ -185,6 +251,7 @@ pub struct PooledBuffer {
|
|||||||
impl PooledBuffer {
|
impl PooledBuffer {
|
||||||
/// Take the inner buffer, preventing return to pool
|
/// Take the inner buffer, preventing return to pool
|
||||||
pub fn take(mut self) -> BytesMut {
|
pub fn take(mut self) -> BytesMut {
|
||||||
|
self.pool.decrement_allocated();
|
||||||
self.buffer.take().unwrap()
|
self.buffer.take().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -364,6 +431,25 @@ mod tests {
|
|||||||
|
|
||||||
let stats = pool.stats();
|
let stats = pool.stats();
|
||||||
assert_eq!(stats.pooled, 0);
|
assert_eq!(stats.pooled, 0);
|
||||||
|
assert_eq!(stats.allocated, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pool_replaces_oversized_buffers() {
|
||||||
|
let pool = Arc::new(BufferPool::with_config(1024, 10));
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut buf = pool.get();
|
||||||
|
buf.reserve(8192);
|
||||||
|
assert!(buf.capacity() > 2048);
|
||||||
|
}
|
||||||
|
|
||||||
|
let stats = pool.stats();
|
||||||
|
assert_eq!(stats.replaced_nonstandard, 1);
|
||||||
|
assert_eq!(stats.pooled, 1);
|
||||||
|
|
||||||
|
let buf = pool.get();
|
||||||
|
assert!(buf.capacity() <= 2048);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
Reference in New Issue
Block a user